I suggest persisting the value in a file that both pipelines can read.
I presume you have two pipelines: 1) ActiveMQ via the jms input and 2) Kafka via the kafka input.
Possible Recipe
- Add a constant field and value to both pipeline inputs, say `"kafka-vn" => "kvn"`.
- Create a csv file, say `/tmp/logstash-kafka/kafka-version.csv`, with the following contents: `kvn, 0`.
- In the kafka pipeline, add a translate filter that reads the csv file from above. Use a low refresh interval, 1 second perhaps. `destination` is "old-kvn"; `field` is "kafka-vn". You should now have a field called `old-kvn` that holds the value read from the file.
- In the kafka pipeline, add a conditional section in the output section that tests whether the `old-kvn` value is different from the value of the field that holds the latest kafka version number.
- In that conditional section, use the exec output to echo the latest version number to the file above in the same format: `kvn, <latest version>`. Read the exec output docs.
- You should be able to see this file change if you run the kafka pipeline on its own.
- In the activemq pipeline, add the same translate filter but this time set the destination to `current-kvn`.
- Use this new field `current-kvn` to set the index in the ES output.
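The recipe above can be sketched as two pipeline configs. This is a minimal, untested sketch: your actual kafka/jms connection settings, the `latest-kvn` field name (whatever field holds the newest kafka version in your events), and the `myindex-` index prefix are all assumptions to be replaced with your own. Note that newer versions of the translate filter call these options `source`/`target` instead of `field`/`destination`.

```conf
# --- kafka pipeline (sketch; settings elided, field names assumed) ---
input {
  kafka {
    # ... your kafka connection settings ...
    add_field => { "kafka-vn" => "kvn" }   # constant lookup key
  }
}
filter {
  translate {
    dictionary_path => "/tmp/logstash-kafka/kafka-version.csv"
    refresh_interval => 1        # re-read the file every second
    field => "kafka-vn"          # looks up the constant key "kvn"
    destination => "old-kvn"     # receives the persisted version number
  }
}
output {
  # "latest-kvn" is an assumed name for the field holding the newest version
  if [old-kvn] != [latest-kvn] {
    exec {
      command => "echo 'kvn, %{latest-kvn}' > /tmp/logstash-kafka/kafka-version.csv"
    }
  }
  # ... your normal outputs ...
}

# --- activemq pipeline (sketch) ---
input {
  jms {
    # ... your jms connection settings ...
    add_field => { "kafka-vn" => "kvn" }
  }
}
filter {
  translate {
    dictionary_path => "/tmp/logstash-kafka/kafka-version.csv"
    refresh_interval => 1
    field => "kafka-vn"
    destination => "current-kvn"
  }
}
output {
  elasticsearch {
    index => "myindex-%{current-kvn}"   # index naming pattern is an assumption
  }
}
```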
If the 1 second latency is too high then, after you get this working, you could try using the jdbc_streaming filter (with caching turned off) to do the lookup from a local file-based sqlite db. Updating the db will still need the exec output. See https://stackoverflow.com/q/29044340/5349531
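A rough sketch of that jdbc_streaming variant, assuming a sqlite db at an assumed path with an assumed one-row table `kvn(version)`, and a sqlite JDBC driver jar whose location you would supply:

```conf
filter {
  jdbc_streaming {
    jdbc_driver_library => "/path/to/sqlite-jdbc.jar"   # assumed driver path
    jdbc_driver_class => "org.sqlite.JDBC"
    jdbc_connection_string => "jdbc:sqlite:/tmp/logstash-kafka/kafka-version.db"
    statement => "SELECT version FROM kvn LIMIT 1"      # assumed table/column names
    use_cache => false                                  # force a fresh lookup per event
    target => "current-kvn"
  }
}
```

The exec output in the kafka pipeline would then update the sqlite row instead of rewriting the csv file.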
This solution will work across restarts.