How to connect to an API using Logstash?

I have an API library written in Python, and I also have data that comes from AWS Athena through queries.

Is there any way to merge these two data sources together using only Logstash? If not what are other suggestions?

My approach so far has been writing a Python script that merges the data coming from AWS Athena and the API, and then I'm planning to call this script in my Logstash configuration. What do you think about this solution?

Still looking for suggestions.

Doing this in Logstash alone, will be difficult. If you are simply enriching the data from AWS Athena with the API data, why not use the http filter in Logstash to make an API call for every Athena event?

https://www.elastic.co/guide/en/logstash/current/plugins-filters-http.html


Another option would be to use Celery for the Python job. Much easier to manage and scale in the long run, if you decide to stick with the Python job that is.

http://docs.celeryproject.org/

1 Like

Nachiket, I appreciate your suggestion.

I'm wondering if I am going to be able to merge the data from the API and AWS Athena using Logstash? I assume I'm just limited to the ruby filter plugin. Also, if it was your preference, would you rather do this task in Python or Logstash?

For my API, I use the source code to access the data from my API. I'm not sending HTTP requests, although it can be done, so would it be useful to use this approach?

Yes, quite possible. But you will have to use either of the following filter plugins to do this:

  • Translate
  • Memcache
  • HTTP
  • Ruby

Each has its own advantages and disadvantages.

  1. Translate: Works quite well on smaller and static data set. The data set needs to updated externally.
  2. Memcache: Works well on large data sets and can scale. Gives the best performance, and is often used to do data enrichment. But requires a separate infra and also needs to be updated externally using a scheduled job so that logstash can query it.
  3. HTTP: Works well for low volume pipelines. Throttling would be needed for larger volumes to ensure you don't kill the API server. Better if the API server already exists somewhere. You can host your own API to give a response based on the request. Highly customizable but difficult to maintain the code and infra for hosting your own API.
  4. Ruby: Able to write custom ruby scripts to transform data.

Depends on how difficult or easy the job is. I personally use both for my data ingestion. I have celery jobs to query and transform data using pandas before I index to Elasticsearch. I find that in some scenarios the joins and concats in dataframes and then using the

helpers.bulk(es, pandas.to_dict('records'))

solves a whole lot of problems for me. :smiley:

While in some scenarios, Logstash works out well. So really, it depends on what the requirement, volume, and load is.

1 Like

I appreciate your detailed response. It is useful to know the possible solutions.

Same, so you are using Python Pandas data frames to do the join which is what I'm doing in my script. For AWS Athena data and the API which are called every 24h, I think it is best to just go with Python and then run the script in Logstash. The only thing, I'm trying to find out here is where should I call my Python script, inside input, filter, or output plugin? (Note: my script directly sends the data to Elasticsearch for indexing).

I understand the issue, and in fact I am debating between different solutions. For example, there is this another situation that I got stuck in, and it is more complicated than the first one. I get a lot of data from AWS Cloudwatch which generates log information about how my lambdas in my pipeline are working, and there is so much data to merge with my API. Also, in terms of cost and overhead, calling my API often is bad, I'm thinking about doing the API call once a day since the data such as customer information doesn't get updated that often, then save the information in a CSV inside my docker which is deployed in an ECS task. Then have Logstsash merge the data from the CSV with all the incoming Cloudwatch logs based on common IDs, and then send the final result to Elasticsearch. What do you think about this solution? Any advice?

Logstash solution to this post: :white_check_mark:

http_poller {
  urls => {      
     <enter_custom_name>  => {
      method => get
      url => "https://***"
      headers => {
        "Content-Type" => "application/json"
        <add_authorization_or_password>
      }
    }
  }
  request_timeout => 60
  codec => "json"
  type => "Http_poller"
  schedule => { "every" => "10s" }
}

The authorization part depends on if your endpoint requires one or not.

You cannot call python script from Logstash as of now. It is not yet supported.

This seems more appropriate, as the data is not really real-time. And if you are using it just for stats or dashboarding requirements, doing it once makes more sense.

I tested it with both Logstash 7.0.1 and 7.3.0 (current) versions, and they both ran my Python script just fine.

I will post the solution in another post, but you can test it like this in Logstash Ruby filter plugin:

ruby{ code => "result = system 'python3 /usr/share/logstash/script/customer_model/run.py'"}

or use Logstash exec input plugin:

  exec {
    command => "python3 /usr/share/logstash/script/customer_model/run.py"
    schedule => "${SCHEDULE}"
    type => "Python"
  }

To run it, I had to create a docker, pull Ubuntu base image, then install Python and all the modules and their dependencies, and also pull Logstash 7.3.0 zip file, unzip in the right directory, send all the necessary configuration to Logstash, etc.

But thanks for all the advice my friend, it was really helpful! :slight_smile:

1 Like