Parsing firewall logs in Logstash

Hi all,

I'm struggling to parse Sophos UTM firewall logs shipped by Filebeat, specifically extracting the data from the message field. I'm new to the grok and kv filters and could use some help, please. After some digging I came across a few suggestions, but I can't seem to get them to work.

Example of a log entry:

2019:12:22-18:14:36 sophosutm9 ulogd[4860]: id="2002" severity="info" sys="SecureNet" sub="packetfilter" name="Packet accepted" action="accept" fwrule="4" initf="eth5" outitf="ppp0" srcmac="f8:95:ea:3a:a2:89" dstmac="52:54:00:46:92:70" srcip="192.168.0.107" dstip="17.253.18.125" proto="17" length="76" tos="0x00" prec="0x00" ttl="63" srcport="53593" dstport="123"
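
From what I can tell, each line is a syslog-style prefix followed by space-separated key="value" pairs, i.e. roughly:

    <timestamp> <host> <process>[<pid>]: key="value" key="value" ...
    2019:12:22-18:14:36 sophosutm9 ulogd[4860]: id="2002" severity="info" ...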

Example of my Logstash config file:

input {
  beats {
    port => 5044
  }
}

The output section defines where the logs are stored:

output {
  elasticsearch {
    hosts => ["http://172.16.1.2:9200"]
    manage_template => false
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.DD}"
  }
}

Any help would be appreciated.

I would dissect and then kv

    dissect { mapping => { "message" => "%{[@metadata][timestamp]} %{sourceHost} %{processName}[%{processId}]: %{[@metadata][restOfLine]}" } }
    date { match => [ "[@metadata][timestamp]", "YYYY:MM:dd-HH:mm:ss" ] }
    kv { source => "[@metadata][restOfLine]" }
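
With your sample line that should produce fields roughly like these (every key="value" pair on the rest of the line becomes its own field):

       "sourceHost" => "sophosutm9"
      "processName" => "ulogd"
        "processId" => "4860"
             "name" => "Packet accepted"
           "action" => "accept"
            "srcip" => "192.168.0.107"
            "dstip" => "17.253.18.125"
          "srcport" => "53593"
          "dstport" => "123"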

Thanks @Badger, I tried that exact mapping; however, I'm starting to wonder whether this is actually in JSON format.

{
  "_index": "filebeat-7.5.1-2019.12.357",
  "_type": "_doc",
  "_id": "lLqNM28B0je07S_kiFyx",
  "_version": 1,
  "_score": null,
  "_source": {
    "tags": [
      "beats_input_codec_plain_applied",
      "_jsonparsefailure"
    ],
    "@version": "1",
    "message": "2019:12:23-18:18:28 sophosutm9 ulogd[4860]: id=\"2001\" severity=\"info\" sys=\"SecureNet\" sub=\"packetfilter\" name=\"Packet dropped\" action=\"drop\" fwrule=\"60001\" initf=\"ppp0\" srcip=\"185.153.197.162\" dstip=\"197.245.81.138\" proto=\"6\" length=\"40\" tos=\"0x08\" prec=\"0x00\" ttl=\"236\" srcport=\"53864\" dstport=\"15555\" tcpflags=\"SYN\" ",
    "log": {
      "offset": 11235224,
      "file": {
        "path": "/var/log/packetfilter.log"
      }
    },
    "input": {
      "type": "log"
    },
    "level": "%{[srcip]}",
    "host": {
      "name": "sophosutm9"
    },
    "ecs": {
      "version": "1.1.0"
    },
    "agent": {
      "id": "362b6293-6d6f-4905-861e-2d5f681b4a5a",
      "type": "filebeat",
      "hostname": "sophosutm9",
      "version": "7.5.1",
      "ephemeral_id": "577f2fe9-4a90-4f1c-b882-9436684f24c5"
    },
    "@timestamp": "2019-12-23T16:18:30.104Z"
  },
  "fields": {
    "@timestamp": [
      "2019-12-23T16:18:30.104Z"
    ]
  },
  "sort": [
    1577117910104
  ]
}

This is as far as I got (the _jsonparsefailure tag and the literal %{[srcip]} in the level field above come from the filters below):

input {
  beats {
    port => 5044
  }
}

filter {
  json {
    source => "message"
  }
  mutate { add_field => { "sourceip" => "%{[srcip]}" } }
}

output {
  stdout {
    codec => rubydebug { metadata => true }
  }
  elasticsearch {
    hosts => ["http://172.16.1.2:9200"]
    manage_template => false
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.DD}"
  }
}

but this is what it looks like in Kibana: sourceip %{[srcip]}

It doesn't actually show the value of srcip="185.153.197.162".

[message] is not JSON, so a json filter will not parse it.
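
When a sprintf reference such as %{[srcip]} points at a field that does not exist, logstash leaves the literal text in place, which is why you see it verbatim in the document. And once the kv filter has run it creates [srcip] for you, so the extra mutate is unnecessary anyway.

As an aside, the index name filebeat-7.5.1-2019.12.357 suggests the DD in your index option is not doing what you expect: in the Joda-Time patterns logstash uses, DD is day-of-year (December 23rd is day 357), while day-of-month is lowercase dd:

    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"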

@Badger Would I use your suggestion as is?

input {
  beats {
    port => 5044
  }
}

filter {
  dissect { mapping => { "message" => "%{[@metadata][timestamp]} %{sourceHost} %{processName}[%{processId}]: %{[@metadata][restOfLine]}" } }
  date { match => [ "[@metadata][timestamp]", "YYYY:MM:dd-HH:mm:ss" ] }
  kv { source => "[@metadata][restOfLine]" }
}

output {
  elasticsearch {
    hosts => ["http://172.16.1.2:9200"]
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.DD}"
  }
}

Thanks for the help, man, I do appreciate it. I'm new to grok filters, dissect, etc.

Yes, that looks right.
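
If you want to sanity-check the filter without beats in the loop, you could run a throwaway config like this (a minimal sketch, not something to deploy: stdin/stdout stand in for your real input and output), paste a log line, and inspect the resulting event:

    input { stdin {} }
    filter {
        dissect { mapping => { "message" => "%{[@metadata][timestamp]} %{sourceHost} %{processName}[%{processId}]: %{[@metadata][restOfLine]}" } }
        date { match => [ "[@metadata][timestamp]", "YYYY:MM:dd-HH:mm:ss" ] }
        kv { source => "[@metadata][restOfLine]" }
    }
    output { stdout { codec => rubydebug { metadata => true } } }

Run it with something like bin/logstash -f test.conf (the file name is just an example).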

@Badger the weird part of this is, I used it exactly as is and it didn't work at first. I then removed and recreated the .conf file and pasted your exact reply into the config:

input {
  beats {
    port => 5044
  }
}

filter {
  dissect { mapping => { "message" => "%{[@metadata][timestamp]} %{sourceHost} %{processName}[%{processId}]: %{[@metadata][restOfLine]}" } }
  date { match => [ "[@metadata][timestamp]", "YYYY:MM:dd-HH:mm:ss" ] }
  kv { source => "[@metadata][restOfLine]" }
}

output {
  elasticsearch {
    hosts => ["http://172.16.1.2:9200"]
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.DD}"
  }
}

I removed everything relating to Logstash, index patterns, etc. I restarted Filebeat and Logstash and, would you know it, it works!

Mate, thank you so much for the help. Your solution finally got this working for me.
