Logstash: problem when querying Elasticsearch

Hello everyone,

Through Logstash, I want to query Elasticsearch in order to get fields from previous events, do some computation with the fields of my current event, and add new fields. Here is what I did:

input file:
{"device":"device1","count":5}
{"device":"device2","count":11}
{"device":"device1","count":8}
{"device":"device3","count":100}
{"device":"device3","count":95}
{"device":"device3","count":155}
{"device":"device2","count":15}
{"device":"device1","count":55}

My expected output:
{"device":"device1","count":5,"previousCount=0","delta":0}
{"device":"device2","count":11,"previousCount=0","delta":0}
{"device":"device1","count":8,"previousCount=5","delta":3}
{"device":"device3","count":100,"previousCount=0","delta":0}
{"device":"device3","count":95,"previousCount=100","delta":-5}
{"device":"device3","count":155,"previousCount=95","delta":60}
{"device":"device2","count":15,"previousCount=11","delta":4}
{"device":"device1","count":55,"previousCount=8","delta":47}

Logstash filter part:
filter {
  elasticsearch {
    hosts => ["localhost:9200/database"]
    query => 'device:"%{[device]}"'
    sort => "@timestamp:desc"
    fields => ['count','previousCount']
  }

  if [previousCount] {
    ruby {
      code => "event['delta'] = event['count'] - event['previousCount']"
    }
  } else {
    # for devices which are not in the database yet, e.g. device1 at count:5
    mutate {
      add_field => { "previousCount" => "0" }
      add_field => { "delta" => "0" }
    }
  }
}

My problem:
For every line of my input file I get the following error: Failed to query elasticsearch for previous event ..
It looks as if each fully processed line is not yet stored in Elasticsearch by the time Logstash starts processing the next line.

I don't know whether my conclusion is correct and, if it is, why this happens.

So, do you know how I could solve this problem, please?

Thank you for your attention and your help.

S

An example of the output:

{
"device" => "device2",
"count" => 15,
"@version" => "1",
"@timestamp" => "2016-06-16T11:52:31.594Z",
"path" => "xxx",
"host" => "localhost.localdomain",
"previousCount" => "0",
"delta" => "0"
}

The problem with this approach is that 1) Logstash sends requests to Elasticsearch in bulk and 2) indexed events are not immediately made available for search in Elasticsearch. This means there will be a lag (up to a second or more) between an event passing through Logstash and it becoming searchable. If your events are arriving close together you might need to store the data within the Logstash process, which MAY require a custom plugin.

OK, I understand. Before starting to develop your solution, I would like to know whether it is possible to force Logstash to process the file line by line, and maybe to add a pause (a kind of sleep) to let Logstash put the data into Elasticsearch.

I don't believe this is possible, and even if it was you would see very, very poor performance.

OK.
And what about Elasticsearch in a cluster? Will that change anything?

No, I don't see how it would.

Such a shame!

I just wanted to complete the description; the error is:
error => NoMethodError: undefined method `start_with?' for nil:NilClass

Do you know how I can solve this ?

See here:

It seems the documentation at Elasticsearch filter plugin | Logstash Reference [8.11] | Elastic is still wrong. I did what was posted and changed

fields => ["field1", "field2"]

to

fields => [["field1", "field2"]]

and it worked for me as well. Was running into the same error as you with Logstash 2.3.2.
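Applied to the filter in the original post, that line would become something like this (field names taken from the config above):

fields => [["count", "previousCount"]]

i.e. an array of [source, destination] pairs: copy count from the most recent matching document into previousCount on the current event.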

It solved the start_with? problem, but I still have my issue with Elasticsearch not storing the data as fast as I need...

Ah sorry, I have no answer to that, only the fix for start_with? :sweat_smile:

Does anyone know if it is possible to configure Logstash to process only one event at a time (filter + store in Elasticsearch) before processing the next one?

Well, you could use the http output instead of the elasticsearch one. The "advantage" is that you can specify the refresh=true parameter when indexing the document and almost be sure that when the next event comes in, the document will be available when searched via the elasticsearch filter plugin.
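Something along these lines, as a sketch (the index and type names in the URL are assumptions, adjust them to your setup):

output {
  http {
    # index each event individually through the Elasticsearch REST API and ask
    # for an immediate refresh, so it is searchable before the next event arrives
    url => "http://localhost:9200/database/logs?refresh=true"
    http_method => "post"
    format => "json"
  }
}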

The problem with this approach is that all events need to be processed in sequence, and each event needs to be written to Elasticsearch and refreshed before the next one can be processed. This will be horrendously slow and will not scale. I am not even sure it is possible to do this with Logstash, as it processes all stages in parallel by default.

If you want to see how badly this performs, write a simple script with this logic and try it out.

Creating a custom plugin that stores this data in memory for a certain period is going to perform much better.
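As an illustration of that idea, here is a rough sketch using a plain ruby filter rather than a full custom plugin; it assumes a single filter worker (-w 1) so that all events share the same hash:

filter {
  ruby {
    # keep the last seen count per device in memory (assumption: one worker)
    init => "@last_count = {}"
    code => "
      device = event['device']
      if @last_count.key?(device)
        event['previousCount'] = @last_count[device]
        event['delta'] = event['count'] - @last_count[device]
      else
        event['previousCount'] = 0
        event['delta'] = 0
      end
      @last_count[device] = event['count']
    "
  }
}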

+1 Christian
How about using the existing aggregate filter for this?
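With the aggregate filter the per-device state lives in the plugin's map instead of your own hash. Roughly like this (an untested sketch; the aggregate filter also needs a single filter worker, and the field names are taken from the input above):

filter {
  aggregate {
    task_id => "%{device}"
    code => "
      event['previousCount'] = map['previous'] || 0
      event['delta'] = map.key?('previous') ? event['count'] - map['previous'] : 0
      map['previous'] = event['count']
    "
  }
}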

Finally, I think I will go with the following solution:

I have a Logstash file that takes my data from my log and stores it in an Elasticsearch index (ES-1). Then I use a second Logstash file that takes my data from ES-1 and also queries my previous data from the same ES-1. The results will be stored in a new index (ES-2).

It seems to me more reliable and easier to implement, even if it is a bit heavy.
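A rough sketch of the second file (the index names are assumptions, and the filter part is the same logic as in my first post):

input {
  elasticsearch {
    # read the raw events back out of the first index
    hosts => ["localhost:9200"]
    index => "es-1"
  }
}

filter {
  # same elasticsearch filter + ruby / mutate logic as in my first post,
  # querying es-1 for the previous count of the same device
}

output {
  elasticsearch {
    # write the enriched events to the second index
    hosts => ["localhost:9200"]
    index => "es-2"
  }
}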