Hello all,
I am a total newbie to the ELK Stack (please be kind!) and I am hoping to get some assistance. Essentially, what I want is for Discover in Kibana to show events that share the same "sessionID" as one combined event instead of as separate events.
What is happening is that when someone goes to our login page, I get three log events: one when the page is shown, one when the login takes place, and another when the login is complete. I used a grok filter to pull the "sessionID" out of the message field of the logs.
What I would like is that when I go into Kibana and open Discover, those three events with the same session ID are combined into a single event.
I followed Example 3 from the aggregate filter documentation, and while it does add the "hits" field and a "several_hits" true/false field, it still doesn't combine the three events into one.
Or should I NOT be using the aggregate filter at all and instead be looking at this more on the Kibana side of things? I have also put a rough, untested variation of the aggregate filter after my config below.
Here is a sample of the log
1/9/2020 8:06:26 AM: 1076/9: ConnectAudit Information: 1001: Event: Sent SSO request to local IdP, Local IdP application: LoginFormIdentityProvider, Partner SP: emailprovider.com, Target URL: , Session ID: 12345, IP address: 192.168.1.100
1/9/2020 8:06:34 AM: 1076/7: ConnectAudit Information: 1004: Event: Received integration token from local IdP, Local IdP application: LoginFormIdentityProvider, Partner SP: , Target URL: , Local user name: enduser, Attributes: 3, mail: enduser@ouremail.com, Domain: ouremail, UserName: enduser, Session ID: 12345, IP address: 192.168.1.100
1/9/2020 8:06:34 AM: 1076/7: ConnectAudit Information: 1011: Event: SAML v2.0 assertion to partner SP, Local IdP application: LoginFormIdentityProvider, Partner SP: emailprovider.com, SAML subject name: enduser@ouremail, Session ID: 12345, IP address: 192.168.1.100
And here is my current pipeline.conf
input {
  beats {
    port => "5044"
  }
}

filter {
  # Pull the email, session ID, and client IP out of the message field
  grok {
    break_on_match => false
    match => {
      "message" => [
        "SAML subject name: %{EMAILADDRESS:email},",
        "mail: %{EMAILADDRESS:email},",
        "Session ID: %{DATA:sessionID},",
        "IP address: %{IP:client}"
      ]
    }
  }

  # Add geo fields based on the client IP
  geoip {
    source => "client"
  }

  # Example 3 from the aggregate filter docs: count events per sessionID
  # and push a summary event when the task times out
  aggregate {
    task_id => "%{sessionID}"
    code => "map['hits'] ||= 0; map['hits'] += 1;"
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "sessionID"
    timeout => 3
    timeout_tags => ['_sessionidtimeout']
    timeout_code => "event.set('several_hits', event.get('hits') > 1)"
  }
}

output {
  stdout { codec => rubydebug }
  elasticsearch {
    hosts => ["192.168.1.150:9200"]
  }
}
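
For what it's worth, here is the direction I was thinking of trying next, based on Example 4 in the aggregate docs (the one that calls event.cancel() inside the code block so the individual lines get dropped and only the pushed map ends up in Elasticsearch). This is just a rough, untested sketch that would replace the aggregate block above: the fields I copy into the map and the timeout value are my guesses, and the docs also seem to say the pipeline has to run with a single worker (-w 1) for aggregate to behave predictably.

filter {
  aggregate {
    task_id => "%{sessionID}"
    code => "
      # keep a count and copy the fields I care about into the map;
      # anything not copied here (e.g. the geoip fields) won't be on the merged event
      map['hits'] ||= 0
      map['hits'] += 1
      map['email'] ||= event.get('email')
      map['client'] ||= event.get('client')
      map['messages'] ||= []
      map['messages'] << event.get('message')
      # drop the individual event so only the merged one is indexed
      event.cancel()
    "
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "sessionID"
    # the sample session spans about 8 seconds, so 3 seconds looked too short; 30 is just a guess
    timeout => 30
    timeout_tags => ['_sessionidtimeout']
    timeout_code => "event.set('several_hits', event.get('hits') > 1)"
  }
}

Is something like that the right approach, or is this really better handled on the Kibana side?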