Polling New/Modified Elasticsearch records to another endpoint


I am trying to find a way to periodically (every minute) read all records that have been modified or newly created in an Elasticsearch index. These records then need to be published by Logstash to Kafka or a queue endpoint.

I see that the jdbc input plugin has exactly this kind of option: you can poll a database by specifying "tracking_column", "tracking_column_type", and "use_column_value", which gives you only the changed/new records in the result set.

I explored the Elasticsearch input plugin, but it does not seem to have similar options. Is there a way to do this using the Logstash elasticsearch input plugin or another Logstash input plugin? I would appreciate any pointers.


There is no support for persisting a value from the most recent document fetched by an elasticsearch input. There is an open issue where that was requested a couple of years ago, but I see no indication that anyone has acted on it.

If I absolutely had to do this in Logstash, I would use two ruby filters: the first to fetch the tracking value from an instance variable, and the second to store the updated tracking value. In between the two I would use an http filter to make a call to Elasticsearch, then parse the response and use a split filter to generate one event per returned document. You could drive the whole thing with

 input { exec { command => "/bin/true" interval => 60 } }

You would have to set pipeline.workers to 1, so it does not scale, and for now you would need to disable the java execution engine (a fix for that has been implemented, but not released).

It would probably be easier to implement in /bin/sh than in Logstash :)
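
For what it is worth, here is a minimal sketch of the same tracking idea outside Logstash, written in Python rather than /bin/sh. It rests on assumptions that are not in your question: every document carries an `updated_at` field (a hypothetical name) that your application sets on each create and update, since Elasticsearch does not maintain one for you, and `publish` is a placeholder for whatever sends a record to Kafka or your queue. The `search` callable is passed in so the polling logic can be tried without a cluster.

```python
import json
import time
import urllib.request
from typing import Any, Callable

# Hypothetical field name: the application must set it on every write.
TRACKING_FIELD = "updated_at"


def build_query(last_value: str, size: int = 500) -> dict[str, Any]:
    """Search body selecting documents newer than the last value seen."""
    return {
        "size": size,
        "sort": [{TRACKING_FIELD: "asc"}],
        "query": {"range": {TRACKING_FIELD: {"gt": last_value}}},
    }


def poll_once(search: Callable[[dict], dict], last_value: str) -> tuple[list[dict], str]:
    """Run one poll and return (documents, new tracking value)."""
    response = search(build_query(last_value))
    docs = [hit["_source"] for hit in response["hits"]["hits"]]
    if docs:
        # Hits are sorted ascending, so the last one carries the newest value.
        last_value = docs[-1][TRACKING_FIELD]
    return docs, last_value


def http_search(url: str) -> Callable[[dict], dict]:
    """Build a search callable that POSTs the body to an index's _search URL."""
    def search(body: dict) -> dict:
        request = urllib.request.Request(
            url,
            data=json.dumps(body).encode("utf-8"),
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        with urllib.request.urlopen(request) as reply:
            return json.load(reply)
    return search


def run(url: str, publish: Callable[[dict], None],
        last_value: str = "1970-01-01T00:00:00Z", interval: int = 60) -> None:
    """Poll every `interval` seconds and hand each new document to `publish`."""
    search = http_search(url)
    while True:
        docs, last_value = poll_once(search, last_value)
        for doc in docs:
            publish(doc)
        time.sleep(interval)
```

You would start it with something like `run("http://localhost:9200/my-index/_search", print)`, where the URL and index name are placeholders. Note that the tracking value lives only in memory here, so a restart re-reads everything unless you write it to a file, and the strict `gt` comparison can miss documents that share the boundary timestamp with the last one fetched.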
