New event not inserted

I have a problem with the aggregate filter: the new event that should be pushed on timeout never gets inserted.

This is my configuration:

filter {
  if ![messageid] {
    drop {}
  }

  aggregate {
    task_id => "%{messageid}"
    code => "
      map['avamis_status'] = event.get('status')
      map['avamis_reason'] = event.get('reason')
      map['avamis_from'] = event.get('from')
      map['avamis_to'] = event.get('to')
      map['avamis_size'] = event.get('size')
      map['dovecot_status'] = event.get('status')
      map['postfix_queueid'] = event.get('queueid')

      event.cancel()
    "
    timeout => 2
    timeout_tags => ['_aggregatetimeout']
    push_map_as_event_on_timeout => true
  }
}

I tried removing event.cancel(), but then only the non-aggregated events get inserted.

What am I doing wrong?

Which version of logstash are you using?

logstash 7.9.1

What I'm trying to do here is combine mail log entries into a single event, keyed by messageid. I just cannot make it work.
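
For illustration: given a couple of hypothetical rsyslog events sharing messageid abc123 (one from Amavis, one from Dovecot), the single event pushed on timeout should look roughly like this (rubydebug-style sketch, all values invented):

{
      "avamis_status" => "Passed CLEAN",
      "avamis_reason" => "clean",
        "avamis_from" => "sender@example.com",
          "avamis_to" => "rcpt@example.com",
        "avamis_size" => 2048,
     "dovecot_status" => "saved",
    "postfix_queueid" => "4CRy381JhZz",
               "tags" => ["_aggregatetimeout"]
}

One thing worth noting: as written, the code block overwrites every map entry on every event, so a later event can null out fields set by an earlier one. Guarding with ||= (e.g. map['avamis_from'] ||= event.get('from')) would keep the first-seen value, assuming that is the intent.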

The filter configuration looks OK to me. I thought you might be hitting this, but that was fixed in 7.9.1. I do not have any suggestions.

Aggregation on timeout is not working at all.
I've also tried the latest Logstash version, 7.9.3, with no luck.

This is a slightly modified example from the official Logstash docs:

filter {
  if ![messageid] {
    drop {}
  }

  aggregate {
    task_id => "%{messageid}"
    code => "map['count'] ||= 0; map['count'] += 1;"
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "messageid"
    timeout => 2
    timeout_tags => ['_aggregatetimeout']
  }
}

I only get the original events; no aggregated event shows up in the results. :frowning:
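
One way to check whether the timeout events are being generated at all is to add a temporary debug output next to the real one (a minimal sketch):

output {
  stdout { codec => rubydebug }   # prints every event, including those pushed on timeout
}

If events tagged _aggregatetimeout show up there, the aggregation itself works and the problem lies further downstream.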

Does turning off java_execution make any difference?

If I add this to logstash.yml:

pipeline.java_execution: false

I get this error in Logstash log file:

[2020-10-29T08:19:32,846][ERROR][logstash.agent ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Could not determine ID for filter/grok", :backtrace=>["org/logstash/plugins/factory/PluginFactoryExt.java:175:in `plugin'", "org/logstash/plugins/factory/PluginFactoryExt.java:161:in `plugin'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:93:in `plugin'", "(eval):107:in `initialize'", "org/jruby/RubyKernel.java:1048:in `eval'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:67:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:112:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:52:in `execute'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:357:in `block in converge_state'"]}
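
The "Could not determine ID for filter/grok" error can usually be worked around by giving the plugin an explicit id, which every Logstash plugin accepts as a common option (the value below is arbitrary):

grok {
  id             => "amavis_grok"  # hypothetical id; any unique string works
  patterns_dir   => "/etc/logstash/patterns.d"
  match          => [ "message", "%{AMAVIS}" ]
}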

The configuration is split into multiple files, one per mail component.
This is what one of my configuration files (Amavis) looks like:

filter {
  if [type] == "rsyslog" {
    # grok log lines by program name (listed alphabetically)
    if [program] == "amavis" and [message] =~ /(?i)message\-id/ {
      grok {
        patterns_dir   => "/etc/logstash/patterns.d"
        match          => [ "message", "%{AMAVIS}" ]
        tag_on_failure => [ "_amavis_nomatch" ]
        add_tag        => [ "_amavis_success" ]
      }

      # data type conversions
      mutate {
        convert => {
          "status" => "string"
          "reason" => "string"
          "from" => "string"
          "to" => "string"
          "messageid" => "string"
          "queueid" => "string"
          "size" => "integer"
        }
      }

      # post-process data
      if [to] {
        mutate {
          gsub => ["to", "[<>]", ""]
          split => ["to", ","]
        }
      }
    }
  }
}
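
For reference, the gsub/split pair above turns a raw recipient list into an array of bare addresses; with made-up values:

# before the mutate:
#   to => "<alice@example.com>,<bob@example.com>"
# after gsub (strip < and >) and split on ",":
#   to => ["alice@example.com", "bob@example.com"]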

The problem was the condition within the output segment:

input {
  udp {
    host => "127.0.0.1"
    port => 10514
    codec => "json"
    type => "rsyslog"
  }
}

filter {
  if [type] != "rsyslog" and ![messageid] {
    drop {}
  }

  aggregate {
    task_id => "%{messageid}"
    code => "
      map['count'] ||= 0
      map['count'] += 1
      #event.cancel()
    "
    push_map_as_event_on_timeout => true
    timeout => 3
    timeout_tags => ['aggregated']
  }
}

output {
  if [type] == "rsyslog" {
    elasticsearch {
      hosts => [ "omxmed:9200" ]
      index => "maillog-%{+YYYY.MM.dd}"
    }
  }
}

At first I couldn't figure out why, but it makes sense: the event that push_map_as_event_on_timeout creates is built only from the map, so it never inherits the type field that the input segment sets on the original events. The output conditional if [type] == "rsyslog" therefore silently discarded every aggregated event. Without that condition, the aggregated data finally showed up.
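
If the output condition needs to stay, one workaround (an untested sketch) is to copy type into the map, since every map entry becomes a field of the pushed event:

aggregate {
  task_id => "%{messageid}"
  code => "
    map['type'] ||= event.get('type')   # carried over into the timeout event
    map['count'] ||= 0
    map['count'] += 1
  "
  push_map_as_event_on_timeout => true
  timeout => 3
  timeout_tags => ['aggregated']
}

The timeout event then carries type => rsyslog and passes the if [type] == "rsyslog" condition in the output.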
