How to monitor and constantly forward newly added documents from an elasticsearch index to a Kafka topic with logstash?

Dear all,

Question in the title :wink: To be more precise, I have a message written to a Kafka topic, then to an Elasticsearch index using Logstash then written back from the index to another Kafka topic using again Logstash.

I want to do the difference between the message creation time before to be injected to Kafka and the Logstash timestamp when written out of the index to the Kafka topic so as to measure the delay took by the whole pipeline for the message transmission.

My Logstash instance starts, writes out the existing documents from the Elasticsearch index to the Kafka topic and then stops. It does not work as a daemon permanently monitoring the index for new documents to be written to the Kafka topic. How could I do that ?

Thanks,

Here my logstash plugin conf :

input {
elasticsearch {
hosts => "localhost"
index => "poc-local-input"
query => '{ "query": { "match_all": {} } }'
hosts => "localhost:9200"
user => elastic
password => elastic
}
}

output {
kafka {
topic_id => 'poc-local-output'
client_id => "poc-local-elk-output-0"
}
}

Why not write the documents to Kafka at the same time as you write them to Elasticsearch instead of reading them out separately?

Tanks Christian for your answer,

It is simply to understand how i could put data in Elasticsearch and get out parts of it knowing I want to use Kafka as a communication bus between other components of the architecture.

But the architecture is not really the question here, I just don't understand why the plugin is stopping right after sending out the data from the index to the topic as in the other way (Kafka to Elasticsearch) it is working well.

Is it possible to have Logstash working as a daemon doing the query as soon as a new document is stored in the index ? Or are there some limitations ? Does the xpack job/watcher would allow me to that or is it different things ?

Thanks for your help.

As far as I know there is not yet any way to stream changes from Elasticsearch. The newly introduced sequence IDs provides the foundation for building that kind of functionality, but it is not yet possible.

Ok, thanks to have pointed me on these sequence IDs !

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.