ELK - Aggregating Fields from 2 different messages into a single log


(Steelerz) #1

Hi All,

We are new to Logstash and Elasticsearch and are trying to build better analytics views for our firewalls.

We receive the following two messages (a flow log and an access log), which arrive at different times:

2018-06-01T04:43:36+0000 flowIdLog, applianceName=VSM, tenantName=Org1, flowId=33559409, flowCookie=1527828239, sourceIPv4Address=192.168.20.33, destinationIPv4Address=91.189.89.199, sourcePort=123, destinationPort=123, tenantId=1, vsnId=0, applianceId=1, ingressInterfaceName=vni-0/3.0, egressInterfaceName=ptvi-0/251, fromCountry=, toCountry=United Kingdom, protocolIdentifier=17, fromZone=trust, fromUser=unknown, toZone=ptvi, icmpTypeIPv4=0

2018-06-01T04:43:36+0000 accessLog, applianceName=VSM, tenantName=Org1, flowId=33559409, flowCookie=1527828239, flowStartMilliseconds=71919104, flowEndMilliseconds=0, sentOctets=76, sentPackets=1, recvdOctets=0, recvdPackets=0, appId=384, eventType=start, tenantId=1, urlCategory=, action=allow, vsnId=0, applianceId=1, appRisk=3, appProductivity=5, appIdStr=ntp, appFamily=networking, appSubFamily=network-service, rule=elastic, forwardForwardingClass=fc_be, reverseForwardingClass=fc_be, host=\n

Both messages share the same flowId (33559409). We need to merge the two logs into a single event so that, keyed on the flowId, we can correlate top-5 lists by source IP, destination IP, app ID, URL, and so on. We have the following logstash.conf filter, but we are not sure whether it will work.

filter {
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:time} %{WORD:logtype}, applianceName=%{NOTSPACE:applianceName}, tenantName=%{NOTSPACE:TENANT}, flowId=%{INT:FLOWID}, flowCookie=%{INT:FlowCookie}, flowStartMilliseconds=%{INT:FlowStartMilliseconds}, flowEndMilliseconds=%{INT:flowEndMilliseconds}, sentOctets=%{INT:sentOctets}, sentPackets=%{INT:sentPackets}, recvdOctets=%{INT:recvdOctets}, recvdPackets=%{INT:recvdPackets}, appId=%{INT:appId}, eventType=%{WORD:eventType}, tenantId=%{INT:tenantId}, urlCategory=, action=%{WORD:action}, vsnId=%{INT:vsnId}, applianceId=%{INT:applianceId}, appRisk=%{INT:appRisk}, appProductivity=%{INT:appProductivity}, appIdStr=%{NOTSPACE:appIdStr}, appFamily=%{NOTSPACE:appFamily}, appSubFamily=%{NOTSPACE:appSubFamily}, rule=%{DATA:rule}, forwardForwardingClass=%{DATA:forwardForwardingClass}, reverseForwardingClass=%{DATA:reverseForwardingClass}, host=%{DATA:host}" }
match => { "message" => "%{TIMESTAMP_ISO8601:time} %{WORD:logtype}, applianceName=%{NOTSPACE:applianceName}, tenantName=%{NOTSPACE:TENANT}, flowId=%{INT:FLOWID}, flowCookie=%{INT:FlowCookie}, flowStartMilliseconds=%{INT:FlowStartMilliseconds}, flowEndMilliseconds=%{INT:flowEndMilliseconds}, sentOctets=%{INT:sentOctets}, sentPackets=%{INT:sentPackets}, recvdOctets=%{INT:recvdOctets}, recvdPackets=%{INT:recvdPackets}, appId=%{INT:appId}, eventType=%{WORD:eventType}, tenantId=%{INT:tenantId}, urlCategory=%{NOTSPACE:urlCategory}, action=%{WORD:action}, vsnId=%{INT:vsnId}, applianceId=%{INT:applianceId}, appRisk=%{INT:appRisk}, appProductivity=%{INT:appProductivity}, appIdStr=%{NOTSPACE:appIdStr}, appFamily=%{NOTSPACE:appFamily}, appSubFamily=%{NOTSPACE:appSubFamily}, rule=%{DATA:rule}, forwardForwardingClass=%{DATA:forwardForwardingClass}, reverseForwardingClass=%{DATA:reverseForwardingClass}, host=%{DATA:host}" }

match => { "message" => "%{TIMESTAMP_ISO8601:time} %{WORD:logtype}, applianceName=%{WORD:applianceName}, tenantName=%{DATA:TENANT}, flowId=%{INT:FLOWID}, flowCookie=%{INT:FlowCookie}, sourceIPv4Address=%{IP:sourceIPv4Address}, destinationIPv4Address=%{IP:destinationIPv4Address}, sourcePort=%{INT:sourcePort}, destinationPort=%{INT:destinationPort}, tenantId=%{INT:tenantId}, vsnId=%{INT:vsnId}, applianceId=%{INT:applianceId}, ingressInterfaceName=%{DATA:ingressInterfaceName}, egressInterfaceName=%{DATA:egressInterfaceName}, fromCountry=%{DATA:fromCountry}, toCountry=%{DATA:toCountry}, protocolIdentifier=%{INT:protocolIdentifier}, fromZone=%{DATA:fromZone}, fromUser=%{DATA:fromUser}, toZone=%{DATA:toZone}, icmpTypeIPv4=%{DATA:icmpTypeIPv4}" }

}
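# logtype is case-sensitive: the messages carry "flowIdLog" and "accessLog".
if [logtype] == "flowIdLog" {
  # The flow log holds the addresses and ports, so store them in the map.
  # The grok patterns capture the flow ID as FLOWID, not flowId.
  aggregate {
    task_id => "%{FLOWID}"
    code => "['sourceIPv4Address', 'destinationIPv4Address', 'sourcePort', 'destinationPort'].each { |f| map[f] = event.get(f) }"
  }
}
else if [logtype] == "accessLog" {
  # Copy the stored flow fields onto the access log event.
  # Logstash 5 and later need event.get/event.set instead of event['field'].
  aggregate {
    task_id => "%{FLOWID}"
    # "update" runs the code only if a map already exists for this flow ID.
    map_action => "update"
    code => "map.each { |k, v| event.set(k, v) }"
    end_of_task => true
    timeout => 120
  }
}
}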

We can see that the grok patterns parse the messages correctly, but we have not been able to verify whether the aggregate filter is working. We would appreciate expert advice on how to solve this.
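
One way we could probably check this is sketched below, under some assumptions: tag an access log event only when a map already exists for its flow ID, and print every event to the console. The tag name flow_merged is hypothetical, and FLOWID is the field name our grok patterns produce. As far as we understand the aggregate filter documentation, the pipeline also has to run with a single worker (-w 1), otherwise the two messages may be processed by different threads and never meet.

```
filter {
  if [logtype] == "accessLog" {
    aggregate {
      task_id => "%{FLOWID}"
      # "update" runs the code only when a map already exists for this flow ID,
      # so the tag shows up only on events that matched an earlier flowIdLog.
      map_action => "update"
      # flow_merged is a hypothetical tag name used only for this check.
      code => "event.tag('flow_merged')"
    }
  }
}

output {
  # Print each event with all of its fields and tags for inspection.
  stdout { codec => rubydebug }
}
```

This check would need to sit before any aggregate block that sets end_of_task, since the map is discarded once the task ends. Access log events printed without the flow_merged tag would then indicate that no flow log with the same ID was seen first.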


(Steelerz) #2

Can someone guide us on this?

