Logstash aggregate filter - sendmail logs

I would like to forward sendmail logs based on reception domain. Only possible solution I can see is to aggregate sendmail logs based on session id and forward it.

sample log is

2017-10-06T00:00:25+00:00 smtpser01 sendmail[16415]: v0100Fxz010731: from=<sender@gmail.com>, size=6477, class=0, nrcpts=1, msgid=<57231584.470912.1506815363016.JavaMail.jbuser@mashas08>, proto=SMTP, daemon=MTA, tls_verify=NONE, auth=NONE, relay=smtpser01.example.org [127.0.0.1]
2017-10-06T00:00:26+00:00 smtpser01 sendmail[16421]: v0100Fxz010731: to=<test123@example.org>, delay=00:00:01, xdelay=00:00:01, mailer=esmtp, tls_verify=OK, pri=126477, relay=[192.0.2.1] [192.0.2.1], dsn=2.0.0, stat=Sent (123stn9432-1 Message accepted for delivery)

normally it should be very easy task. but my aggregate filter does not work. :frowning:

input {
  tcp {
    port => 9200
  }
}

filter {
   grok {
     match => { "message" => ["%{TIMESTAMP_ISO8601:timestamp} %{SYSLOGHOST:logsource} %{SYSLOGPROG}: %{SENDMAIL}"] }
   }

   if [from] {
    aggregate {
        task_id => "%{qid}"
        code => "
        map['d'] = {}
        map['d']['mailfrom'] = event.get('from')
        "
        map_action => "create"
    }
   mutate {
     add_field => { "fromtest" => "Hello world, from %{from}" }
   }
   }

   if [to] {
    aggregate {
        task_id => "%{qid}"
        code => "
        map['d']['mailto'] = event.get('to')
        "
        map_action => "update"
        push_map_as_event_on_timeout => true
        end_of_task => true
        timeout => 10
    }
   mutate {
     add_field => { "totest" => "Hello world, to %{to}" }
   }
   }
}
output {
    stdout { codec => rubydebug }
}

rubydebug output. (mutate is used only to test if "if" works.

  "fromtest" => "Hello world, from sender@gmail.com",
     "relay" => "smtpser01.example.org",
       "pid" => "16415",
   "program" => "sendmail",
   "message" => "2017-10-06T00:00:25+00:00 smtpser01 sendmail[16415]: v0100Fxz010731: from=<sender@gmail.com>, size=6477, class=0, nrcpts=1, msgid=<57231584.470912.1506815363016.JavaMail.jbuser@mashas08>, proto=SMTP, daemon=MTA, tls_verify=NONE, auth=NONE, relay=smtpser01.example.org [127.0.0.1]\r",
 "logsource" => "smtpser01",
       "qid" => "v0100Fxz010731",
"@timestamp" => 2017-10-31T15:54:44.256Z,
      "port" => 52981,
  "@version" => "1",
      "host" => "10.0.2.2",
      "from" => "sender@gmail.com",
 "timestamp" => "2017-10-06T00:00:25+00:00"
}
{
    "totest" => "Hello world, to test123@example.org",
       "pid" => "16421",
   "program" => "sendmail",
   "message" => "2017-10-06T00:00:26+00:00 smtpser01 sendmail[16421]: v0100Fxz010731: to=<test123@example.org>, delay=00:00:01, xdelay=00:00:01, mailer=esmtp, tls_verify=OK, pri=126477, relay=[192.0.2.1] [192.0.2.1], dsn=2.0.0, stat=Sent (123stn9432-1 Message accepted for delivery)\r",
 "logsource" => "smtpser01",
       "qid" => "v0100Fxz010731",
    "result" => "Sent (123stn9432-1 Message accepted for delivery)\r",
"@timestamp" => 2017-10-31T15:54:44.256Z,
      "port" => 52981,
  "@version" => "1",
      "host" => "10.0.2.2",
        "to" => "test123@example.org",
       "dsn" => "2.0.0",
 "timestamp" => "2017-10-06T00:00:26+00:00"
}

I found an answer to my question

I had to change LOGIN in "sendmail.grok" to [.a-zA-Z0-9_+-=]+

hth

 input {
  tcp {
    port => 9200
  }
}

filter {
   grok {
     match => { "message" => ["%{TIMESTAMP_ISO8601:timestamp} %{SYSLOGHOST:logsource} %{SYSLOGPROG}: %{SENDMAIL:mailstr}"] }
   }

   if [from] {
    aggregate {
        task_id => "%{qid}"
        code => "map['mailfrom'] = {};
                 map['mailfrom']['mailstr'] = event.get('mailstr'); 
                 map['mailfrom']['timestamp'] = event.get('timestamp'); 
                 map['mailfrom']['logsource'] = event.get('logsource'); 
                 map['mailfrom']['program'] = event.get('program'); 
                 map['mailfrom']['pid'] = event.get('pid');
                 event.cancel()
                 "
                 # 
        #map_action => "update"
    }
   }
  
   if [to] {
    aggregate {
        task_id => "%{qid}"
        code => "map['mailto'] = {};
                 map['mailto']['mailstr'] = event.get('mailstr'); 
                 map['mailto']['timestamp'] = event.get('timestamp'); 
                 map['mailto']['logsource'] = event.get('logsource'); 
                 map['mailto']['program'] = event.get('program'); 
                 map['mailto']['pid'] = event.get('pid');
                 event.set('mailfrom', map['mailfrom']);
                 event.set('mailto', map['mailto']);"
        #map_action => "update"
        #push_map_as_event_on_timeout => true
        #push_previous_map_as_event => true
        remove_field => [ "mailstr", "message", "port", "pid", "qid", "logsource", "program", "host", "dsn", "result", "timestamp", "@version"]
        end_of_task => true
        timeout => 10
     }
   if "@dsa.ge" in [to] {
     mutate { add_tag => "dsa" }
   }
   }
 }
output {
  if "dsa" in [tags] {
   if [mailfrom] {
    syslog {
      host => "192.168.10.166"
      port => 514
      protocol => "udp"
      rfc => "rfc3164"
      message => "%{[mailfrom][mailstr]}"
      sourcehost => "%{[mailfrom][logsource]}"
      appname => "%{[mailfrom][program]}"
      procid => "%{[mailfrom][pid]}"
      msgid => ""
      
      }
    }
    syslog {
      host => "192.168.10.166"
      port => 514
      protocol => "udp"
      rfc => "rfc3164"
      message => "%{[mailto][mailstr]}"
      sourcehost => "%{[mailto][logsource]}"
      appname => "%{[mailto][program]}"
      procid => "%{[mailto][pid]}"
      msgid => ""
      }
    }
   # stdout { codec => rubydebug }
}

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.