heinrich
(Hein)
December 22, 2019, 4:46pm
1
Hi all,
I'm struggling parsing sophos utm firewall logs to extract data from the message field from filebeat. I'm new to grok and kv filters and require some help please. After some digging i have come across suggestions but can't seem to get it to work.
Example of the log entry
2019:12:22-18:14:36 sophosutm9 ulogd[4860]: id="2002" severity="info" sys="SecureNet" sub="packetfilter" name="Packet accepted" action="accept" fwrule="4" initf="eth5" outitf="ppp0" srcmac="f8:95:ea:3a:a2:89" dstmac="52:54:00:46:92:70" srcip="192.168.0.107" dstip="17.253.18.125" proto="17" length="76" tos="0x00" prec="0x00" ttl="63" srcport="53593" dstport="123"
Example of my logstash config file:
input {
beats {
port => 5044
}
}
OUTPUT SECTION
This section defines the storage for the logs to be stored.
output {
elasticsearch {
hosts => ["http://172.16.1.2:9200 "]
manage_template => false
index => "%{[@metadata ][beat]}-%{[@metadata ][version]}-%{+YYYY.MM.DD}"
}
}
Any help would be appreciated.
Badger
December 23, 2019, 2:58pm
2
I would dissect and then kv
dissect { mapping => { "message" => "%{[@metadata][timestamp]} %{sourceHost} %{processName}[%{processId}]: %{[@metadata][restOfLine]}" } }
date { match => [ "[@metadata][timestamp]", "YYYY:MM:ss-HH:mm:ss" ] }
kv { source => "[@metadata][restOfLine]" }
heinrich
(Hein)
December 23, 2019, 4:30pm
3
Thanks @Badger , i tried that exact mapping however i'm starting to wonder if this isn't in json format.
{
"_index": "filebeat-7.5.1-2019.12.357",
"_type": "_doc",
"_id": "lLqNM28B0je07S_kiFyx",
"_version": 1,
"_score": null,
"_source": {
"tags": [
"beats_input_codec_plain_applied",
"_jsonparsefailure"
],
"@version ": "1",
"message": "2019:12:23-18:18:28 sophosutm9 ulogd[4860]: id="2001" severity="info" sys="SecureNet" sub="packetfilter" name="Packet dropped" action="drop" fwrule="60001" initf="ppp0" srcip="185.153.197.162" dstip="197.245.81.138" proto="6" length="40" tos="0x08" prec="0x00" ttl="236" srcport="53864" dstport="15555" tcpflags="SYN" ",
"log": {
"offset": 11235224,
"file": {
"path": "/var/log/packetfilter.log"
}
},
"input": {
"type": "log"
},
"level": "%{[srcip]}",
"host": {
"name": "sophosutm9"
},
"ecs": {
"version": "1.1.0"
},
"agent": {
"id": "362b6293-6d6f-4905-861e-2d5f681b4a5a",
"type": "filebeat",
"hostname": "sophosutm9",
"version": "7.5.1",
"ephemeral_id": "577f2fe9-4a90-4f1c-b882-9436684f24c5"
},
"@timestamp ": "2019-12-23T16:18:30.104Z"
},
"fields": {
"@timestamp ": [
"2019-12-23T16:18:30.104Z"
]
},
"sort": [
1577117910104
]
}
I came as far as:
input {
beats {
port => 5044
}
}
filter {
json {
source => "message"
}
mutate {add_field => {"sourceip" => "%{[srcip]}"}}
}
output {
stdout {
codec => rubydebug { metadata => true }
}
elasticsearch {
hosts => ["http://172.16.1.2:9200 "]
manage_template => false
index => "%{[@metadata ][beat]}-%{[@metadata ][version]}-%{+YYYY.MM.DD}"
}
}
but this is what is looks like in kibana: sourceip %{[srcip]}
It doesn't actually show the content of it srcip="185.153.197.162"
Badger
December 23, 2019, 4:35pm
4
[message] is not JSON, so a json filter will not parse it.
heinrich
(Hein)
December 23, 2019, 5:36pm
5
@Badger Would i use your suggestion as is?
input {
beats {
port => 5044
}
}
filter {
dissect { mapping => { "message" => "%{[@metadata ][timestamp]} %{sourceHost} %{processName}[%{processId}]: %{[@metadata ][restOfLine]}" } }
date { match => [ "[@metadata ][timestamp]", "YYYY:MM:ss-HH:mm:ss" ] }
kv { source => "[@metadata ][restOfLine]" }
}
output {
elasticsearch {
hosts => ["http://172.16.1.2:9200 "]
index => "%{[@metadata ][beat]}-%{[@metadata ][version]}-%{+YYYY.MM.DD}"
}
}
Thanks for the help man, i do appreciate it. I'm new to grok filters, dissect etc.
heinrich
(Hein)
December 23, 2019, 9:45pm
7
@Badger the weird part of this is, i used it exactly as is. This didn't work at first. I then removed and recreated the .conf file. Pasted the exact reply with the config
input {
beats {
port => 5044
}
}
filter {
dissect { mapping => { "message" => "%{[@metadata ][timestamp]} %{sourceHost} %{processName}[%{processId}]: %{[@metadata ][restOfLine]}" } }
date { match => [ "[@metadata ][timestamp]", "YYYY:MM:ss-HH:mm:ss" ] }
kv { source => "[@metadata ][restOfLine]" }
}
output {
elasticsearch {
hosts => ["http://172.16.1.2:9200 "]
index => "%{[@metadata ][beat]}-%{[@metadata ][version]}-%{+YYYY.MM.DD}"
}
}
I removed everything relating to logstash, index patterns etc. I restarted filebeat and logstash and would you know it, this works!!!!
Mate, thank you so much for the help. Your solution finally got this working for me.