Hello all. I have followed the excellent DigitalOcean Filebeat/Logstash/Elasticsearch/Kibana tutorial ("How To Install Elasticsearch, Logstash, and Kibana (ELK Stack) on CentOS 7"). I am getting messages from one of my files into Elasticsearch/Kibana.
However, my input lines look like the following:
Dec 19 17:54:49 myserver.example.com kernel: [12954239.200000] DROP IN=eth1 OUT= MAC=30:46:9a:15:66:b0:e8:b7:48:0c:8a:da:08:00 SRC=178.175.38.24 DST=10.1.1.1 LEN=60 TOS=0x00 PREC=0x00 TTL=47 ID=12841 DF PROTO=TCP SPT=50806 DPT=20012 SEQ=1057317385 ACK=0 WINDOW=5808 R
My current filter is:
filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
    syslog_pri { }
    date {
      match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
}
I'm trying to parse the "SRC" and "DST" IP addresses into separate fields. I would also like to use the geoip plugin.
I have read through a lot of documentation and I am completely lost as to how I might accomplish this.
Thanks in advance!
My grok lines:
`match => [ "message", "%{SYSLOGTIMESTAMP:date} %{IPORHOST:device} %{WORD:kernel}: \[%{DATA:unixtime}\] IN=%{DATA:in_int} OUT=%{DATA:out_int} MAC=%{DATA:junk} SRC=%{IP:src_ip} DST=%{IP:dst_ip} LEN=%{NUMBER:len} TOS=%{DATA:junk} PREC=%{DATA:junk} TTL=%{NUMBER:junk} ID=%{NUMBER:junk} DF PROTO=%{WORD:proto} SPT=%{NUMBER:src_port} DPT=%{NUMBER:dst_port} WINDOW=%{NUMBER:junk} RES=%{DATA:junk} %{DATA:flags} URGP=%{NUMBER:junk}" ]
match => [ "message", "%{SYSLOGTIMESTAMP:date} %{IPORHOST:device} %{WORD:kernel}: \[%{DATA:unixtime}\] IN=%{DATA:in_int} OUT=%{DATA:out_int} MAC=%{DATA:junk} SRC=%{IP:src_ip} DST=%{IP:dst_ip} LEN=%{NUMBER:len} TOS=%{DATA:junk} PREC=%{DATA:junk} TTL=%{NUMBER:junk} ID=%{NUMBER:junk} PROTO=%{WORD:proto} SPT=%{NUMBER:src_port} DPT=%{NUMBER:dst_port} WINDOW=%{NUMBER:junk} RES=%{DATA:junk} %{DATA:flags} URGP=%{NUMBER:junk}" ]
match => [ "message", "%{SYSLOGTIMESTAMP:date} %{IPORHOST:device} %{WORD:kernel}: \[%{DATA:unixtime}\] IN=%{DATA:in_int} OUT=%{DATA:out_int} MAC=%{DATA:junk} SRC=%{IP:src_ip} DST=%{IP:dst_ip} LEN=%{POSINT:fullen} TOS=%{DATA:junk} PREC=%{DATA:junk} TTL=%{POSINT:junk} ID=%{NUMBER:junk} DF PROTO=%{WORD:proto} SPT=%{POSINT:src_port} DPT=%{POSINT:dst_port} LEN=%{POSINT:len}" ]
match => [ "message", "%{SYSLOGTIMESTAMP:date} %{IPORHOST:device} %{WORD:kernel}: \[%{DATA:unixtime}\] IN=%{DATA:in_int} OUT=%{DATA:out_int} MAC=%{DATA:junk} SRC=%{IP:src_ip} DST=%{IP:dst_ip} LEN=%{POSINT:fullen} TOS=%{DATA:junk} PREC=%{DATA:junk} TTL=%{POSINT:junk} ID=%{NUMBER:junk} PROTO=%{WORD:proto} SPT=%{POSINT:src_port} DPT=%{POSINT:dst_port} LEN=%{POSINT:len} MARK=%{DATA:junk}" ]
match => [ "message", "%{SYSLOGTIMESTAMP:date} %{IPORHOST:device} %{WORD:kernel}: \[%{DATA:unixtime}\] IN=%{DATA:in_int} OUT=%{DATA:out_int} MAC=%{DATA:junk} SRC=%{IP:src_ip} DST=%{IP:dst_ip} LEN=%{POSINT:fullen} TOS=%{DATA:junk} PREC=%{DATA:junk} TTL=%{POSINT:junk} ID=%{NUMBER:junk} PROTO=%{WORD:proto} SPT=%{POSINT:src_port} DPT=%{POSINT:dst_port} LEN=%{POSINT:len}" ]`
Season to taste.
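Since the question also asked about geoip, here is a minimal sketch of one alternative, not a tested configuration. It assumes the grok from the original filter has already run, so that the text after `kernel:` is in `syslog_message` and the program name is in `syslog_program`. Because the firewall lines are a series of KEY=value pairs, the kv filter can pull out the fields without a separate pattern per variant; the choice of keys in `include_keys` is illustrative. Fields end up named after the raw keys (`SRC`, `DST`, ...), not `src_ip`/`dst_ip`.

```
filter {
  if [type] == "syslog" and [syslog_program] == "kernel" {
    kv {
      # Split the remainder of the line on spaces and "=" (the kv defaults)
      source => "syslog_message"
      # Keep only the keys of interest; tokens without "=" (DROP, DF) are skipped
      include_keys => [ "SRC", "DST", "PROTO", "SPT", "DPT" ]
    }
    if [SRC] {
      geoip {
        # Look up the source address extracted by kv
        source => "SRC"
      }
    }
  }
}
```

By default the lookup result is stored under a `geoip` field; depending on your Logstash version you may need to set `target` explicitly. A private destination address such as 10.1.1.1 will not be in the GeoIP database, so only the source address is looked up here.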
Thank you - that helped tremendously!
One follow-up question: mixed in with these events, I occasionally get SSH login failures. For example:
Dec 20 13:22:21 router01.example.com dropbear[31382]: bad password attempt for 'root' from 182.75.33.126:61332
I am able to parse these with:
%{SYSLOGTIMESTAMP:date} %{IPORHOST:device} %{WORD:kernel}\[%{DATA:unix_pid}\]: bad password attempt for 'root' from %{IP:src_ip}:%{NUMBER:src_port}
I would love to graph these separately from the other type of log entries. Is there a way to "mutate" these to another "type" so I can process these independently?
Never mind - I figured it out with some googling and trial and error. Thanks again!
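The thread does not record what the solution was. One common approach, sketched here under assumptions, is to tag the login failures when their grok pattern matches and then filter on that tag in Kibana. The tag name `ssh_failure` and the `user` field are hypothetical; the other field names follow the pattern quoted above.

```
filter {
  # Only try this pattern on lines that look like login failures
  if [type] == "syslog" and [message] =~ /bad password attempt/ {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:date} %{IPORHOST:device} %{WORD:kernel}\[%{DATA:unix_pid}\]: bad password attempt for '%{DATA:user}' from %{IP:src_ip}:%{NUMBER:src_port}" }
      # add_tag is applied only when the match succeeds
      add_tag => [ "ssh_failure" ]
    }
  }
}
```

With this in place, a Kibana query such as `tags:ssh_failure` should isolate these events for their own visualization, and the firewall drops can be tagged the same way in their own grok block.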
Ah, good deal. Busy day here; sorry about the lack of response.
James_Tang (James Tang), December 21, 2015, 2:38am, post 6
If the issue is with grokking, I recommend you bookmark this tool: http://grokdebug.herokuapp.com/
Otherwise the plugin documentation is your friend in this case.
https://www.elastic.co/guide/en/logstash/current/input-plugins.html
https://www.elastic.co/guide/en/logstash/current/output-plugins.html
https://www.elastic.co/guide/en/logstash/current/filter-plugins.html
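BTW, once an event's type has been set, the `type` option on a later Logstash input will not override it. To change it you have to do so explicitly (for example with a mutate filter) or set it at the source.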