Grok parse failure after successful debug


(Freddy Kasprzykowski) #1

I have used http://grokdebug.herokuapp.com/ to create the grok match statements, based on tcpdump captures of the syslog strings the ELK box is receiving. However, when checking Kibana, the logs are not parsed. I have run service logstash configtest and got confirmation that the syntax is correct.

(IPs and other info are fake, but strings are intact).

TCPDUMP 1:
15:51:23.173798 IP firewall.local.46932 > elk.local.syslog: SYSLOG local7.notice, length: 316
E..XC'@.@.r=.........T...Dx-<189>Jan 17 15:51:23 2016 firewall src="10.10.1.202:61227" dst="8.8.8.8:53" msg="priority:16, from LAN1 to WAN, UDP, service DNS_UDP, ACCEPT" note="ACCESS FORWARD" user="unknown" devID="yyyyyyy" cat="Firewall" class="Access Control" ob="0" ob_mac="000000000000" dir="LAN1:WAN" protoID=17 proto="DNS_UDP"

SYSLOG STRIP 1: <189>Jan 17 15:51:23 2016 firewall src="10.10.1.202:61227" dst="8.8.8.8:53" msg="priority:16, from LAN1 to WAN, UDP, service DNS_UDP, ACCEPT" note="ACCESS FORWARD" user="unknown" devID="yyyyyyy" cat="Firewall" class="Access Control" ob="0" ob_mac="000000000000" dir="LAN1:WAN" protoID=17 proto="DNS_UDP"

GROK STATEMENT 1: <%{NUMBER:syslog_index}>%{SYSLOGTIMESTAMP:syslog_timestamp} %{YEAR:YEAR} %{WORD:device} src="%{IP:source_ip}:%{BASE10NUM:source_port}" dst="%{IP:destination_ip}:%{BASE10NUM:destination_port}" msg="%{DATA:log_type}" note="%{DATA:log_note}" user="%{DATA:user}" devID="%{DATA:device_id}" cat="%{DATA:log_category}" class=%{DATA:class} ob=%{DATA:ob} ob_mac=%{DATA:ob_mac} dir="%{DATA:traffic_direction}" protoID=%{DATA:protocol_number} proto="%{DATA:protocol_service}"

TCPDUMP 2:
15:51:23.295700 IP firewall.local.46932 > elk.local.syslog: SYSLOG local7.info, length: 249
E...C)@.@.r~.........T....2.<190>Jan 17 15:51:23 2016 firewall src="10.10.1.212:38942" dst="8.8.8.8:40020" msg="Traffic Log" note="Traffic Log" user="unknown" devID="yyyyyyy" cat="Traffic Log" duration=300 sent=157 rcvd=49 dir="lan1:wan1" protoID=17 proto="others"

SYSLOG STRIP 2: <190>Jan 17 15:36:05 2016 firewall src="10.10.1.212:38942" dst="8.8.8.8:40007" msg="Traffic Log" note="Traffic Log" user="unknown" devID="yyyyyyy" cat="Traffic Log" duration=300 sent=158 rcvd=49 dir="lan1:wan1" protoID=17 proto="others"

GROK STATEMENT 2: <%{NUMBER:syslog_index}>%{SYSLOGTIMESTAMP:syslog_timestamp} %{YEAR:YEAR} %{WORD:device} src="%{IP:source_ip}:%{BASE10NUM:source_port}" dst="%{IP:destination_ip}:%{BASE10NUM:destination_port}" msg="%{DATA:log_type}" note="%{DATA:log_note}" user="%{DATA:user}" devID="%{DATA:device_id}" cat="%{DATA:log_category}" duration=%{DATA:duration} sent=%{DATA:sent} rcvd=%{DATA:received} dir="%{DATA:traffic_direction}" protoID=%{DATA:protocol_number} proto="%{DATA:protocol_service}"

logstash.conf

input {
  syslog {
    port => 5514
    type => "syslog"
  }
}
filter {
  if [type] == "syslog" {
    grok {
      match => ["message", "<%{NUMBER:syslog_index}>%{SYSLOGTIMESTAMP:syslog_timestamp} %{YEAR:YEAR} %{WORD:device} src=\"%{IP:source_ip}:%{BASE10NUM:source_port}\" dst=\"%{IP:destination_ip}:%{BASE10NUM:destination_port}\" msg=\"%{DATA:log_type}\" note=\"%{DATA:log_note}\" user=\"%{DATA:user}\" devID=\"%{DATA:device_id}\" cat=\"%{DATA:log_category}\" class=%{DATA:class} ob=%{DATA:ob} ob_mac=%{DATA:ob_mac} dir=\"%{DATA:traffic_direction}\" protoID=%{DATA:protocol_number} proto=\"%{DATA:protocol_service}\""]
    }
    grok {
      match => ["message", "<%{NUMBER:syslog_index}>%{SYSLOGTIMESTAMP:syslog_timestamp} %{YEAR:YEAR} %{WORD:device} src=\"%{IP:source_ip}:%{BASE10NUM:source_port}\" dst=\"%{IP:destination_ip}:%{BASE10NUM:destination_port}\" msg=\"%{DATA:log_type}\" note=\"%{DATA:log_note}\" user=\"%{DATA:user}\" devID=\"%{DATA:device_id}\" cat=\"%{DATA:log_category}\" duration=%{DATA:duration} sent=%{DATA:sent} rcvd=%{DATA:received} dir=\"%{DATA:traffic_direction}\" protoID=%{DATA:protocol_number} proto=\"%{DATA:protocol_service}\""]
    }
  }
}
output {
  elasticsearch { hosts => ["localhost:9200"] }
  stdout { codec => rubydebug }
}


(Magnus Bäck) #2

Be systematic. Start with the simplest possible expression:

^<%{NUMBER:syslog_index}>

Does that work? Yes? Continue:

^<%{NUMBER:syslog_index}>%{SYSLOGTIMESTAMP:syslog_timestamp}

Keep on going until it breaks. Use the stdout output for debugging. Ignore Kibana until stdout looks good.
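That loop can be run locally with a throwaway config that reads raw lines from stdin and prints each parsed event (a sketch; the file name test.conf and the stdin input are illustrative, not from the thread):

```
# test.conf -- throwaway config for iterating on a grok pattern.
input {
  stdin { }
}
filter {
  grok {
    # Start with the smallest expression and extend it until it breaks.
    match => { "message" => "^<%{NUMBER:syslog_index}>%{SYSLOGTIMESTAMP:syslog_timestamp}" }
  }
}
output {
  # rubydebug prints every extracted field, plus any _grokparsefailure tag.
  stdout { codec => rubydebug }
}
```

Run it with bin/logstash -f test.conf, paste in a captured syslog line, and grow the pattern one token at a time.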

(Shameless plug: You may find https://github.com/magnusbaeck/logstash-filter-verifier useful for testing filters, both for the initial filter implementation and for making sure future changes don't introduce regressions.)


(Freddy Kasprzykowski) #3

Thank you so much. By looking at /var/log/logstash/logstash.stdout I learned that the syslog priority number and the timestamp had already been removed from the message. I have adjusted the filter and all attributes are parsing just fine. One last thing, though: why is it still inserting the _grokparsefailure tag?

filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{WORD:host_name} src=\"%{IP:source_ip}:%{BASE10NUM:source_port}\" dst=\"%{IP:destination_ip}:%{BASE10NUM:destination_port}\" msg=\"%{DATA:log_type}\" note=\"%{DATA:log_note}\" user=\"%{DATA:user}\" devID=\"%{DATA:device_id}\" cat=\"%{DATA:log_category}\" duration=%{DATA:duration} sent=%{DATA:sent} rcvd=%{DATA:received} dir=\"%{DATA:traffic_direction}\" protoID=%{DATA:protocol_number} proto=\"%{DATA:protocol_service}\"" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host_name}" ]
    }
    grok {
      match => { "message" => "%{WORD:host_name} src=\"%{IP:source_ip}:%{BASE10NUM:source_port}\" dst=\"%{IP:destination_ip}:%{BASE10NUM:destination_port}\" msg=\"%{DATA:log_type}\" note=\"%{DATA:log_note}\" user=\"%{DATA:user}\" devID=\"%{DATA:device_id}\" cat=\"%{DATA:log_category}\" class=%{DATA:class} ob=%{DATA:ob} ob_mac=%{DATA:ob_mac} dir=\"%{DATA:traffic_direction}\" protoID=%{DATA:protocol_number} proto=\"%{DATA:protocol_service}\"" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host_name}" ]
    }

    syslog_pri { }
    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
}


(Magnus Bäck) #4

One of the grok expressions is successful and the other one fails. To try multiple grok expressions and break after the first match, follow this pattern:

grok {
  match => {
    "message" => [
      "%{WORD:host_name} ... class=%{DATA:class} ob=%{DATA:ob} ...",
      "%{WORD:host_name} ... duration=%{DATA:duration} sent=%{DATA:sent} ..."
    ]
  }
}
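This works because grok's break_on_match option defaults to true: when one field has several patterns, grok stops after the first pattern that matches, and _grokparsefailure is only added when none of them match. Shown explicitly (a sketch, with the thread's patterns abbreviated by "..." as above):

```
grok {
  # Default behaviour, spelled out: stop at the first matching pattern,
  # and only tag _grokparsefailure if every listed pattern fails.
  break_on_match => true
  match => {
    "message" => [
      "%{WORD:host_name} ... class=%{DATA:class} ob=%{DATA:ob} ...",
      "%{WORD:host_name} ... duration=%{DATA:duration} sent=%{DATA:sent} ..."
    ]
  }
}
```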

(Freddy Kasprzykowski) #5

And... it works!

I checked the documentation and it is correct: https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html

I am not sure why there are several posts lingering around claiming the syntax is:
match => [ "message", format1,
"message", format2 ]

Thank you very much. If there is anything I can do to help the community, let me know.

Best Regards


(Magnus Bäck) #6

I am not sure why there are several posts lingering around claiming the syntax is:
match => [ "message", format1,
"message", format2 ]

Both forms are acceptable:

match => ["field", "pattern1", ..., "patternN"]

is equivalent to

match => { "field" => ["pattern1", ..., "patternN"] }
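Concretely (a sketch with placeholder patterns of my own choosing, not from the thread), these two spellings configure the same matcher:

```
# Array form: the field name followed by one or more patterns.
grok {
  match => [ "message", "%{NUMBER:num}", "%{WORD:word}" ]
}

# Hash form: the same field and patterns as a key/value pair.
grok {
  match => { "message" => [ "%{NUMBER:num}", "%{WORD:word}" ] }
}
```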
