Help with combining events from a log

Hello all,

I am a total newbie to the ELK Stack (please be kind!) and I am hoping to get some assistance. Essentially, what I want is that when I go into Discover in Kibana, events sharing the same "sessionID" are combined into one instead of showing up as separate events.

What is happening is that when someone goes to our login page, I get 3 log events: one when the page is shown, one when the login takes place, and another when the login is complete. I used a grok filter to separate out the "sessionID" from the message field of the logs.

What I would like is that when I go into Kibana's Discover view, those 3 events with the same ID are pushed into one event.

I did follow the aggregate filter documentation (Example 3), and while it does add the "hits" field and a true/false "several_hits" field, it still doesn't combine the events into one.

Or should I not be using the aggregate filter at all, and instead be looking at this more on the Kibana side of things?

Here is a sample of the log

1/9/2020 8:06:26 AM: 1076/9: ConnectAudit Information: 1001: Event: Sent SSO request to local 
IdP, Local IdP application: LoginFormIdentityProvider, Partner SP: emailprovider.com, Target 
URL: , Session ID: 12345, IP address: 192.168.1.100
1/9/2020 8:06:34 AM: 1076/7: ConnectAudit Information: 1004: Event: Received integration 
token from local IdP, Local IdP application: LoginFormIdentityProvider, Partner SP: , Target URL: 
,Local user name: enduser, Attributes: 3, mail: enduser@ouremail.com, Domain: ouremail, 
UserName: enduser, Session ID: 12345, IP address: 192.168.1.100
1/9/2020 8:06:34 AM: 1076/7: ConnectAudit Information: 1011: Event: SAML v2.0 assertion to 
partner SP, Local IdP application: LoginFormIdentityProvider, Partner SP: emailprovider.com, 
SAML subject name: enduser@ouremail, Session ID: 12345, IP address: 192.168.1.100

And here is my current pipeline.conf

input {
  beats {
    port => "5044"
  }
}
filter {
  grok {
    break_on_match => false
    match => {
      "message" => [
        "SAML subject name: %{EMAILADDRESS:email},",
        "mail: %{EMAILADDRESS:email},",
        "Session ID: %{DATA:sessionID},",
        "IP address: %{IP:client}"
      ]
    }
  }
  geoip {
    source => "client"
  }
  aggregate {
    task_id => "%{sessionID}"
    code => "map['hits'] ||= 0; map['hits'] += 1;"
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "sessionID"
    timeout => 3
    timeout_tags => ['_sessionidtimeout']
    timeout_code => "event.set('several_hits', event.get('hits') > 1)"
  }
}
output {
  stdout { codec => rubydebug }
  elasticsearch {
    hosts => ["192.168.1.150:9200"]
  }
}

With that pipeline I would expect, for a given sessionID, to get the three original events, plus one more that contains just the [hits] and [several_hits] fields.
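For illustration, with the sample log above, that extra pushed event would look roughly like this in the rubydebug output (the sessionID and hit count are taken from the sample; @timestamp and @version are omitted):

{
    "sessionID" => "12345",
    "hits" => 3,
    "several_hits" => true,
    "tags" => ["_sessionidtimeout"]
}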

When using push_map_as_event_on_timeout you need to add all the fields you want to aggregate to the map. Assuming that the number after "ConnectAudit Information:" is essentially a message type, you would parse each of the three message formats using grok (you can match a message against an array of patterns to do this), then pull out the fields you want from each format.
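As a rough sketch, the grok filter you already have could be extended along these lines (the field names, such as ConnectAuditInformation and localIdpApplication, are only assumptions chosen to line up with the aggregate code below; adjust the patterns to your exact log format):

grok {
  break_on_match => false
  match => {
    "message" => [
      "ConnectAudit Information: %{NUMBER:ConnectAuditInformation}:",
      "Local IdP application: %{DATA:localIdpApplication},",
      "Partner SP: %{DATA:partnerSP},",
      "mail: %{EMAILADDRESS:mail},",
      "UserName: %{WORD:localUsername},",
      "SAML subject name: %{DATA:samlSubjectName},",
      "Session ID: %{DATA:sessionID},",
      "IP address: %{IP:ipAddress}"
    ]
  }
}

With fields like those in place, the code option of the aggregate filter can copy the right fields into the map for each message type: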

code => '
     cid = event.get("ConnectAuditInformation")
     case cid
     when "1001"
         map["localApp"] = event.get("localIdpApplication")
         map["partnerSP"] = event.get("partnerSP")
         map["ipAddress"] = event.get("ipAddress")
     when "1004"
         map["mail"] = event.get("mail")
         map["localUser"] = event.get("localUsername")
     when "1011"
         map["samlSubject"] = event.get("samlSubjectName")
     end
'

(Note the single quotes around the code option, so the double quotes inside the Ruby code don't terminate the string.)

You may also want

     event.cancel

to drop the 3 original messages so that you just keep the aggregated map.
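Putting it together, here is a sketch of the complete aggregate filter under the same assumed field names (event.cancel at the end of the code option drops each original event, so only the aggregated event pushed on timeout remains):

aggregate {
  task_id => "%{sessionID}"
  code => '
    # count the events seen for this session
    map["hits"] ||= 0
    map["hits"] += 1
    # copy the fields of interest into the map, per message type
    case event.get("ConnectAuditInformation")
    when "1001"
      map["localApp"] = event.get("localIdpApplication")
      map["partnerSP"] = event.get("partnerSP")
      map["ipAddress"] = event.get("ipAddress")
    when "1004"
      map["mail"] = event.get("mail")
      map["localUser"] = event.get("localUsername")
    when "1011"
      map["samlSubject"] = event.get("samlSubjectName")
    end
    # drop the original event
    event.cancel
  '
  push_map_as_event_on_timeout => true
  timeout_task_id_field => "sessionID"
  timeout => 3
  timeout_tags => ["_sessionidtimeout"]
  timeout_code => "event.set('several_hits', event.get('hits') > 1)"
}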
