joshn
(Josh)
April 29, 2022, 3:03pm
1
I'm seeing pretty much all of my logs in Elastic/Kibana tagged with the following tags:
_grokparsefailure_sysloginput, _grokparsefailure, _geoip_lookup_failure
Which is odd, as they are being processed just fine.
Example log message, which has all the custom fields I wanted, clearly processed, but I still get these tags (N.B. I edited out my IP with xx.xx).
After some research, I believe it's due to my inputs and how I'm using grok.
syslog {
port => 5530
type => syslog_meraki_sa_ids
}
This is the input for JUST my flows; I use different ports/types for events, as an example.
Could this be confusing Logstash, so that it thinks it hasn't parsed the message when it actually has?
Rios
(Rios)
April 29, 2022, 7:28pm
2
Are you using ECS?
_grokparsefailure - How many grok matches do you have? Is it only on the message field?
_geoip_lookup_failure - I don't think the geoip database being too old is the cause; that would produce a different tag. Can you check http://localhost:9600/_node/stats/geoip_download_manager?pretty ?
I would also recommend setting an index template for the geoip fields.
Badger
April 29, 2022, 7:41pm
3
That is added by the syslog input when its grok pattern does not match the line.
A grok filter with multiple matches sometimes adds a _grokparsefailure tag even when most patterns matched.
You would need to show us details of the grok filter and the event for us to say more.
leandrojmp
(Leandro Pereira)
April 29, 2022, 7:48pm
4
Please share your full Logstash pipeline; it is not possible to understand why you have those tags without seeing it.
Also share an example of your messages.
The _grokparsefailure_sysloginput tag is added by the syslog input; if you are getting this tag, it means the syslog input is not able to parse your message.
If your original message has the format of the message_raw field you shared in your screenshot, that message is not a valid syslog message that the syslog input would parse, probably because the time field is in epoch format.
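If the syslog input cannot parse these messages, a common alternative (a sketch, not something confirmed in this thread; the port and type are copied from the input snippet shared earlier) is to receive them with a plain udp input and do all the parsing yourself in grok:

input {
  udp {
    port => 5530
    type => "syslog_meraki_sa_ids"
  }
}

With a udp input nothing is pre-parsed, so no _grokparsefailure_sysloginput tag can be added; your own grok patterns then have to consume the whole line, including the <pri>, version, epoch timestamp, and hostname prefix.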
The _grokparsefailure tag comes from some grok filter in your pipeline; you didn't share your pipeline, so it is not possible to know what is causing it.
The _geoip_lookup_failure tag comes from a geoip filter in your pipeline. From your screenshot it seems that a geoip filter was correctly applied to the 8.8.8.8 IP address, which seems to be your dst_ip, but if you have another geoip filter on your src_ip, it won't work and will give this error, because your src_ip is a private IP address.
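One way to avoid the lookup failure on private addresses (a sketch using a Logstash conditional; the regex covers the RFC 1918 ranges and is an assumption about which of your addresses are private) is to run geoip only when the field looks public:

if [src_ip] and [src_ip] !~ /^(10\.|192\.168\.|172\.(1[6-9]|2[0-9]|3[01])\.)/ {
  geoip {
    source => "src_ip"
    add_tag => [ "GeoIP" ]
  }
}

The same guard can be applied to dst_ip.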
joshn
(Josh)
May 1, 2022, 12:42pm
5
Thanks Badger - here is my grok filter, which matches on that example:
# <134>1 1651225979.448642514 EMEA_ISP ip_flow_end src=192.168.15.6 dst=8.8.4.4 protocol=udp sport=43026 dport=53 translated_src_ip=1.1.1.1 translated_port=4302
match => { "message" => "ip_flow_end src=%{IP:src_ip} dst=%{IP:dst_ip} protocol=%{WORD:protocol} sport=%{NUMBER:src_port} dport=%{NUMBER:dst_port} translated_src_ip=%{IP:mx_external_src_ip} translated_port=%{NUMBER:mx_external_src_port}" }
Seems like a good/complete match to me?
joshn
(Josh)
May 1, 2022, 1:45pm
6
Thanks leandrojmp - here is my full pipeline below.
I've also got a small snippet here of a log that is a perfect match for the grok filter:
# <134>1 1651225979.448642514 EMEA_ISP ip_flow_end src=192.168.15.6 dst=8.8.4.4 protocol=udp sport=43026 dport=53 translated_src_ip=1.1.1.1 translated_port=4302
match => { "message" => "ip_flow_end src=%{IP:src_ip} dst=%{IP:dst_ip} protocol=%{WORD:protocol} sport=%{NUMBER:src_port} dport=%{NUMBER:dst_port} translated_src_ip=%{IP:mx_external_src_ip} translated_port=%{NUMBER:mx_external_src_port}" }
input {
syslog {
port => 5500
type => "syslog_meraki_sa_events"
}
syslog {
port => 5510
type => "syslog_meraki_sa_flows"
}
syslog {
port => 5520
type => "syslog_meraki_sa_flows"
}
syslog {
port => 5530
type => "syslog_meraki_sa_ids"
}
}
filter {
if [type] == "syslog_meraki_sa_events" {
grok {
# Custom Patterns
patterns_dir => "/etc/logstash/patterns/" # directory for custom patterns
match => { "message" => "events %{WORD:action} lease of ip %{IP:src_ip} from server mac %{MAC:dst_mac} for client mac %{MAC:src_mac} from router %{IP:dst_ip} on subnet %{IP:scr_ip_sub} with (?=.*?(?:dns %{IP:scr_ip_dns}(?:, %{IP:scr_ip_dns})?(?:, %{IP:scr_ip_dns})?(?:, %{IP:scr_ip_dns})?))" }
match => { "message" => "events %{WORD:action} lease of ip %{IP:client_ip_leased} from mx mac %{MAC:dhcp_server_mac} for client mac %{MAC:dhcp_client_mac} from router %{IP:dhcp_gateway} on subnet %{IP:dhcp_subnet_mask}" }
match => { "message" => "events Site-to-site %{WORD:action}: %{GREEDYDATA}" }
match => { "message" => "events type=%{GREEDYDATA:message} vpn_type='%{GREEDYDATA:message}' peer_contact='%{IP:src_ip}:%{BASE16NUM:src_port}' peer_ident='%{GREEDYDATA}' connectivity='%{GREEDYDATA}'" }
match => { "message" => "events %{WORD:action} on %{MAC:src_mac} for user %{USER:username} as %{GREEDYDATA:user_ldap} with policy for group %{GREEDYDATA:user_group_ldap}" }
match => { "message" => "events %{WORD:action} on %{MAC:src_mac} for user %{USER:username} as %{GREEDYDATA:user_ldap}" }
match => { "message" => "events Site-to-site %{WORD:action}: IPsec-SA established: ESP/Transport %{IP:dst_ip}\[%{NUMBER:dst_port}\]->%{IP:src_ip}\[%{NUMBER:src_port}\] spi=%{NUMBER:spi}" }
match => { "message" => "events %{WORD:action} user id '%{USER:username}' local ip %{IP:dst_ip} connected from %{IP:src_ip}" }
match => { "message" => "events %{GREEDYDATA:message}: %{WORD:action} to connect to server %{IP:dst_ip}" }
match => { "message" => "events %{WORD:action} url='%{MERAKIURI:request}'(?:;)? category0='%{GREEDYDATA}' server='%{IP:dst_ip}:%{NUMBER:dst_port}'" }
# Tags
add_tag => "meraki"
add_tag => "security_applicance"
add_tag => "events"
add_tag => "%{action}"
}
geoip {
add_tag => [ "GeoIP" ]
source => "src_ip"
}
geoip {
add_tag => [ "GeoIP" ]
source => "dst_ip"
}
}
# Meraki Security Appliance ( syslog_meraki_sa_flows )
if [type] == "syslog_meraki_sa_flows" {
grok {
match => { "message" => "<%{POSINT:syslog_pri}>%{INT} %{BASE10NUM:timestamp_unix} %{WORD:host_title} ip_flow_%{GREEDYDATA:action} src=%{IP:src_ip} dst=%{IP:dst_ip} protocol=%{WORD:protocol} translated_src_ip=%{IP:mx_external_src_ip}" }
match => { "message" => "<%{POSINT:syslog_pri}>%{INT} %{BASE10NUM:timestamp_unix} %{WORD:host_title} ip_flow_%{GREEDYDATA:action} src=%{IP:src_ip} dst=%{IP:dst_ip} protocol=%{WORD:protocol} sport=%{NUMBER:src_port} dport=%{NUMBER:dst_port} translated_src_ip=%{IP:mx_external_src_ip} translated_port=%{NUMBER:mx_external_src_port}" }
match => { "message" => "flows src=%{IP:src_ip} dst=%{IP:dst_ip} mac=%{MAC:src_mac} protocol=%{WORD:protocol} sport=%{NUMBER:src_port} dport=%{NUMBER:dst_port} pattern: %{WORD:action} %{GREEDYDATA:request}" }
match => { "message" => "flows src=%{IP:src_ip} dst=%{IP:dst_ip} mac=%{MAC:src_mac} protocol=%{WORD:protocol} type=%{NUMBER:icmp_type} pattern: %{WORD:action}" }
match => { "message" => "flows src=%{IP:src_ip} dst=%{IP:dst_ip} protocol=%{WORD:protocol} sport=%{NUMBER:src_port} dport=%{NUMBER:dst_port} pattern: %{WORD:action}" }
# Tags
add_tag => "meraki"
add_tag => "security_applicance"
add_tag => "flows"
add_tag => "%{action}"
}
geoip {
add_tag => [ "GeoIP" ]
source => "src_ip"
}
geoip {
add_tag => [ "GeoIP" ]
source => "dst_ip"
}
}
# Meraki Security Appliance ( syslog_meraki_sa_url )
if [type] == "syslog_meraki_sa_url" {
grok {
# Custom Patterns
patterns_dir => "/etc/logstash/patterns/" # directory for custom patterns
match => { "message" => "^<%{POSINT:syslog_pri}>%{INT} %{BASE10NUM:timestamp_unix} %{WORD:host_title} urls src=%{IP:src_ip}:%{BASE16NUM:src_port} dst=%{IP:dst_ip}:%{BASE16NUM:dst_port} mac=%{COMMONMAC:src_mac} %{GREEDYDATA:agent} request: %{WORD:request_meth} %{MERAKIURI:request}" }
match => { "message" => "^<%{POSINT:syslog_pri}>%{INT} %{BASE10NUM:timestamp_unix} %{WORD:host_title} urls src=%{IP:src_ip}:%{BASE16NUM:src_port} dst=%{IP:dst_ip}:%{BASE16NUM:dst_port} mac=%{COMMONMAC:src_mac} %{GREEDYDATA} %{MERAKIURI:request}" }
match => { "message" => "^<%{POSINT:syslog_pri}>%{INT} %{BASE10NUM:timestamp_unix} %{WORD:host_title} urls src=%{IP:src_ip}:%{BASE16NUM:src_port} dst=%{IP:dst_ip}:%{BASE16NUM:dst_port} mac=%{COMMONMAC:src_mac} user=%{GREEDYDATA:user_ldap} request: %{WORD:request_meth} %{MERAKIURI:request}" }
# Tags
add_tag => "meraki"
add_tag => "security_applicance"
add_tag => "urls"
}
mutate {
rename => [ "message", "message_raw" ]
replace => [ "message", "%{request}" ]
}
geoip {
add_tag => [ "GeoIP" ]
source => "src_ip"
}
geoip {
add_tag => [ "GeoIP" ]
source => "dst_ip"
}
}
# Meraki Security Appliance ( syslog_meraki_sa_ids )
if [type] == "syslog_meraki_sa_ids" {
grok {
patterns_dir => "/etc/logstash/patterns/" # directory for custom patterns
match => { "message" => "ids-alerts signature=%{GREEDYDATA:ids_sig} priority=%{BASE10NUM:ids_priority} timestamp=%{BASE10NUM:timestamp_unix}(?: shost=%{MAC:src_mac})?(?: dhost=%{MAC:dst_mac})?(?: direction=%{WORD:direction})?(?: protocol=%{DATA:ids_proto}(?:/ip)?)?(?: src=%{IP:src_ip}:%{BASE10NUM:src_port})?(?: dst=%{IP:dst_ip}:%{BASE10NUM:dst_port})? message: %{GREEDYDATA:ids_sig_msg}" }
match => { "message" => "%{GREEDYDATA} url=%{MERAKIURI:request} src=%{IP:src_ip}:%{BASE16NUM:src_port} dst=%{IP:dst_ip}:%{BASE16NUM:dst_port} mac=%{COMMONMAC:src_mac} name=%{GREEDYDATA} sha256=%{GREEDYDATA:SHA256} disposition=%{GREEDYDATA:disposition} action=%{GREEDYDATA:action}" }
# Tags
add_tag => "meraki"
add_tag => "security_applicance"
add_tag => "ids-alerts"
add_tag => "%{action}"
}
geoip {
add_tag => [ "GeoIP" ]
source => "src_ip"
}
geoip {
add_tag => [ "GeoIP" ]
source => "dst_ip"
}
}
}
output {
elasticsearch {
hosts => ["MYSERVERADDRHERE:9200"]
}
}
joshn
(Josh)
May 1, 2022, 1:55pm
7
Here is the output of that GET request - it all looks good and up to date to me.
curl -XGET http://localhost:9600/_node/stats/geoip_download_manager?pretty
{
"host" : "EMEA-LS-01",
"version" : "7.17.3",
"http_address" : "127.0.0.1:9600",
"id" : "7bf6b24e-9b40-4992-bd2c-4c6f187d82d3",
"name" : "EMEA-LS-01",
"ephemeral_id" : "08e4fe5e-d0cb-4524-abdc-abfe965013c0",
"status" : "green",
"snapshot" : false,
"pipeline" : {
"workers" : 8,
"batch_size" : 125,
"batch_delay" : 50
},
"geoip_download_manager" : {
"download_stats" : {
"failures" : 0,
"last_checked_at" : "2022-05-01T13:48:25+00:00",
"successes" : 1,
"status" : "succeeded"
},
"database" : {
"ASN" : {
"fail_check_in_days" : 0,
"last_updated_at" : "2022-05-01T13:18:51+00:00",
"status" : "up_to_date"
},
"City" : {
"fail_check_in_days" : 0,
"last_updated_at" : "2022-05-01T13:18:51+00:00",
"status" : "up_to_date"
}
}
}
}
Badger
joshn:
match => { "message" => "events %{WORD:action} lease of ip %{IP:src_ip} from server mac %{MAC:dst_mac} for client mac %{MAC:src_mac} from router %{IP:dst_ip} on subnet %{IP:scr_ip_sub} with (?=.*?(?:dns %{IP:scr_ip_dns}(?:, %{IP:scr_ip_dns})?(?:, %{IP:scr_ip_dns})?(?:, %{IP:scr_ip_dns})?))" }
match => { "message" => "events %{WORD:action} lease of ip %{IP:client_ip_leased} from mx mac %{MAC:dhcp_server_mac} for client mac %{MAC:dhcp_client_mac} from router %{IP:dhcp_gateway} on subnet %{IP:dhcp_subnet_mask}" }
I recommend against using multiple match options. The grok filter will merge them and may not do it in the way you expect. Instead, match against an array of patterns as described in the documentation.
match => {
"message" => [
"events %{WORD:action} lease of ip %{IP:src_ip} from server mac %{MAC:dst_mac} for client mac %{MAC:src_mac} from router %{IP:dst_ip} on subnet %{IP:scr_ip_sub} with (?=.*?(?:dns %{IP:scr_ip_dns}(?:, %{IP:scr_ip_dns})?(?:, %{IP:scr_ip_dns})?(?:, %{IP:scr_ip_dns})?))",
"events %{WORD:action} lease of ip %{IP:client_ip_leased} from mx mac %{MAC:dhcp_server_mac} for client mac %{MAC:dhcp_client_mac} from router %{IP:dhcp_gateway} on subnet %{IP:dhcp_subnet_mask}",
"events Site-to-site %{WORD:action}: %{GREEDYDATA}",
...
]
}
Read the issue I linked to in order to understand why that adds a _grokparsefailure tag. Note that internally the grok filter converts your configuration into the one I recommend, so you cannot avoid the problem by avoiding the array form.
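If you keep several grok filters across your pipeline branches, giving each its own failure tag makes it easier to see which one did not match (tag_on_failure is a standard grok option; the tag name here is just an example, and the pattern is one from your events branch):

grok {
  match => { "message" => [ "events Site-to-site %{WORD:action}: %{GREEDYDATA}" ] }
  tag_on_failure => [ "_grokparsefailure_meraki_events" ]
}

That way the generic _grokparsefailure tag is replaced by one that tells you which branch failed.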
system
(system)
Closed
May 29, 2022, 4:54pm
9
This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.