Environment variable in Logstash

I have the following use case:
I am capturing some data from an ActiveMQ bus and storing it in Elasticsearch via Logstash.
I am also capturing a "version number" via Kafka and storing it in Elasticsearch, again using Logstash for ingestion. This value is sent once, and is only sent again when it changes.

I would now like to enrich the data coming in via ActiveMQ with the version number from Kafka, and have the data stored in the same index. I need the version number to act as a global variable so I can add it as a separate field to the data coming via ActiveMQ.

What's the best way to configure this?

I suggest that you look into a persisted value.

I presume you have two pipelines: 1) ActiveMQ via the jms input, and 2) Kafka via the kafka input.

Possible Recipe

  1. Add a constant field and value to both pipeline inputs, say "kafka-vn" => "kvn".
  2. Create a csv file, say /tmp/logstash-kafka/kafka-version.csv, with the contents kvn,0.
  3. In the Kafka pipeline, add a translate filter that reads the csv file from above. Use a low refresh interval, 1 second perhaps; destination is "old-kvn" and field is "kafka-vn". You should now have a field called old-kvn that holds the value read from the file.
  4. In the Kafka pipeline, add a conditional section to the output section that tests whether the old-kvn value is different from the value of the field that holds the latest Kafka version number.
  5. Inside that conditional, use the exec output to echo the latest version number to the file above in the same format, kvn,<latest version>. Read the exec docs.
  6. You should be able to see this file change if you run the Kafka pipeline on its own.
  7. In the ActiveMQ pipeline, add the same translate filter, but this time set the destination to current-kvn.
  8. Use this new field current-kvn to set the index in the ES output (a configuration sketch for both pipelines follows this list).
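To make the recipe concrete, here is a rough sketch of both pipelines. Treat it as illustrative only: the broker settings, the topic name, the "version" field assumed to hold the latest number, the exec command line, and the index name are all assumptions, not tested config.

```
# kafka pipeline (sketch; broker, topic and the "version" field are assumptions)
input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["version-topic"]
    add_field => { "kafka-vn" => "kvn" }       # step 1: constant lookup key
  }
}
filter {
  translate {                                  # step 3: read the persisted value
    dictionary_path => "/tmp/logstash-kafka/kafka-version.csv"
    refresh_interval => 1
    field => "kafka-vn"
    destination => "old-kvn"
  }
}
output {
  if [old-kvn] != [version] {                  # step 4: has the version changed?
    exec {                                     # step 5: persist the new value
      command => "echo 'kvn,%{version}' > /tmp/logstash-kafka/kafka-version.csv"
    }
  }
}
```

```
# activemq pipeline (sketch; jms connection details omitted, index name is an assumption)
input {
  jms {
    # ... connection settings ...
    add_field => { "kafka-vn" => "kvn" }       # step 1: same constant lookup key
  }
}
filter {
  translate {                                  # step 7: same lookup, new destination
    dictionary_path => "/tmp/logstash-kafka/kafka-version.csv"
    refresh_interval => 1
    field => "kafka-vn"
    destination => "current-kvn"
  }
}
output {
  elasticsearch {
    index => "mydata-%{current-kvn}"           # step 8: version-aware index
  }
}
```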

If the 1-second latency is too high then, after you get this working, you could try using the jdbc_streaming filter (with caching turned off) to do the lookup from a local file-based sqlite db. Updating the db will still need the exec output. See https://stackoverflow.com/q/29044340/5349531
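For reference, such a lookup might look roughly like this; the driver jar path, the sqlite file, and the table/column names are all assumptions:

```
filter {
  jdbc_streaming {
    jdbc_driver_library    => "/path/to/sqlite-jdbc.jar"              # assumed driver jar location
    jdbc_driver_class      => "org.sqlite.JDBC"
    jdbc_connection_string => "jdbc:sqlite:/tmp/logstash-kafka/kafka-version.db"
    statement              => "SELECT vn FROM versions WHERE k = 'kvn'"  # assumed table layout
    target                 => "current-kvn"
    use_cache              => false            # caching off so each event sees the latest value
  }
}
```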

This solution will work across restarts.

Thank you for your suggestion, I will try it out. I have also found that there is an environment filter in Logstash; could I use that to store the variable as well?
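The usage I found in the docs looks roughly like this (the variable name is just a placeholder):

```
filter {
  environment {
    add_metadata_from_env => { "kvn" => "KAFKA_VN" }   # copies $KAFKA_VN into [@metadata][kvn]
  }
}
```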

I am planning to use only one pipeline, if possible. I want to have two inputs within a single configuration file, one for Kafka (where I get the version number) and one for the data coming in via ActiveMQ. Does your solution still work with a single pipeline?
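Roughly, I am imagining a single file like this, with each input tagged so the filters can tell the streams apart (connection details omitted):

```
input {
  kafka {
    # ... broker/topic settings ...
    tags => ["kafka"]
  }
  jms {
    # ... connection settings ...
    tags => ["activemq"]
  }
}
filter {
  if "kafka" in [tags] {
    # version tracking goes here
  } else {
    # ActiveMQ enrichment goes here
  }
}
```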

To simplify it, is it possible to use the aggregate filter plugin to merge the two outputs from Kafka and ActiveMQ? Reading about it, it seems like I need a common field between the two to merge the events; is this the case?
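The examples I have seen correlate events through a shared task_id, something like this (the field names here are just placeholders):

```
filter {
  aggregate {
    task_id => "%{correlation_id}"                      # the common field both streams would need
    code => "map['kvn'] ||= event.get('version')"       # stash the version in the shared map
    map_action => "create_or_update"
  }
}
```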
