Can Logstash read new data from Elasticsearch as input?

Hello folks,

I am using Elasticsearch as an input to Logstash. My pipeline works well. However, when I insert new data into Elasticsearch after Logstash has started, this new data is not picked up by Logstash.
I know that Elasticsearch is not a messaging service, but I would like to know if there is a way to solve this.
Is the 'scroll' parameter meant for this situation?

Thanks

Regards,

S

Does anyone have any idea, please?! :sweat_smile:

What does your Logstash configuration look like? What do the documents you insert look like?

Here is what I want to deploy

Input data in Kafka (JSON format):
{"deviceName":"device1", "counter":125}
{"deviceName":"device1", "counter":132}

In index2, I want to add:
{"deviceName":"device1", "counter":125, "delta":xx}
{"deviceName":"device1", "counter":132, "delta":7} (7 = 132 - 125)

Logstash1:
input {
  stdin { }
  kafka {
    zk_connect => "ip:port"
    topic_id => "queue_name"
    codec => "json"
    group_id => "logstash"
  }
}
filter {
  # my filters
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "index1"
  }
  stdout { codec => rubydebug }
}

Logstash2:
input {
  stdin { }
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "index1"
  }
}
filter {
  # my filters 2
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "index2"
  }
  stdout { codec => rubydebug }
}

What would be the point of pulling data from index1 instead of computing the delta in the first Logstash instance?

I tried to do it with only one file, but I ran into the following problems:

  • First attempt: I did not find a filter that would let me keep the last line per device in memory, compute the delta for each new line, then store that new line in memory, and so on...

  • Second attempt: I used the elasticsearch filter to query, for every new line, the previous line with the same deviceName in index1. The problem with this approach is that the data is not immediately available in Elasticsearch when I save it, so the delta computation was sometimes wrong.
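For illustration, here is roughly what that second attempt could look like; the option names and formats (`query`, `sort`, `fields`) are assumptions to adapt to your plugin version. The caveat is that Elasticsearch only makes documents searchable after an index refresh (every 1 s by default), so the lookup can miss the most recently indexed line:

```
filter {
  elasticsearch {
    hosts => ["localhost:9200"]
    # Look up the most recently indexed line for the same device
    query => "deviceName:%{deviceName}"
    sort => "@timestamp:desc"
    # Copy the stored counter into this event as previous_counter
    fields => { "counter" => "previous_counter" }
  }
}
```

The delta would then be computed from `previous_counter`, which is exactly where the refresh lag makes the result unreliable.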

If you can think of a solution, I would be very thankful.

I did not find a filter that would let me keep the last line per device in memory, compute the delta for each new line, then store that new line in memory, and so on...

But how would this be different if you were to perform the delta computation in the second Logstash instance? You might be able to use the aggregate filter for this. A custom plugin would also do.
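As a rough sketch, a per-device delta with the aggregate filter could look like the following. This assumes a single pipeline worker (`-w 1`, which the aggregate filter requires to keep its maps consistent) and the newer event API (`event.get`/`event.set`; on older Logstash you would write `event['counter']` instead):

```
filter {
  aggregate {
    # One in-memory map per device
    task_id => "%{deviceName}"
    code => "
      # Initialize with the first counter seen for this device,
      # so the first event gets delta = 0
      map['previous'] ||= event.get('counter')
      event.set('delta', event.get('counter') - map['previous'])
      map['previous'] = event.get('counter')
    "
  }
}
```

You may also want to set the filter's `timeout` option so per-device state eventually expires for devices that stop sending data.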

In Logstash2 I use an elasticsearch filter to query the previous line in order to compute the delta. In this setup I don't have the availability problem I described in my previous post about my second attempt.
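One note on the original question: my understanding is that `scroll` only controls how long the scroll context is kept alive while the input pages through results; it does not make the input re-read the index. Recent versions of the elasticsearch input plugin have a `schedule` option (cron-like syntax) that re-runs the query periodically, e.g.:

```
input {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "index1"
    # Re-run the query every minute (cron syntax); without schedule,
    # the input runs the query once and then stops
    schedule => "* * * * *"
  }
}
```

Note that each run re-reads every document matching the query, so you would still need to narrow the query (e.g. by timestamp) or deduplicate downstream.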