heinrich (Hein), December 22, 2019, 4:46pm, post 1

Hi all,
I'm struggling to parse Sophos UTM firewall logs shipped by Filebeat and to extract data from the message field. I'm new to grok and kv filters and would appreciate some help. After some digging I have come across suggestions, but I can't seem to get them to work.
Example of a log entry:
2019:12:22-18:14:36 sophosutm9 ulogd[4860]: id="2002" severity="info" sys="SecureNet" sub="packetfilter" name="Packet accepted" action="accept" fwrule="4" initf="eth5" outitf="ppp0" srcmac="f8:95:ea:3a:a2:89" dstmac="52:54:00:46:92:70" srcip="192.168.0.107" dstip="17.253.18.125" proto="17" length="76" tos="0x00" prec="0x00" ttl="63" srcport="53593" dstport="123"
Example of my Logstash config file:
    input {
      beats {
        port => 5044
      }
    }
    # OUTPUT SECTION
    # This section defines where the logs are stored.
    output {
      elasticsearch {
        hosts => ["http://172.16.1.2:9200"]
        manage_template => false
        index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.DD}"
      }
    }
Any help would be appreciated.

Badger, December 23, 2019, 2:58pm, post 2

I would dissect and then kv:
    dissect { mapping => { "message" => "%{[@metadata][timestamp]} %{sourceHost} %{processName}[%{processId}]: %{[@metadata][restOfLine]}" } }
    date { match => [ "[@metadata][timestamp]", "YYYY:MM:dd-HH:mm:ss" ] }
    kv { source => "[@metadata][restOfLine]" }
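
For anyone who wants to try these three filters without Filebeat in the loop, here is a minimal sketch of a standalone test pipeline. The generator input, the stdout output, the mutate conversion and the choice of fields to convert are assumptions added for illustration and are not part of the suggestion itself. The day-of-month token in the date pattern is written as dd.

```
# Hypothetical test pipeline, for local experiments only.
input {
  # Emit the sample line from post 1 once instead of waiting for Filebeat.
  generator {
    lines => [ '2019:12:22-18:14:36 sophosutm9 ulogd[4860]: id="2002" severity="info" sys="SecureNet" sub="packetfilter" name="Packet accepted" action="accept" fwrule="4" initf="eth5" outitf="ppp0" srcmac="f8:95:ea:3a:a2:89" dstmac="52:54:00:46:92:70" srcip="192.168.0.107" dstip="17.253.18.125" proto="17" length="76" tos="0x00" prec="0x00" ttl="63" srcport="53593" dstport="123"' ]
    count => 1
  }
}

filter {
  # Split the syslog-style prefix from the key="value" payload.
  dissect { mapping => { "message" => "%{[@metadata][timestamp]} %{sourceHost} %{processName}[%{processId}]: %{[@metadata][restOfLine]}" } }

  # Parse the firewall timestamp into @timestamp. The log line carries no
  # time zone, so the date filter uses the Logstash host's zone unless its
  # timezone option is set.
  date { match => [ "[@metadata][timestamp]", "YYYY:MM:dd-HH:mm:ss" ] }

  # Turn each key="value" pair into a field; kv strips the double quotes.
  kv { source => "[@metadata][restOfLine]" }

  # Assumption: these numeric-looking fields are more useful as integers.
  mutate { convert => { "srcport" => "integer" "dstport" => "integer" "length" => "integer" "ttl" => "integer" } }
}

output {
  # Print the whole event, including [@metadata], to the console.
  stdout { codec => rubydebug { metadata => true } }
}
```

Saved to a file and started with bin/logstash -f followed by that file's path, this should print a single event in which srcip, dstip, action and the other keys appear as top-level fields.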
             
            
               
               

heinrich (Hein), December 23, 2019, 4:30pm, post 3

Thanks @Badger, I tried that exact mapping; however, I'm starting to wonder whether this is actually in JSON format.
    {
      "_index": "filebeat-7.5.1-2019.12.357",
      "_type": "_doc",
      "_id": "lLqNM28B0je07S_kiFyx",
      "_version": 1,
      "_score": null,
      "_source": {
        "tags": [
          "beats_input_codec_plain_applied",
          "_jsonparsefailure"
        ],
        "@version": "1",
        "message": "2019:12:23-18:18:28 sophosutm9 ulogd[4860]: id=\"2001\" severity=\"info\" sys=\"SecureNet\" sub=\"packetfilter\" name=\"Packet dropped\" action=\"drop\" fwrule=\"60001\" initf=\"ppp0\" srcip=\"185.153.197.162\" dstip=\"197.245.81.138\" proto=\"6\" length=\"40\" tos=\"0x08\" prec=\"0x00\" ttl=\"236\" srcport=\"53864\" dstport=\"15555\" tcpflags=\"SYN\" ",
        "log": {
          "offset": 11235224,
          "file": {
            "path": "/var/log/packetfilter.log"
          }
        },
        "input": {
          "type": "log"
        },
        "level": "%{[srcip]}",
        "host": {
          "name": "sophosutm9"
        },
        "ecs": {
          "version": "1.1.0"
        },
        "agent": {
          "id": "362b6293-6d6f-4905-861e-2d5f681b4a5a",
          "type": "filebeat",
          "hostname": "sophosutm9",
          "version": "7.5.1",
          "ephemeral_id": "577f2fe9-4a90-4f1c-b882-9436684f24c5"
        },
        "@timestamp": "2019-12-23T16:18:30.104Z"
      },
      "fields": {
        "@timestamp": [
          "2019-12-23T16:18:30.104Z"
        ]
      },
      "sort": [
        1577117910104
      ]
    }
 
This is as far as I got:
    input {
      beats {
        port => 5044
      }
    }
    filter {
      json {
        source => "message"
      }
      mutate { add_field => { "sourceip" => "%{[srcip]}" } }
    }
    output {
      stdout {
        codec => rubydebug { metadata => true }
      }
      elasticsearch {
        hosts => ["http://172.16.1.2:9200"]
        manage_template => false
        index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.DD}"
      }
    }
 
But this is what it looks like in Kibana: sourceip %{[srcip]}
It doesn't show the actual value of srcip, which in this case is 185.153.197.162.

Badger, December 23, 2019, 4:35pm, post 4

[message] is not JSON, so a json filter will not parse it.
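
The literal %{[srcip]} in sourceip follows from this. The json filter fails (hence the _jsonparsefailure tag), so srcip is never created, and a sprintf reference to a field that does not exist is left in the value as-is. Below is a minimal sketch of a pipeline that swaps the json filter for the dissect and kv approach from post 2 and copies srcip only once it exists. The conditional is an addition for illustration, not something taken from the thread. The output also writes the day as dd, on the assumption that one index per calendar day is intended: the _index value filebeat-7.5.1-2019.12.357 in the previous post is consistent with DD being rendered as day of year.

```
input {
  beats {
    port => 5044
  }
}

filter {
  # Split the syslog-style prefix from the key="value" payload.
  dissect { mapping => { "message" => "%{[@metadata][timestamp]} %{sourceHost} %{processName}[%{processId}]: %{[@metadata][restOfLine]}" } }

  # Parse the key="value" pairs; this is what creates [srcip].
  kv { source => "[@metadata][restOfLine]" }

  # Only add sourceip when kv actually produced srcip, so a parse failure
  # never stores the unresolved "%{[srcip]}" placeholder.
  if [srcip] {
    mutate { add_field => { "sourceip" => "%{[srcip]}" } }
  }
}

output {
  elasticsearch {
    hosts => ["http://172.16.1.2:9200"]
    manage_template => false
    # dd is day of month; DD would be day of year (assumed unintended here).
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
  }
}
```

If sourceip is only meant to be a second name for srcip, querying srcip directly avoids the extra field altogether.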

heinrich (Hein), December 23, 2019, 5:36pm, post 5

@Badger, would I use your suggestion as is?
    input {
      beats {
        port => 5044
      }
    }
    filter {
      dissect { mapping => { "message" => "%{[@metadata][timestamp]} %{sourceHost} %{processName}[%{processId}]: %{[@metadata][restOfLine]}" } }
      date { match => [ "[@metadata][timestamp]", "YYYY:MM:dd-HH:mm:ss" ] }
      kv { source => "[@metadata][restOfLine]" }
    }
    output {
      elasticsearch {
        hosts => ["http://172.16.1.2:9200"]
        index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.DD}"
      }
    }
 
Thanks for the help, man, I do appreciate it. I'm new to grok filters, dissect, etc.

heinrich (Hein), December 23, 2019, 9:45pm, post 7

@Badger, the weird part is that I used it exactly as is and it didn't work at first. I then removed and recreated the .conf file and pasted in the exact config from the reply:
    input {
      beats {
        port => 5044
      }
    }
    filter {
      dissect { mapping => { "message" => "%{[@metadata][timestamp]} %{sourceHost} %{processName}[%{processId}]: %{[@metadata][restOfLine]}" } }
      date { match => [ "[@metadata][timestamp]", "YYYY:MM:dd-HH:mm:ss" ] }
      kv { source => "[@metadata][restOfLine]" }
    }
    output {
      elasticsearch {
        hosts => ["http://172.16.1.2:9200"]
        index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.DD}"
      }
    }
 
I removed everything relating to Logstash, index patterns, etc., restarted Filebeat and Logstash and, would you know it, this works!
Mate, thank you so much for the help. Your solution finally got this working for me.