I am processing events from Elasticsearch with size = 200; env_thread_id is just an environment variable.
input {
  elasticsearch {
    hosts => "localhost:9200"
    index => "flexnet-perf-*"
    query => '{"from":0,"size":200,"_source":["message"],"query":{"match":{"thread_id":${env_thread_id}}},"sort":[{"@timestamp":{"order":"asc"}}]}'
  }
}
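(For reference, ${env_thread_id} is resolved by Logstash's environment-variable substitution in the config. If the variable could ever be unset, the docs say a default can be supplied after a colon; the 0 here is just an example value:)
query => '{"query":{"match":{"thread_id":${env_thread_id:0}}}}'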
I also add a delay while processing each event.
sleep {
  # Sleep 5 seconds for every event.
  time => "5"
}
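If the per-event delay itself turns out to be part of the problem, the sleep filter can also batch the delay with its every option, sleeping once per N events instead of on each one. An alternative sketch, not what I currently run:
filter {
  sleep {
    time  => "1"   # sleep 1 second...
    every => 10    # ...once every 10 events
  }
}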
Logstash processes all the events fine, but at the end it errors out:
[2018-01-02T16:34:26,190][ERROR][logstash.pipeline ] A plugin had an unrecoverable error. Will restart this plugin.
Plugin: <LogStash::Inputs::Elasticsearch hosts=>["localhost:9200"], index=>"flexnet-perf-*", query=>"{"from":0,"size":200,"_source":["message"],"query":{"match":{"thread_id":9996}},"sort":[{ "@timestamp":{"order" : "asc"}}]}", id=>"06ff79986766bed66479ccfcce654a2ef68eb69c-1", enable_metric=>true, codec=><LogStash::Codecs::JSON id=>"json_44f37e4c-d655-40d6-ba4a-c14f82727b25", enable_metric=>true, charset=>"UTF-8">, size=>1000, scroll=>"1m", docinfo=>false, docinfo_target=>"@metadata", docinfo_fields=>["_index", "_type", "_id"], ssl=>false>
Error: [404] {"error":{"root_cause":[{"type":"search_context_missing_exception","reason":"No search context found for id [82403]"},{"type":"search_context_missing_exception","reason":"No search context found for id [82404]"},{"type":"search_context_missing_exception","reason":"No search context found for id [82401]"},{"type":"search_context_missing_exception","reason":"No search context found for id [82402]"},{"type":"search_context_missing_exception","reason":"No search context found for id [82400]"}],"type":"search_phase_execution_exception","reason":"all shards failed","phase":"query","grouped":true,"failed_shards":[{"shard":-1,"index":null,"reason":{"type":"search_context_missing_exception","reason":"No search context found for id [82403]"}},{"shard":-1,"index":null,"reason":{"type":"search_context_missing_exception","reason":"No search context found for id [82404]"}},{"shard":-1,"index":null,"reason":{"type":"search_context_missing_exception","reason":"No search context found for id [82401]"}},{"shard":-1,"index":null,"reason":{"type":"search_context_missing_exception","reason":"No search context found for id [82402]"}},{"shard":-1,"index":null,"reason":{"type":"search_context_missing_exception","reason":"No search context found for id [82400]"}}],"caused_by":{"type":"search_context_missing_exception","reason":"No search context found for id [82400]"}},"status":404}
I am not sure, but it may be happening because at the end of the run Logstash is looking for events that have since been deleted (given how long Logstash takes to process everything). I cannot tell which records those would be, though, because by that point Logstash has already processed every event it was supposed to.
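Another thing I notice in the plugin dump above is scroll=>"1m" and size=>1000, the plugin defaults. With a roughly 5 second sleep per event, well over a minute can pass between scroll fetches, so the missing search contexts might simply be the scroll timing out rather than deleted documents. A sketch of the input with a longer keep-alive that I plan to try (the "10m" value is a guess on my part):
input {
  elasticsearch {
    hosts  => "localhost:9200"
    index  => "flexnet-perf-*"
    # Keep each scroll context alive across the pipeline stall caused
    # by the sleep filter; the default "1m" appears to expire mid-run.
    scroll => "10m"
    # Page size for the scroll (the plugin's own setting, as opposed
    # to the "size" inside the query body).
    size   => 200
    query  => '{"_source":["message"],"query":{"match":{"thread_id":${env_thread_id}}},"sort":[{"@timestamp":{"order":"asc"}}]}'
  }
}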
Independent of the root cause, as a temporary workaround I am first looking for a way to keep the plugin from restarting after an unrecoverable error. That may not be a recommended setting, but for this case I am fine with it.
Below is the entire conf file:
input {
  elasticsearch {
    hosts => "localhost:9200"
    index => "flexnet-perf-*"
    query => '{"from":0,"size":200,"_source":["message"],"query":{"match":{"thread_id":${env_thread_id}}},"sort":[{"@timestamp":{"order":"asc"}}]}'
  }
}
filter {
  grok {
    patterns_dir => [ "C:\Projects\PMI\installers\ELK\logstash-5.5.0\bin\patterns" ]
    match => {
      "message" => "%{USERNAME:username}%{SPACE:space}%{IP:ip}%{SPACE:space}%{INT:thread_id}%{SPACE:space}%{INT:step_id}%{SPACE:space}%{MY_DATETIME:LOG_DATETIME}%{SPACE:space}%{NOTSPACE:exec_time}%{SPACE:space}%{CISCO_REASON:some_words}%{SPACE:space}%{JAVACLASS:javaclass}%{SPACE:space}%{GREEDYDATA:audit_message_orig}"
    }
  }
  sleep {
    # Sleep 5 seconds for every event.
    time => "5"
  }
  mutate {
    convert => {
      "thread_id" => "integer"
      "step_id"   => "integer"
    }
  }
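  # Look up the previous step (step_id - 1) for the same thread and
  # copy its @timestamp into parent_timestamp.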
  if [step_id] and [step_id] > 0 {
    ruby {
      code => 'event.set("parent_step_id", event.get("step_id") - 1)'
    }
    elasticsearch {
      hosts    => "localhost:9200"
      user     => "elastic"
      password => "changeme"
      index    => "flexnet-ptree-*"
      query    => "thread_id:%{[thread_id]} AND step_id:%{[parent_step_id]}"
      sort     => "@timestamp:desc"
      fields   => { "@timestamp" => "parent_timestamp" }
    }
  }
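  # Replace the comma in exec_time with a dot so the value can be
  # converted to a float.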
  mutate {
    split => { "exec_time" => "," }
  }
  mutate {
    join => { "exec_time" => "." }
  }
  mutate {
    convert => { "exec_time" => "float" }
  }
  date {
    # HH for 24-hour time; the log timestamps have no AM/PM marker.
    match => [ "LOG_DATETIME", "yyyy-MM-dd HH:mm:ss,SSS" ]
    locale => "en_US"
    target => "LOG_DATETIME"
  }
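  # Pull a normalized audit_message out of the different
  # audit_message_orig formats.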
if "key:" in [audit_message_orig]{
grok {
match => { "audit_message_orig" => "%{WORD:message_type}:%{SPACE}%{WORD:message_value}"}
}
mutate {
add_field => ["audit_message","%{message_type}%{message_value}"]
}
}
if "Operation: Code:" in [audit_message_orig]{
grok {
match => { "audit_message_orig" => "%{WORD:message_type}:%{SPACE:space}%{WORD:some_words}:%{SPACE:space}%{WORD:message_value}"}
}
mutate {
add_field => ["audit_message","%{message_type}%{message_value}"]
}
}
if "Step:" in [audit_message_orig]{
grok {
match => { "audit_message_orig" => "%{WORD:message_type}:%{SPACE:space}%{CISCO_REASON:message_value}"}
}
mutate {
add_field => ["audit_message","%{message_type}%{message_value}"]
}
}
if "Function:" in [audit_message_orig]{
grok {
match => { "audit_message_orig" => "%{WORD:message_type}:%{SPACE:space}%{CISCO_REASON:message_value}"}
}
mutate {
add_field => ["audit_message","%{message_type}%{message_value}"]
}
}
}
output {
  stdout { codec => rubydebug }
  elasticsearch {
    hosts    => "localhost:9200"
    user     => "elastic"
    password => "changeme"
    index    => "flexnet-ptree-${env_thread_id}"
  }
}