Not getting new columns with aggregation

This is my variation of the Logstash aggregation filter:

# Logstash filter that correlates mail-log events belonging to one message by
# [messageid]. The aggregate filter keeps one in-memory map per task_id; this
# config is meant to run with pipeline.workers: 1 so that every event for a
# message is handled by the same worker and therefore sees the same map.
filter {
  # Events without a messageid cannot be correlated, so they are dropped.
  if ![messageid] {
    drop {}
  } else if [program] == "amavis" and [message] =~ /(?i)message\-id/ {
    # amavis branch: remember status/reason/from/to/size for this message.
    # `||=` keeps an already-stored value, so the first amavis line wins.
    # The map keys are spelled "avamis_*" in the code below and are kept as
    # written, since they become the field names in the output.
    aggregate {
      task_id => "%{messageid}"
      code => "
        map['avamis_status'] ||= event.get('status')
        map['avamis_reason'] ||= event.get('reason')
        map['avamis_from'] ||= event.get('from')
        map['avamis_to'] ||= event.get('to')
        map['avamis_size'] ||= event.get('size')
      "
    }
  } else if [program] == "dovecot" and [message] =~ /(?i)msgid=/ {
    # dovecot branch: remember the delivery status (first value wins).
    aggregate {
      task_id => "%{messageid}"
      code => "
        map['dovecot_status'] ||= event.get('status')
      "
    }
  } else if [program] == "postfix/cleanup" and [message] =~ /(?i)message\-id=/ {
    # postfix/cleanup branch: copy every key currently held in the map onto
    # this event. Only keys written by the two branches above can be present,
    # so no externally supplied field names reach event.set here.
    # NOTE(review): in the indexed sample the two postfix/cleanup events
    # (13:19:32.441 and 13:19:33.456) are timestamped before the amavis
    # (13:19:33.485) and dovecot (13:19:33.580) events, so the map is
    # presumably still empty when this code runs — which matches the
    # avamis_*/dovecot_* fields never appearing on the cleanup documents.
    # The copy would have to run on whichever event arrives last for the
    # task; confirm the actual event order before restructuring.
    aggregate {
      task_id => "%{messageid}"
      code => "
       map.each do |key, value|
         event.set(key, value)
       end
     "
    }
  }
}

This is the Elasticsearch data:

{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 2,
    "successful" : 2,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 4,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "maillog-2020.10.23",
        "_type" : "_doc",
        "_id" : "JBmdVXUBOxXdOptPTtj9",
        "_score" : 1.0,
        "_source" : {
          "messageid" : "3f1-5f92d880-99-156dfa00@143328389",
          "message" : "60F2B751C3: message-id=<3f1-5f92d880-99-156dfa00@143328389>",
          "type" : "rsyslog",
          "queueid" : "60F2B751C3",
          "hostname" : "omxmail",
          "@version" : "1",
          "@timestamp" : "2020-10-23T13:19:32.441Z",
          "host" : "127.0.0.1",
          "processid" : "61946",
          "severity" : "info",
          "program" : "postfix/cleanup",
          "tags" : [
            "_postfix_cleanup_success"
          ],
          "facility" : "mail"
        }
      },
      {
        "_index" : "maillog-2020.10.23",
        "_type" : "_doc",
        "_id" : "JRmdVXUBOxXdOptPUNjN",
        "_score" : 1.0,
        "_source" : {
          "messageid" : "3f1-5f92d880-99-156dfa00@143328389",
          "message" : "6C942751D1: message-id=<3f1-5f92d880-99-156dfa00@143328389>",
          "type" : "rsyslog",
          "queueid" : "6C942751D1",
          "hostname" : "omxmail",
          "@version" : "1",
          "@timestamp" : "2020-10-23T13:19:33.456Z",
          "host" : "127.0.0.1",
          "processid" : "61946",
          "severity" : "info",
          "program" : "postfix/cleanup",
          "tags" : [
            "_postfix_cleanup_success"
          ],
          "facility" : "mail"
        }
      },
      {
        "_index" : "maillog-2020.10.23",
        "_type" : "_doc",
        "_id" : "JxmdVXUBOxXdOptPUNjO",
        "_score" : 1.0,
        "_source" : {
          "messageid" : "3f1-5f92d880-99-156dfa00@143328389",
          "status" : "stored mail into mailbox 'INBOX'",
          "message" : "lmtp(matijam@omx.net): 8DBPIWXYkl8C8gAASlZSFg: sieve: msgid=<3f1-5f92d880-99-156dfa00@143328389>: stored mail into mailbox 'INBOX'",
          "type" : "rsyslog",
          "hostname" : "omxmail",
          "@version" : "1",
          "@timestamp" : "2020-10-23T13:19:33.580Z",
          "host" : "127.0.0.1",
          "processid" : "107339",
          "severity" : "info",
          "program" : "dovecot",
          "tags" : [
            "_dovecot_success"
          ],
          "facility" : "mail"
        }
      },
      {
        "_index" : "maillog-2020.10.23",
        "_type" : "_doc",
        "_id" : "JhmdVXUBOxXdOptPUNjN",
        "_score" : 1.0,
        "_source" : {
          "messageid" : "3f1-5f92d880-99-156dfa00@143328389",
          "status" : "MYNETS",
          "message" : "(49719-17) Passed CLEAN {RelayedInternal}, AUTH/MYNETS LOCAL [192.168.27.13]:60822 <omx1@omx.net> -> <matijam@omx.net>, Queue-ID: 60F2B751C3, Message-ID: <3f1-5f92d880-99-156dfa00@143328389>, mail_id: lVkkazw9ELSn, Hits: 1.8, size: 911, queued_as: 6C942751D1, 1007 ms",
          "type" : "rsyslog",
          "queueid" : "60F2B751C3",
          "hostname" : "omxmail",
          "@version" : "1",
          "from" : "omx1@omx.net",
          "@timestamp" : "2020-10-23T13:19:33.485Z",
          "host" : "127.0.0.1",
          "processid" : "49719",
          "reason" : "LOCAL",
          "severity" : "notice",
          "program" : "amavis",
          "to" : [
            "matijam@omx.net"
          ],
          "size" : 911,
          "tags" : [
            "_amavis_success"
          ],
          "facility" : "mail"
        }
      }
    ]
  }
}

As you can see from the data, the problem is that the postfix/cleanup event does not get the aggregated columns appended to its data by messageid.

If I change:

  code => "
   map.each do |key, value|
     event.set(key, value)
   end
 "

to:

code => "event.set('foo', 'bar')"

I get the appended data:

  {
    "_index" : "maillog-2020.10.26",
    "_type" : "_doc",
    "_id" : "oBsOZHUBOxXdOptPyUTX",
    "_score" : 1.0,
    "_source" : {
      "messageid" : "3f2-5f968b00-fd-4376ba80@110170055",
      "message" : "D385499DA2: message-id=<3f2-5f968b00-fd-4376ba80@110170055>",
      "type" : "rsyslog",
      "queueid" : "D385499DA2",
      "hostname" : "omxmail",
      "@version" : "1",
      "foo" : "bar",
      "@timestamp" : "2020-10-26T08:38:11.876Z",
      "host" : "127.0.0.1",
      "processid" : "119330",
      "severity" : "info",
      "program" : "postfix/cleanup",
      "tags" : [
        "_postfix_cleanup_success"
      ],
      "facility" : "mail"
    }
  }

It seems that at the time of aggregation the map was empty, so the loop doesn't produce any data.

Could it be that the order of the incoming events is messing things up?

Btw, I set `pipeline.workers: 1`, as required by the documentation, to force execution in a single thread.

Any help would be precious.
