Logstash container shut down + error creating action from filter

Hello, I'm new to Elasticsearch.

I'm working with log files coming from Filebeat and Logstash, and I'm trying to add a field "response_time" and then assign the difference between two timestamps to it. So I created a Logstash filter and added it to the Logstash configuration file, but when I restarted the container I got the error below.

This is my logstash configuration file:

input {
  beats {
    port => 5044
  }
}

filter {
  json {
    source => "message"
  }
  ruby {
    code => "event.set('indexDay', event.get('[@timestamp]').time.localtime('+01:00').strftime('%Y%m%d'))"
  }
  aggregate {
        add_field => {
          "response_time" => "timestamp2-timestamp1"
          }
        }
   grok {
     match => ["message","%{LOGLEVEL:loglevel},%{DATESTAMP_RFC2822:timestamp},%{NOTSPACE:event_type},%{NUMBER:capture_res_id},%{NUMBER:capture_pid},%{NUMBER:mti},%{NUMBER:node_id}
    ,%{UUID:msg_uuid},%{NOTSPACE:module},%{NUMBER :respCode}"]}
    if [event_type] == "request_inc" {
     aggregate {
       msg_uuid => "%{UUID}"
       timestamp1 => event.get('DATESTAMP_RFC2822')
       code => "map['response_time'] = 0"
       map_action => "create"
     }
   }
   if [event_type] == "response_outg" {
     aggregate {
       msg_uuid => "%{UUID}"
       event_type => event.set('event_type')
       timestamp2 => "%{DATESTAMP_RFC2822}"
       code => "map['response_time']"
       map_action => "update"
       end_of_task => true
       timeout =>120
     }
   }
}


output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    template => "/usr/share/logstash/templates/testblogstash.template.json"
    template_name => "testblogstash"
    template_overwrite => true
    index => "testblogstash-%{indexDay}"
    codec => json
  }
  stdout {
    codec => rubydebug
  }
}

This is the error from docker logs:

[2022-06-01T14:43:24,529][ERROR][logstash.agent ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of [A-Za-z0-9_-], [ \t\r\n], "#", "{", [A-Za-z0-9_], "}" at line 25, column 24 (byte 689) after filter {\r\n json {\r\n source => "message"\r\n }\r\n ruby {\r\n code => "event.set('indexDay', event.get('[@timestamp]').time.localtime('+01:00').strftime('%Y%m%d'))"\r\n }\r\n aggregate {\r\n add_field => {\r\n "response_time" => "timestamp2-timestamp1"\r\n\t\t }\r\n\t\t}\r\n grok {\r\n match => ["message","%{LOGLEVEL:loglevel},%{DATESTAMP_RFC2822:timestamp},%{NOTSPACE:event_type},%{NUMBER:capture_res_id},%{NUMBER:capture_pid},%{NUMBER:mti},%{NUMBER:node_id}\r\n\t,%{UUID:msg_uuid},%{NOTSPACE:module},%{NUMBER :respCode}"]}\r\n if [event_type] == "request_inc" {\r\n aggregate {\r\n\t msg_uuid => "%{UUID}"\r\n\t timestamp1 => event", :backtrace=>["/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:32:in compile_imperative'", "org/logstash/execution/AbstractPipelineExt.java:187:in initialize'", "org/logstash/execution/JavaBasePipelineExt.java:72:in initialize'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:47:in initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:52:in execute'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:383:in block in converge_state'"]}

[2022-06-01T14:43:24,677][INFO ][org.reflections.Reflections] Reflections took 85 ms to scan 1 urls, producing 119 keys and 417 values

[2022-06-01T14:43:25,300][WARN ][deprecation.logstash.codecs.plain] Relying on default value of pipeline.ecs_compatibility , which may change in a future major release of Logstash. To avoid unexpected changes when upgrading Logstash, please explicitly declare your desired ECS Compatibility mode.

[2022-06-01T14:43:25,343][WARN ][deprecation.logstash.codecs.plain] Relying on default value of pipeline.ecs_compatibility , which may change in a future major release of Logstash. To avoid unexpected changes when upgrading Logstash, please explicitly declare your desired ECS Compatibility mode.

[2022-06-01T14:43:25,381][WARN ][deprecation.logstash.outputs.elasticsearchmonitoring] Relying on default value of pipeline.ecs_compatibility , which may change in a future major release of Logstash. To avoid unexpected changes when upgrading Logstash, please explicitly declare your desired ECS Compatibility mode.

[2022-06-01T14:43:25,465][INFO ][logstash.outputs.elasticsearchmonitoring][.monitoring-logstash] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearchMonitoring", :hosts=>["http://elasticsearch:9200"]}

[2022-06-01T14:43:25,493][INFO ][logstash.outputs.elasticsearchmonitoring][.monitoring-logstash] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://elasticsearch:9200/]}}

[2022-06-01T14:43:25,538][WARN ][logstash.outputs.elasticsearchmonitoring][.monitoring-logstash] Restored connection to ES instance {:url=>"http://elasticsearch:9200/"}

[2022-06-01T14:43:25,554][INFO ][logstash.outputs.elasticsearchmonitoring][.monitoring-logstash] Elasticsearch version determined (7.16.3) {:es_version=>7}

[2022-06-01T14:43:25,554][WARN ][logstash.outputs.elasticsearchmonitoring][.monitoring-logstash] Detected a 6.x and above cluster: the type event field won't be used to determine the document _type {:es_version=>7}

[2022-06-01T14:43:25,653][WARN ][logstash.outputs.elasticsearchmonitoring][.monitoring-logstash] Configuration is data stream compliant but due backwards compatibility Logstash 7.x will not assume writing to a data-stream, default behavior will change on Logstash 8.0 (set data_stream => true/false to disable this warning)

[2022-06-01T14:43:25,654][WARN ][logstash.outputs.elasticsearchmonitoring][.monitoring-logstash] Configuration is data stream compliant but due backwards compatibility Logstash 7.x will not assume writing to a data-stream, default behavior will change on Logstash 8.0 (set data_stream => true/false to disable this warning)

[2022-06-01T14:43:25,669][WARN ][logstash.javapipeline ][.monitoring-logstash] 'pipeline.ordered' is enabled and is likely less efficient, consider disabling if preserving event order is not necessary

[2022-06-01T14:43:25,773][INFO ][logstash.javapipeline ][.monitoring-logstash] Starting pipeline {:pipeline_id=>".monitoring-logstash", "pipeline.workers"=>1, "pipeline.batch.size"=>2, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>2, "pipeline.sources"=>["monitoring pipeline"], :thread=>"#<Thread:0x584ece6 run>"}

[2022-06-01T14:43:27,269][INFO ][logstash.javapipeline ][.monitoring-logstash] Pipeline Java execution initialization time {"seconds"=>1.49}

[2022-06-01T14:43:27,306][INFO ][logstash.javapipeline ][.monitoring-logstash] Pipeline started {"pipeline.id"=>".monitoring-logstash"}

[2022-06-01T14:43:28,637][INFO ][logstash.javapipeline ][.monitoring-logstash] Pipeline terminated {"pipeline.id"=>".monitoring-logstash"}

[2022-06-01T14:43:29,460][INFO ][logstash.runner ] Logstash shut down.

And this is an example of my log file:

{"log_level":"INFO","timestamp":"2021-12-15T16:06:24.400087Z","event_type":"s_tart","ca_id":"11","c_pid":"114","mti":"00","node_id":"00","msg_uuid":"1234","module":"cmde"}
{"log_level":"INFO","timestamp":"2021-12-15T16:06:31.993057Z","event_type":"e_nd","mti":"00","node_id":"00","msg_uuid":"1234","module":"PWC-cmde","respCode":"1"}

The parser is objecting to the . in event.get; it does not expect punctuation at that point. Never mind that those are not valid options for an aggregate filter (that would cause an error later in the configuration process anyway).

Study example 1 in the aggregate filter documentation. You are right that you need two aggregate filters: one to store the start time and another to calculate the delta and end the task. However, all the work of storing, retrieving, and processing fields from the event using event.get/set has to happen inside the code => "..." option.
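Following that pattern, a minimal sketch of the two filters might look like the config below. It is an illustration, not a drop-in fix: it assumes the json filter has already populated event_type and msg_uuid from your JSON logs (so no grok is needed for them), and it uses @timestamp for the arithmetic rather than your DATESTAMP_RFC2822 capture. Note that the options are task_id and code — names like msg_uuid or timestamp1 are not valid aggregate options, and values such as event.get(...) cannot appear outside a code string.

```
filter {
  if [event_type] == "request_inc" {
    aggregate {
      # correlate both events by the shared msg_uuid field
      task_id => "%{msg_uuid}"
      # store the start time in the map; all event.get/set work lives in code
      code => "map['start_ts'] = event.get('@timestamp').to_f"
      map_action => "create"
    }
  }
  if [event_type] == "response_outg" {
    aggregate {
      task_id => "%{msg_uuid}"
      # compute the delta and write it onto the closing event
      code => "event.set('response_time', event.get('@timestamp').to_f - map['start_ts'])"
      map_action => "update"
      end_of_task => true
      timeout => 120
    }
  }
}
```

With this shape the standalone aggregate block with add_field near the top of your filter section (the one the parser flagged) can be removed entirely, since response_time is set inside the second filter's code string.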
