Completely lost

Hello all. I have followed the excellent Digital Ocean filebeat/logstash/ES/Kibana (How To Install Elasticsearch, Logstash, and Kibana (ELK Stack) on CentOS 7 | DigitalOcean). I am getting messages from one of my files into elasticsearch/kibana.

However, my input lines look like the following:

Dec 19 17:54:49 myserver.example.com kernel: [12954239.200000] DROP IN=eth1 OUT= MAC=30:46:9a:15:66:b0:e8:b7:48:0c:8a:da:08:00 SRC=178.175.38.24 DST=10.1.1.1 LEN=60 TOS=0x00 PREC=0x00 TTL=47 ID=12841 DF PROTO=TCP SPT=50806 DPT=20012 SEQ=1057317385 ACK=0 WINDOW=5808 R

My current filter is

 filter {
   if [type] == "syslog" {
     grok {
       match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
       add_field => [ "received_at", "%{@timestamp}" ]
       add_field => [ "received_from", "%{host}" ]
     }
     syslog_pri { }
     date {
       match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
     }
   }
 }

I'm trying to parse out the "SRC" ip, "DST" ip into separate fields. I would also like to use the geoip plugin.

I read through a bunch of documentation and I am completely lost at how I might accomplish this task.

Thanks in advance!

My grok lines:
`match => [ "message", "%{SYSLOGTIMESTAMP:date} %{IPORHOST:device} %{WORD:kernel}: [%{DATA:unixtime}] IN=%{DATA:in_int} OUT=%{DATA:out_int} MAC=%{DATA:junk} SRC=%{IP:src_ip} DST=%{IP:dst_ip} LEN=%{NUMBER:len} TOS=%{DATA:junk} PREC=%{DATA:junk} TTL=%{NUMBER:junk} ID=%{NUMBER:junk} DF PROTO=%{WORD:proto} SPT=%{NUMBER:src_port} DPT=%{NUMBER:dst_port} WINDOW=%{NUMBER:junk} RES=%{DATA:junk} %{DATA:flags} URGP=%{NUMBER:junk}" ]

match => [ "message", "%{SYSLOGTIMESTAMP:date} %{IPORHOST:device} %{WORD:kernel}: [%{DATA:unixtime}] IN=%{DATA:in_int} OUT=%{DATA:out_int} MAC=%{DATA:junk} SRC=%{IP:src_ip} DST=%{IP:dst_ip} LEN=%{NUMBER:len} TOS=%{DATA:junk} PREC=%{DATA:junk} TTL=%{NUMBER:junk} ID=%{NUMBER:junk} PROTO=%{WORD:proto} SPT=%{NUMBER:src_port} DPT=%{NUMBER:dst_port} WINDOW=%{NUMBER:junk} RES=%{DATA:junk} %{DATA:flags} URGP=%{NUMBER:junk}" ]

match => [ "message", "%{SYSLOGTIMESTAMP:date} %{IPORHOST:device} %{WORD:kernel}: [%{DATA:unixtime}] IN=%{DATA:in_int} OUT=%{DATA:out_int} MAC=%{DATA:junk} SRC=%{IP:src_ip} DST=%{IP:dst_ip} LEN=%{POSINT:fullen} TOS=%{DATA:junk} PREC=%{DATA:junk} TTL=%{POSINT:junk} ID=%{NUMBER:junk} DF PROTO=%{WORD:proto} SPT=%{POSINT:src_port} DPT=%{POSINT:dst_port} LEN=%{POSINT:len}" ]

match => [ "message", "%{SYSLOGTIMESTAMP:date} %{IPORHOST:device} %{WORD:kernel}: [%{DATA:unixtime}] IN=%{DATA:in_int} OUT=%{DATA:out_int} MAC=%{DATA:junk} SRC=%{IP:src_ip} DST=%{IP:dst_ip} LEN=%{POSINT:fullen} TOS=%{DATA:junk} PREC=%{DATA:junk} TTL=%{POSINT:junk} ID=%{NUMBER:junk} PROTO=%{WORD:proto} SPT=%{POSINT:src_port} DPT=%{POSINT:dst_port} LEN=%{POSINT:len} MARK=%{DATA:junk}" ]

match => [ "message", "%{SYSLOGTIMESTAMP:date} %{IPORHOST:device} %{WORD:kernel}: [%{DATA:unixtime}] IN=%{DATA:in_int} OUT=%{DATA:out_int} MAC=%{DATA:junk} SRC=%{IP:src_ip} DST=%{IP:dst_ip} LEN=%{POSINT:fullen} TOS=%{DATA:junk} PREC=%{DATA:junk} TTL=%{POSINT:junk} ID=%{NUMBER:junk} PROTO=%{WORD:proto} SPT=%{POSINT:src_port} DPT=%{POSINT:dst_port} LEN=%{POSINT:len}" ]`

Season to taste.

Thank you - that helped tremendously!

One followup question - mixed in with these events, I occasionally get ssh login failures. For example:

Dec 20 13:22:21 router01.example.com dropbear[31382]: bad password attempt for 'root' from 182.75.33.126:61332

I am able to parse these with:

%{SYSLOGTIMESTAMP:date} %{IPORHOST:device} %{WORD:kernel}[%{DATA:unix_pid}]: bad password attempt for 'root' from %{IP:src_ip}:%{NUMBER:src_port}

I would love to graph these separately from the other type of log entries. Is there a way to "mutate" these to another "type" so I can process these independently?

Nevermind - I figured it out with some googling and trial and error. Thanks again!

Ah good deal...busy day here..sorry about no response.

If the issue is with grokking, I recommend you book marked this tool http://grokdebug.herokuapp.com/
Otherwise the plugin documentation is your friend in this case.

https://www.elastic.co/guide/en/logstash/current/input-plugins.html
https://www.elastic.co/guide/en/logstash/current/output-plugins.html
https://www.elastic.co/guide/en/logstash/current/filter-plugins.html

BTW, event type value once set in logstash cannot be changed within logstash. You have to do so externally.