heinrich
(Hein)
December 22, 2019, 4:46pm
1
Hi all,
I'm struggling to parse Sophos UTM firewall logs to extract data from the message field sent by Filebeat. I'm new to grok and kv filters and require some help please. After some digging I have come across suggestions but can't seem to get them to work.
Example of the log entry
2019:12:22-18:14:36 sophosutm9 ulogd[4860]: id="2002" severity="info" sys="SecureNet" sub="packetfilter" name="Packet accepted" action="accept" fwrule="4" initf="eth5" outitf="ppp0" srcmac="f8:95:ea:3a:a2:89" dstmac="52:54:00:46:92:70" srcip="192.168.0.107" dstip="17.253.18.125" proto="17" length="76" tos="0x00" prec="0x00" ttl="63" srcport="53593" dstport="123"
Example of my logstash config file:
input {
  beats {
    port => 5044
  }
}
# OUTPUT SECTION
# This section defines the storage for the logs to be stored.
output {
  elasticsearch {
    hosts => ["http://172.16.1.2:9200"]
    manage_template => false
    # dd is the day of the month; DD is the day of the year (it produces names like 2019.12.357)
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
  }
}
Any help would be appreciated.
Badger
December 23, 2019, 2:58pm
2
I would dissect and then kv
dissect { mapping => { "message" => "%{[@metadata][timestamp]} %{sourceHost} %{processName}[%{processId}]: %{[@metadata][restOfLine]}" } }
# the log timestamp is YYYY:MM:dd-HH:mm:ss, so the day of month needs dd, not ss
date { match => [ "[@metadata][timestamp]", "YYYY:MM:dd-HH:mm:ss" ] }
kv { source => "[@metadata][restOfLine]" }
heinrich
(Hein)
December 23, 2019, 4:30pm
3
Thanks @Badger, I tried that exact mapping, however I'm starting to wonder if this isn't in JSON format.
{
  "_index": "filebeat-7.5.1-2019.12.357",
  "_type": "_doc",
  "_id": "lLqNM28B0je07S_kiFyx",
  "_version": 1,
  "_score": null,
  "_source": {
    "tags": [
      "beats_input_codec_plain_applied",
      "_jsonparsefailure"
    ],
    "@version": "1",
    "message": "2019:12:23-18:18:28 sophosutm9 ulogd[4860]: id=\"2001\" severity=\"info\" sys=\"SecureNet\" sub=\"packetfilter\" name=\"Packet dropped\" action=\"drop\" fwrule=\"60001\" initf=\"ppp0\" srcip=\"185.153.197.162\" dstip=\"197.245.81.138\" proto=\"6\" length=\"40\" tos=\"0x08\" prec=\"0x00\" ttl=\"236\" srcport=\"53864\" dstport=\"15555\" tcpflags=\"SYN\" ",
    "log": {
      "offset": 11235224,
      "file": {
        "path": "/var/log/packetfilter.log"
      }
    },
    "input": {
      "type": "log"
    },
    "level": "%{[srcip]}",
    "host": {
      "name": "sophosutm9"
    },
    "ecs": {
      "version": "1.1.0"
    },
    "agent": {
      "id": "362b6293-6d6f-4905-861e-2d5f681b4a5a",
      "type": "filebeat",
      "hostname": "sophosutm9",
      "version": "7.5.1",
      "ephemeral_id": "577f2fe9-4a90-4f1c-b882-9436684f24c5"
    },
    "@timestamp": "2019-12-23T16:18:30.104Z"
  },
  "fields": {
    "@timestamp": [
      "2019-12-23T16:18:30.104Z"
    ]
  },
  "sort": [
    1577117910104
  ]
}
I came as far as:
input {
  beats {
    port => 5044
  }
}
filter {
  json {
    source => "message"
  }
  mutate { add_field => { "sourceip" => "%{[srcip]}" } }
}
output {
  stdout {
    codec => rubydebug { metadata => true }
  }
  elasticsearch {
    hosts => ["http://172.16.1.2:9200"]
    manage_template => false
    # dd is the day of the month; DD is the day of the year (it produces names like 2019.12.357)
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
  }
}
but this is what it looks like in Kibana: sourceip %{[srcip]}
It doesn't actually show the content of it, srcip="185.153.197.162".
Badger
December 23, 2019, 4:35pm
4
[message] is not JSON, so a json filter will not parse it.
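A stand-alone parser for this log format, added here as an example and not code from the thread or from Logstash: it does in Python what Badger's dissect, date and kv chain does, and also shows why the json filter tags these events _jsonparsefailure. The two sample lines are constructed from the messages quoted above; the function names are my own.

```python
import json
import re
from datetime import datetime

# Constructed sample lines in the Sophos UTM ulogd packetfilter format seen in the thread.
SAMPLE_LINES = [
    '2019:12:22-18:14:36 sophosutm9 ulogd[4860]: id="2002" severity="info" sys="SecureNet" '
    'sub="packetfilter" name="Packet accepted" action="accept" fwrule="4" initf="eth5" '
    'outitf="ppp0" srcip="192.168.0.107" dstip="17.253.18.125" proto="17" srcport="53593" dstport="123"',
    '2019:12:23-18:18:28 sophosutm9 ulogd[4860]: id="2001" severity="info" sys="SecureNet" '
    'sub="packetfilter" name="Packet dropped" action="drop" fwrule="60001" initf="ppp0" '
    'srcip="185.153.197.162" dstip="197.245.81.138" proto="6" srcport="53864" dstport="15555" tcpflags="SYN"',
]

# Equivalent of the dissect mapping:
# "%{timestamp} %{sourceHost} %{processName}[%{processId}]: %{restOfLine}"
HEADER_RE = re.compile(r"^(?P<ts>\S+) (?P<host>\S+) (?P<proc>[^\[\s]+)\[(?P<pid>\d+)\]: (?P<rest>.*)$")
# Equivalent of the kv filter: key="value" pairs, where a value may contain spaces.
KV_RE = re.compile(r'(\w+)="([^"]*)"')


def is_json(line):
    """The json filter only succeeds on a JSON object; these lines are plain text."""
    try:
        return isinstance(json.loads(line), dict)
    except ValueError:
        return False


def parse_line(line):
    """Dissect the syslog-style header, parse the date, then split the key="value" pairs."""
    m = HEADER_RE.match(line)
    if m is None:
        raise ValueError("line does not match the dissect pattern: %r" % line)
    event = {
        # Timestamp pattern YYYY:MM:dd-HH:mm:ss, the one the date filter needs for this format.
        "@timestamp": datetime.strptime(m.group("ts"), "%Y:%m:%d-%H:%M:%S"),
        "sourceHost": m.group("host"),
        "processName": m.group("proc"),
        "processId": int(m.group("pid")),
    }
    event.update(KV_RE.findall(m.group("rest")))
    return event


def dropped_from_interface(lines, interface):
    """Defender view: events the firewall dropped that arrived on the given interface."""
    return [e for e in map(parse_line, lines)
            if e.get("action") == "drop" and e.get("initf") == interface]
```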
heinrich
(Hein)
December 23, 2019, 5:36pm
5
@Badger Would I use your suggestion as is?
input {
  beats {
    port => 5044
  }
}
filter {
  dissect { mapping => { "message" => "%{[@metadata][timestamp]} %{sourceHost} %{processName}[%{processId}]: %{[@metadata][restOfLine]}" } }
  # the log timestamp is YYYY:MM:dd-HH:mm:ss, so the day of month needs dd, not ss
  date { match => [ "[@metadata][timestamp]", "YYYY:MM:dd-HH:mm:ss" ] }
  kv { source => "[@metadata][restOfLine]" }
}
output {
  elasticsearch {
    hosts => ["http://172.16.1.2:9200"]
    # dd is the day of the month; DD is the day of the year (it produces names like 2019.12.357)
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
  }
}
Thanks for the help man, I do appreciate it. I'm new to grok filters, dissect etc.
heinrich
(Hein)
December 23, 2019, 9:45pm
7
@Badger the weird part of this is, I used it exactly as is. This didn't work at first. I then removed and recreated the .conf file and pasted the exact reply with the config:
input {
  beats {
    port => 5044
  }
}
filter {
  dissect { mapping => { "message" => "%{[@metadata][timestamp]} %{sourceHost} %{processName}[%{processId}]: %{[@metadata][restOfLine]}" } }
  # the log timestamp is YYYY:MM:dd-HH:mm:ss, so the day of month needs dd, not ss
  date { match => [ "[@metadata][timestamp]", "YYYY:MM:dd-HH:mm:ss" ] }
  kv { source => "[@metadata][restOfLine]" }
}
output {
  elasticsearch {
    hosts => ["http://172.16.1.2:9200"]
    # dd is the day of the month; DD is the day of the year (it produces names like 2019.12.357)
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
  }
}
I removed everything relating to Logstash, index patterns etc. I restarted Filebeat and Logstash and, would you know it, this works!!!!
Mate, thank you so much for the help. Your solution finally got this working for me.
system
(system)
Closed
January 20, 2020, 9:45pm
8
This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.