Multiline logs into one event using Logstash?

Hi all! I have these logs:

Nov 23 18:57:14 mx.host.cloud 18:57:14.756 2 SIPS-072111 SIPDATA-124634 REGISTER sip:111.222.333.444:65110 from udp[555.666.777.888]:65111
Nov 23 18:57:14 mx.host.cloud 18:57:14.756 2 SIPS-072111 SIGNAL-154480 created
Nov 23 18:57:16 mx.host.cloud 18:57:16.000 2 SIPS-072111 SIPDATA-124636 404-REGISTER(final) sent [0.0.0.0]:65100 -> udp[555.666.777.888]:65111

As you can see, several log lines are generated on the server for a single event (action), but each line carries the same event ID: 072111.

How can I combine such multi-line logs into one event using Logstash?

I hadn't noticed the multiline codec in the Elastic documentation at first, but after studying it in more detail I realized it is not exactly what I need. What options do I have?

You could try using an aggregate filter.
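As a rough sketch (the ident field and what you put into the map are only placeholders, depending on what you grok out of each line):

    aggregate {
        task_id => "%{ident}"
        code => '
            # collect the raw lines for each ident into a shared map
            map["lines"] ||= []
            map["lines"] << event.get("message")
        '
    }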

Yes, I stumbled upon it after a lot of searching for a solution, but I'm not able to apply it correctly to my situation. Can you help?

My filter looks like this:

if [ident] {
    aggregate {
        task_id => "%{ident}"
        code => "
            map['main_hostname'] = event.get('main_hostname')
            map['client_hostname'] = event.get('client_hostname')
        "
    }
}

My original log:

Nov 23 18:57:14 mx.host.cloud 18:57:14.756 2 SIPS-072111 SIPDATA-124634 REGISTER sip:111.222.333.444:65110 from udp[555.666.777.888]:65111
Nov 23 18:57:14 mx.host.cloud 18:57:14.756 2 SIPS-072111 SIGNAL-154480 created
Nov 23 18:57:16 mx.host.cloud 18:57:16.000 2 SIPS-072111 SIPDATA-124636 404-REGISTER(final) sent [0.0.0.0]:65100 -> udp[555.666.777.888]:65111

and the main grok:
%{SYSLOGTIMESTAMP:date} %{IPORHOST:hostname} %{TIME:timestamp} %{INT} %{DATA:protocol}-%{INT:ident} %{GREEDYDATA} %{IPORHOST:main_hostname}:%{INT} %{GREEDYDATA:msg} \[%{IPORHOST:client_hostname}\]:%{INT}

As a result, the events are still not merged...

First, I understand the need to redact your actual IP addresses, but please do not replace them with something that is not a valid address. 55.66.77.88 would be much better than 555.666.777.888.

Second, your grok pattern does not match any of your log lines, because in the logs neither the main_hostname nor the client_hostname is preceded by a space, although your pattern expects one before each.

Third, adding something to the aggregate map does not, by itself, modify the events.

One option would be to collect the parts you want from all of the log messages in the map, cancel the individual events (event.cancel), and create a new combined event using the push_map_as_event_on_timeout option.
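Roughly along these lines (untested; the 10 second timeout, the "aggregated" tag and the messages array are placeholder choices, not something from your config):

    aggregate {
        task_id => "%{ident}"
        code => '
            # keep the first non-nil value seen for each field
            map["main_hostname"] ||= event.get("main_hostname")
            map["client_hostname"] ||= event.get("client_hostname")
            # collect the raw lines belonging to this ident
            map["messages"] ||= []
            map["messages"] << event.get("message")
            # drop the individual line; only the merged event gets emitted
            event.cancel
        '
        push_map_as_event_on_timeout => true
        timeout_task_id_field => "ident"
        timeout => 10
        timeout_tags => ["aggregated"]
    }

Keep in mind that aggregate only works reliably with a single pipeline worker (pipeline.workers: 1), otherwise lines for the same ident may be processed on different threads.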

Another possibility is to copy fields from one event to subsequent events for the same task_id. Something like

code => '
    if !map["main_hostname"]
        map["main_hostname"] = event.get("main_hostname")
    else
        event.set("main_hostname", map["main_hostname"])
    end
    ...
'

I have previously posted many examples of using push_map_as_event_on_timeout.

Thanks a lot for your answer! 🙂

I edited the grok and other data to remove confidential information, so I may well have made a mistake somewhere, that's true :^(

I tried to follow your advice, but did not get a positive result 🙁

...
    if [id] {
        aggregate {
            task_id => "%{id}"
            code => '
                if !map["client_hostname"]
                    map["client_hostname"] = event.get("client_hostname")
                else
                    event.set("client_hostname", map["client_hostname"])
                end
            '
        }
    }

    if [id] {
        aggregate {
            task_id => "%{id}"
            code => '
                if !map["main_hostname"]
                    map["main_hostname"] = event.get("main_hostname")
                else
                    event.set("main_hostname", map["main_hostname"])
                end
            '
        }
    }
...

Am I using the tool incorrectly? Now I will try your advice with push_map_as_event_on_timeout.

I didn't get a positive result with push_map_as_event_on_timeout either 🙁

Hi, can you help me, please?

I would split the grok into two, so that the first one matches the common prefix and the second parses the hostnames that are present in the first log line. With a single grok, two of the lines will not match, so they will not have an [ident] field.

    grok { match => { "message" => "%{SYSLOGTIMESTAMP:date} %{IPORHOST:hostname} %{TIME:timestamp} %{INT} %{DATA:protocol}-%{INT:ident} %{GREEDYDATA:restOfLine}" } }
    grok { match => { "restOfLine" => "%{GREEDYDATA}%{IPORHOST:main_hostname}:%{INT} %{GREEDYDATA:msg}\[%{IPORHOST:client_hostname}\]:%{INT}" } }
    aggregate {
        task_id => "%{ident}"
        code => '
            if !map["client_hostname"]
                map["client_hostname"] = event.get("client_hostname")
            else
                event.set("client_hostname", map["client_hostname"])
            end
            if !map["main_hostname"]
                map["main_hostname"] = event.get("main_hostname")
            else
                event.set("main_hostname", map["main_hostname"])
            end
        '
    }

It works! Thank you very much.
