Logstash aggregate complicated, please help

Hi there
I'm trying to create aggregation filter for my email exim logs.
Can you help me?

Basically I have logs like these:

2017-04-10 00:00:30 1cxKsn-0001GB-2t CTAS=IN RefID= ( ISpam= IFlags=v=2.2 cv=Op4/823t c=1 sm=1 tr=0 a=6HVp5djceeYjte4jJb6Ryw==:17 a=AzvcPWV-tVgA:10 a=uHJYF-HtSykr7tHsIToA:9 a=CTTii-5M3Z-LMe4tr8cA:9 a=QEXdDO2ut3YA:10 a=pyshpDcKeHPZtuIe0Z8A:9 )
2017-04-10 00:00:30 1cxKsn-0001GB-2t <= email@domain.com H=m37s3-2-28db.ispgateway.com [176.221.47.15] P=smtp S=2567 id=201704092200.v39M0Qxr016654@m37s3-2-28db.ispgateway.com
2017-04-10 00:00:30 1cxKsn-0001GB-2t => info@domainx.com R=internal_gw T=remote_smtp H=192.168.1.11 [192.168.1.11] C="250 OK id=1cxKso-0002iK-Q7"
2017-04-10 00:00:30 1cxKsn-0001GB-2t Completed
2017-04-10 00:00:30 fixed_login authenticator failed for (faYNpaLtF) [192.168.24.24]: 535 Incorrect authentication data
2017-04-10 00:00:30 fixed_login authenticator failed for (lkLmh6Lk) [192.168.24.24]: 535 Incorrect authentication data
2017-04-10 00:00:30 fixed_login authenticator failed for (dLKdHZ) [192.168.24.24]: 535 Incorrect authentication data
2017-04-10 00:00:30 H=mx4.rissoidupgrades.com [79.137.110.132] F=<rtcjrc-cmok892@rissoidupgrades.com> rejected RCPT <qfuohabte_p145@verim.de>: ICIR16 - unknown user
2017-04-10 00:00:30 unexpected disconnection while reading SMTP command from ([111.111.111.111]) [117.241.112.188] (error: Connection reset by peer)
2017-04-10 00:00:30 1cxKso-0001GQ-1R CTAS=IN RefID= ( ISpam=Confirmed IFlags=v=2.2 cv=Op4/823t c=1 sm=1 tr=0 a=LMNu0MzFDzFZvX0DaJwgIA==:17 a=AwJkFeBFn10A:10 a=AzvcPWV-tVgA:10 a=HFQ-CQzmNWWYERzML24A:9 )
2017-04-10 00:00:31 1cxKso-0001GQ-1R <= kd123456@abcdrfg.managed.com H=abcdrfg.managed.com [62.138.219.130] P=esmtp S=671 id=20170409220030.5BCED80909@ma60655.psmanaged.com
2017-04-10 00:00:30 fixed_login authenticator failed for (faYNpaLtF) [192.168.24.24]: 535 Incorrect authentication data
2017-04-10 00:00:30 fixed_login authenticator failed for (lkLmh6Lk) [192.168.24.24]: 535 Incorrect authentication data
2017-04-10 00:00:30 fixed_login authenticator failed for (dLKdHZ) [192.168.24.24]: 535 Incorrect authentication data
2017-04-10 00:00:30 H=mx4.rissoidupgrades.com [79.137.110.132] F=<sdfsdg-sdfsd34@downgrades.com> rejected RCPT <sdfsdf_dsf343@varum.com>: ICIR16 - unknown user
2017-04-10 00:00:30 unexpected disconnection while reading SMTP command from ([117.241.112.188]) [117.241.112.188] (error: Connection reset by peer)
2017-04-10 00:00:31 1cxKso-0001GQ-1R => sarah@tele.com R=internal_gw T=remote_smtp H=192.168.1.11 [192.168.1.11] C="250 OK id=1cxKsp-0002iR-QJ"
2017-04-10 00:00:31 1cxKso-0001GQ-1R Completed

In the end I want to have aggregated events in elastic like these :
first event:

2017-04-10 00:00:30 1cxKsn-0001GB-2t CTAS=IN RefID= ( ISpam= IFlags=v=2.2 cv=Op4/823t c=1 sm=1 tr=0 a=6HVp5djceeYjte4jJb6Ryw==:17 a=AzvcPWV-tVgA:10 a=uHJYF-HtSykr7tHsIToA:9 a=CTTii-5M3Z-LMe4tr8cA:9 a=QEXdDO2ut3YA:10 a=pyshpDcKeHPZtuIe0Z8A:9 )
2017-04-10 00:00:30 1cxKsn-0001GB-2t <= email@domain.com H=m37s3-2-28db.ispgateway.com [176.221.47.15] P=smtp S=2567 id=201704092200.v39M0Qxr016654@m37s3-2-28db.ispgateway.com
2017-04-10 00:00:30 1cxKsn-0001GB-2t => info@domainx.com R=internal_gw T=remote_smtp H=192.168.1.11 [192.168.1.11] C="250 OK id=1cxKso-0002iK-Q7"
2017-04-10 00:00:30 1cxKsn-0001GB-2t Completed 

second event:

2017-04-10 00:00:30 1cxKso-0001GQ-1R CTAS=IN RefID= ( ISpam=Confirmed IFlags=v=2.2 cv=Op4/823t c=1 sm=1 tr=0 a=LMNu0MzFDzFZvX0DaJwgIA==:17 a=AwJkFeBFn10A:10 a=AzvcPWV-tVgA:10 a=HFQ-CQzmNWWYERzML24A:9 )
2017-04-10 00:00:31 1cxKso-0001GQ-1R <= kd123456@abcdrfg.managed.com H=abcdrfg.managed.com [62.138.219.130] P=esmtp S=671 id=20170409220030.5BCED80909@ma60655.psmanaged.com
2017-04-10 00:00:31 1cxKso-0001GQ-1R => sarah@tele.com R=internal_gw T=remote_smtp H=192.168.1.11 [192.168.1.11] C="250 OK id=1cxKsp-0002iR-QJ"
2017-04-10 00:00:31 1cxKso-0001GQ-1R Completed

All other lines should be dropped.
My grok patterns look like this
And my filter conf looks like this

Please help me to deal with that case.

@deatheros
I think the following configuration should do what you expect :

filter {
    if [type] == "exim" {
        if [message] =~ /CTAS/ {
            grok {
                patterns_dir   => "/etc/logstash/patterns.d"
                match          => [ "message", "%{EXIM_SPAM}" ]
                tag_on_failure => [ "_grok_exim_nomatch" ]
                add_tag        => [ "_grok_exim_success" ]
            }
            aggregate {
              task_id => "%{exim_msgid}"
              code => 'map["requestContent"] = event.get("message") ; event.cancel()'
              map_action => "create"
            }
        } else if [message] =~ /\<\=/ {
            grok {
                patterns_dir   => "/etc/logstash/patterns.d"
                match          => [ "message", "%{EXIM_LEFT}" ]
                tag_on_failure => [ "_grok_exim_left_nomatch" ]
                add_tag        => [ "_grok_exim_left_success" ]
            }
            aggregate {
              task_id => "%{exim_msgid}"
              code => 'map["requestContent"] += "\n" + event.get("message") ; event.cancel()'
              map_action => "update"
            }
        } else if [message] =~ /\=\>/ {
            grok {
                patterns_dir   => "/etc/logstash/patterns.d"
                match          => [ "message", "%{EXIM_RIGHT}" ]
                tag_on_failure => [ "_grok_exim_right_nomatch" ]
                add_tag        => [ "_grok_exim_right_success" ]
            }
            aggregate {
              task_id => "%{exim_msgid}"
              code => 'map["requestContent"] += "\n" + event.get("message") ; event.cancel()'
              map_action => "update"
            }
        } else if [message] =~ /Completed/ {
            grok {
                patterns_dir   => "/etc/logstash/patterns.d"
                match          => [ "message", "%{EXIM_SPAM_CHECK_ST}" ]
                tag_on_failure => [ "_grok_exim_completed_nomatch" ]
                add_tag        => [ "_grok_exim_completed_success" ]
            }
            aggregate {
              task_id => "%{exim_msgid}"
              code => 'map["requestContent"] += "\n" + event.get("message") ; event.set("message", map["requestContent"])'
              map_action => "update"
              end_of_task => true
            }
        } else {
            grok {
                add_tag        => [ "_grok_spamd_no_match" ]
            }
        }
        if ("_grokparsefailure" in [tags]) {
            drop {}
        }
    }
}
1 Like

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.