ELK - Aggregating Fields from 2 different messages into a single log

Hi All,

We are new to Logstash and Elasticsearch and are trying to build a better analytics view of our firewall logs.

We receive the following two messages (flow log and access log), which arrive at different intervals:

2018-06-01T04:43:36+0000 flowIdLog, applianceName=VSM, tenantName=Org1, flowId=33559409, flowCookie=1527828239, sourceIPv4Address=192.168.20.33, destinationIPv4Address=91.189.89.199, sourcePort=123, destinationPort=123, tenantId=1, vsnId=0, applianceId=1, ingressInterfaceName=vni-0/3.0, egressInterfaceName=ptvi-0/251, fromCountry=, toCountry=United Kingdom, protocolIdentifier=17, fromZone=trust, fromUser=unknown, toZone=ptvi, icmpTypeIPv4=0

2018-06-01T04:43:36+0000 accessLog, applianceName=VSM, tenantName=Org1, flowId=33559409, flowCookie=1527828239, flowStartMilliseconds=71919104, flowEndMilliseconds=0, sentOctets=76, sentPackets=1, recvdOctets=0, recvdPackets=0, appId=384, eventType=start, tenantId=1, urlCategory=, action=allow, vsnId=0, applianceId=1, appRisk=3, appProductivity=5, appIdStr=ntp, appFamily=networking, appSubFamily=network-service, rule=elastic, forwardForwardingClass=fc_be, reverseForwardingClass=fc_be, host=\n
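Here is a minimal plain-Ruby sketch of the result we are after. It runs outside Logstash and only illustrates the goal. It assumes the layout of the two samples above: a timestamp and log type, then key=value pairs separated by ", ", with no ", " inside any value. The helpers parse_line and merge_by_flow_id are hypothetical names, and the sample lines are trimmed.

```ruby
# Sketch only: builds one record per flowId in plain Ruby, not in Logstash.
# parse_line and merge_by_flow_id are hypothetical helper names.

# Turn one log line into a Hash. Assumes "<timestamp> <logtype>, key=value, ..."
# with no ", " inside a value (true for the samples above).
def parse_line(line)
  head, *pairs = line.strip.split(", ")
  time, logtype = head.split(" ", 2)
  fields = { "time" => time, "logtype" => logtype }
  pairs.each do |pair|
    key, value = pair.split("=", 2)
    fields[key] = value.to_s # an empty value such as "urlCategory=" becomes ""
  end
  fields
end

# Group parsed lines by flowId and merge each group into a single Hash.
# When both lines carry the same key, the later line's value wins.
def merge_by_flow_id(lines)
  lines.map { |line| parse_line(line) }
       .group_by { |fields| fields["flowId"] }
       .transform_values { |group| group.reduce({}) { |acc, fields| acc.merge(fields) } }
end

# Trimmed copies of the two sample lines.
flow_log = "2018-06-01T04:43:36+0000 flowIdLog, applianceName=VSM, tenantName=Org1, " \
           "flowId=33559409, flowCookie=1527828239, sourceIPv4Address=192.168.20.33, " \
           "destinationIPv4Address=91.189.89.199, sourcePort=123, destinationPort=123, " \
           "toCountry=United Kingdom"
access_log = "2018-06-01T04:43:36+0000 accessLog, applianceName=VSM, tenantName=Org1, " \
             "flowId=33559409, flowCookie=1527828239, appId=384, urlCategory=, " \
             "action=allow, appIdStr=ntp"

merged = merge_by_flow_id([flow_log, access_log])["33559409"]
puts merged["sourceIPv4Address"] # 192.168.20.33
puts merged["appIdStr"]          # ntp
```

Keys that both messages carry (tenantName, applianceId and so on) simply take the later line's value. This is harmless here because those values are identical in both samples.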

Both messages share the same flowId (33559409). We need to merge the two logs into a single event keyed on flowId. That would let us correlate them and build top-5 lists by source IP, destination IP, app ID, URLs, etc. We have written the following logstash.conf filter, but we are not sure whether it will work.

filter {
  # The aggregate filter only works reliably with a single pipeline worker
  # (pipeline.workers: 1, or -w 1 on the command line).
  grok {
    # One "match" with an array of patterns: grok tries them in order and stops at the
    # first match (access log with empty urlCategory, access log with a value, flow log).
    # The last field of each pattern uses GREEDYDATA, because DATA is non-greedy and
    # would capture an empty string at the end of the line.
    match => { "message" => [
      "%{TIMESTAMP_ISO8601:time} %{WORD:logtype}, applianceName=%{NOTSPACE:applianceName}, tenantName=%{NOTSPACE:TENANT}, flowId=%{INT:FLOWID}, flowCookie=%{INT:FlowCookie}, flowStartMilliseconds=%{INT:FlowStartMilliseconds}, flowEndMilliseconds=%{INT:flowEndMilliseconds}, sentOctets=%{INT:sentOctets}, sentPackets=%{INT:sentPackets}, recvdOctets=%{INT:recvdOctets}, recvdPackets=%{INT:recvdPackets}, appId=%{INT:appId}, eventType=%{WORD:eventType}, tenantId=%{INT:tenantId}, urlCategory=, action=%{WORD:action}, vsnId=%{INT:vsnId}, applianceId=%{INT:applianceId}, appRisk=%{INT:appRisk}, appProductivity=%{INT:appProductivity}, appIdStr=%{NOTSPACE:appIdStr}, appFamily=%{NOTSPACE:appFamily}, appSubFamily=%{NOTSPACE:appSubFamily}, rule=%{DATA:rule}, forwardForwardingClass=%{DATA:forwardForwardingClass}, reverseForwardingClass=%{DATA:reverseForwardingClass}, host=%{GREEDYDATA:host}",
      "%{TIMESTAMP_ISO8601:time} %{WORD:logtype}, applianceName=%{NOTSPACE:applianceName}, tenantName=%{NOTSPACE:TENANT}, flowId=%{INT:FLOWID}, flowCookie=%{INT:FlowCookie}, flowStartMilliseconds=%{INT:FlowStartMilliseconds}, flowEndMilliseconds=%{INT:flowEndMilliseconds}, sentOctets=%{INT:sentOctets}, sentPackets=%{INT:sentPackets}, recvdOctets=%{INT:recvdOctets}, recvdPackets=%{INT:recvdPackets}, appId=%{INT:appId}, eventType=%{WORD:eventType}, tenantId=%{INT:tenantId}, urlCategory=%{NOTSPACE:urlCategory}, action=%{WORD:action}, vsnId=%{INT:vsnId}, applianceId=%{INT:applianceId}, appRisk=%{INT:appRisk}, appProductivity=%{INT:appProductivity}, appIdStr=%{NOTSPACE:appIdStr}, appFamily=%{NOTSPACE:appFamily}, appSubFamily=%{NOTSPACE:appSubFamily}, rule=%{DATA:rule}, forwardForwardingClass=%{DATA:forwardForwardingClass}, reverseForwardingClass=%{DATA:reverseForwardingClass}, host=%{GREEDYDATA:host}",
      "%{TIMESTAMP_ISO8601:time} %{WORD:logtype}, applianceName=%{WORD:applianceName}, tenantName=%{DATA:TENANT}, flowId=%{INT:FLOWID}, flowCookie=%{INT:FlowCookie}, sourceIPv4Address=%{IP:sourceIPv4Address}, destinationIPv4Address=%{IP:destinationIPv4Address}, sourcePort=%{INT:sourcePort}, destinationPort=%{INT:destinationPort}, tenantId=%{INT:tenantId}, vsnId=%{INT:vsnId}, applianceId=%{INT:applianceId}, ingressInterfaceName=%{DATA:ingressInterfaceName}, egressInterfaceName=%{DATA:egressInterfaceName}, fromCountry=%{DATA:fromCountry}, toCountry=%{DATA:toCountry}, protocolIdentifier=%{INT:protocolIdentifier}, fromZone=%{DATA:fromZone}, fromUser=%{DATA:fromUser}, toZone=%{DATA:toZone}, icmpTypeIPv4=%{GREEDYDATA:icmpTypeIPv4}"
    ] }
  }

  # Conditionals are case-sensitive: the logs say "flowIdLog" and "accessLog".
  if [logtype] == "flowIdLog" {
    aggregate {
      # grok stored the flow id in FLOWID, so the task id must use that field name.
      task_id => "%{FLOWID}"
      # The flow log carries the addresses and ports; remember them for this flow.
      # Logstash 5+ uses event.get/event.set instead of event['field'].
      code => "['sourceIPv4Address', 'destinationIPv4Address', 'sourcePort', 'destinationPort'].each { |f| map[f] = event.get(f) }"
    }
  } else if [logtype] == "accessLog" {
    aggregate {
      task_id => "%{FLOWID}"
      # Copy the remembered fields onto the access-log event (it already has appIdStr and urlCategory).
      code => "map.each { |k, v| event.set(k, v) }"
      end_of_task => true
      timeout => 120
    }
  }
}

The grok patterns parse both messages correctly, but we cannot tell whether the aggregate filter is working. We would appreciate expert advice on how to achieve this.
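A plain-Ruby stand-in can help with reasoning about what aggregate should do before testing it in Logstash. The sketch below is not the plugin's implementation. FakeEvent and AggregateSim are hypothetical names, and the model is only one map per task_id, which end_of_task deletes. It shows that fields are copied when the flow log is processed first, and that nothing is copied when the access log comes first.

```ruby
# Plain-Ruby stand-in for the aggregate filter, for reasoning only.
# FakeEvent and AggregateSim are hypothetical; this is not the plugin's code.

# Minimal event exposing the get/set calls used in aggregate "code" strings.
class FakeEvent
  def initialize(fields)
    @fields = fields.dup
  end

  def get(key)
    @fields[key]
  end

  def set(key, value)
    @fields[key] = value
  end
end

# Keeps one Hash ("map") per task_id and deletes it when end_of_task is true.
class AggregateSim
  def initialize
    @maps = {}
  end

  def process(task_id, event, end_of_task: false)
    map = (@maps[task_id] ||= {})
    yield event, map
    @maps.delete(task_id) if end_of_task
    event
  end

  def pending_tasks
    @maps.keys
  end
end

ADDRESS_FIELDS = %w[sourceIPv4Address destinationIPv4Address sourcePort destinationPort].freeze

# flowIdLog step: remember the address fields for this flow.
remember = ->(event, map) { ADDRESS_FIELDS.each { |f| map[f] = event.get(f) } }
# accessLog step: copy whatever was remembered onto the access-log event.
copy_back = ->(event, map) { map.each { |k, v| event.set(k, v) } }

flow_fields = { "logtype" => "flowIdLog", "FLOWID" => "33559409",
                "sourceIPv4Address" => "192.168.20.33", "destinationIPv4Address" => "91.189.89.199",
                "sourcePort" => "123", "destinationPort" => "123" }
access_fields = { "logtype" => "accessLog", "FLOWID" => "33559409", "appIdStr" => "ntp" }

# Case 1: flow log first, then access log.
agg = AggregateSim.new
agg.process("33559409", FakeEvent.new(flow_fields), &remember)
merged = agg.process("33559409", FakeEvent.new(access_fields), end_of_task: true, &copy_back)
puts merged.get("sourceIPv4Address") # 192.168.20.33

# Case 2: access log first. Its map is empty and is deleted at end_of_task,
# so nothing is copied, and the later flow log leaves a map behind.
late = AggregateSim.new
unmerged = late.process("33559409", FakeEvent.new(access_fields), end_of_task: true, &copy_back)
late.process("33559409", FakeEvent.new(flow_fields), &remember)
p unmerged.get("sourceIPv4Address") # nil
p late.pending_tasks                # ["33559409"]
```

In the real plugin, a leftover map like the one in case 2 is evicted after the configured timeout. In Logstash itself, the simplest check is to add stdout { codec => rubydebug } to the output section and start Logstash with a single worker (bin/logstash -f logstash.conf -w 1). The printed access-log events then show whether the fields copied from the map are present. If the access log can arrive before the flow log, the plugin's push_map_as_event_on_timeout option may be worth looking at.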

Can someone guide us on this?
