Nov 23 18:57:14 mx.host.cloud 18:57:14.756 2 SIPS-072111 SIPDATA-124634 REGISTER sip:111.222.333.444:65110 from udp[555.666.777.888]:65111
Nov 23 18:57:14 mx.host.cloud 18:57:14.756 2 SIPS-072111 SIGNAL-154480 created
Nov 23 18:57:16 mx.host.cloud 18:57:16.000 2 SIPS-072111 SIPDATA-124636 404-REGISTER(final) sent [0.0.0.0]:65100 -> udp[555.666.777.888]:65111
As you can see, a single event (action) generates several log lines on the server, but each line carries the same event ID: 072111.
How can I bind such multi-line logs into one event using Logstash?
I did look at the multiline codec in the Elastic documentation, but after studying it in more detail I realized it is not exactly what I need. What options do I have?
Nov 23 18:57:14 mx.host.cloud 18:57:14.756 2 SIPS-072111 SIPDATA-124634 REGISTER sip:111.222.333.444:65110 from udp[555.666.777.888]:65111
Nov 23 18:57:14 mx.host.cloud 18:57:14.756 2 SIPS-072111 SIGNAL-154480 created
Nov 23 18:57:16 mx.host.cloud 18:57:16.000 2 SIPS-072111 SIPDATA-124636 404-REGISTER(final) sent [0.0.0.0]:65100 -> udp[555.666.777.888]:65111
and the main grok: %{SYSLOGTIMESTAMP:date} %{IPORHOST:hostname} %{TIME:timestamp} %{INT} %{DATA:protocol}-%{INT:ident} %{GREEDYDATA} %{IPORHOST:main_hostname}:%{INT} %{GREEDYDATA:msg} \[%{IPORHOST:client_hostname}\]:%{INT}
Firstly, I understand the need to redact your actual IP addresses, but please do not replace them with something that is not a valid address. 55.66.77.88 would be much better than 555.666.777.888.
Second, your grok pattern does not match any of your log lines, because neither the main_hostname pattern nor the client_hostname pattern is preceded by a space in the logs.
Third, adding something to the map does not modify the events themselves.
One option would be to collect parts of all the event's log messages in the map, event.cancel the individual events, and create a new combined event using the push_map_as_event_on_timeout option.
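A minimal sketch of that first option (the timeout value, the messages field name, and the tag are illustrative choices, not taken from your logs):

```
aggregate {
    task_id => "%{ident}"
    code => '
        # Accumulate each raw line for this event ID in the map,
        # then discard the individual event.
        map["messages"] ||= []
        map["messages"] << event.get("message")
        event.cancel
    '
    # When no new line arrives for this task_id within the timeout,
    # the map is flushed as a single new event.
    push_map_as_event_on_timeout => true
    timeout => 10
    timeout_task_id_field => "ident"
    timeout_tags => [ "aggregated" ]
}
```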
Another possibility is to copy fields from one event to subsequent events with the same task_id. Something like
code => '
    if !map["main_hostname"]
        map["main_hostname"] = event.get("main_hostname")
    else
        event.set("main_hostname", map["main_hostname"])
    end
    ...
'
I have previously posted many examples of using push_map_as_event_on_timeout.
I would split the grok into two, so that the first matches the common prefix and the second parses the hostnames that are only present in the first log line. With a single grok, two of the lines would not match at all, so they would not have an [ident] field.
grok { match => { "message" => "%{SYSLOGTIMESTAMP:date} %{IPORHOST:hostname} %{TIME:timestamp} %{INT} %{DATA:protocol}-%{INT:ident} %{GREEDYDATA:restOfLine}" } }
grok { match => { "restOfLine" => "%{GREEDYDATA}%{IPORHOST:main_hostname}:%{INT} %{GREEDYDATA:msg}\[%{IPORHOST:client_hostname}\]:%{INT}" } }
aggregate {
    task_id => "%{ident}"
    code => '
        if !map["client_hostname"]
            map["client_hostname"] = event.get("client_hostname")
        else
            event.set("client_hostname", map["client_hostname"])
        end
        if !map["main_hostname"]
            map["main_hostname"] = event.get("main_hostname")
        else
            event.set("main_hostname", map["main_hostname"])
        end
    '
}