Multiline Logstash that handles timestamp on each line

I'm currently ingesting logs from multiple devices successfully, one document per row. One particular brand, Polycom, sends multiline entries in which every row repeats the full syslog header, including the timestamp.
Sample logging (with normal and multiline entries):

<182>2022-12-30T16:09:57-06:00 10.20.0.1 64167f000000|1230160957|dns|1|00|doDNSLookupForList(A): returning passed in ipAddress '10.0.0.1'
<182>2022-12-30T16:09:57-06:00 10.20.0.1 64167f000000|1230160957|sip|0|00|Trying to send data to Destination '10.0.0.1' attempting..
<182>2022-12-30T16:09:57-06:00 10.20.0.1 64167f000000|1230160957|sipt|0|00|>>>>>>>> REG[1] Data Sent to UDP 10.0.0.1 on socket 189\n
<182>2022-12-30T16:09:57-06:00 10.20.0.1 64167f000000|1230160957|sipt|0|00|    REGISTER sip.fqdn\n
<182>2022-12-30T16:09:57-06:00 10.20.0.1 64167f000000|1230160957|sipt|0|00|    Via: SIP/2.0/UDP\n 10.20.0.1
<182>2022-12-30T16:09:57-06:00 10.20.0.1 64167f000000|1230160957|sipt|0|00|    From: "RRichards"
<182>2022-12-30T16:09:57-06:00 10.20.0.1 64167f000000|1230160957|sipt|0|00|    To: <sip:503>
<182>2022-12-30T16:09:57-06:00 10.20.0.1 64167f000000|1230160957|sipt|0|00|>>>>>>>> REG End of data sent
<182>2022-12-30T16:09:57-06:00 10.20.0.1 64167f000000|1230160957|dns|1|00|doDNSLookupForList for record A
<182>2022-12-30T16:09:57-06:00 10.20.0.1 64167f000000|1230160957|dns|1|00|doDNSLookupForList(A): returning passed in ipAddress '10.0.0.1'
<182>2022-12-30T16:09:57-06:00 10.20.0.1 64167f000000|1230160957|sip|0|00|Trying to send data to Destination '10.0.0.1' attempting..
<182>2022-12-30T16:09:57-06:00 10.20.0.1 64167f000000|1230160957|sipt|0|00|<<<<<<<< REG[1] Data Sent to UDP 10.0.0.1 on socket 189\n
<182>2022-12-30T16:09:57-06:00 10.20.0.1 64167f000000|1230160957|sipt|0|00|    REGISTER sip.fqdn\n
<182>2022-12-30T16:09:57-06:00 10.20.0.1 64167f000000|1230160957|sipt|0|00|    Via: SIP/2.0/UDP\n 10.20.0.1
<182>2022-12-30T16:09:57-06:00 10.20.0.1 64167f000000|1230160957|sipt|0|00|    From: "RRichards"
<182>2022-12-30T16:09:57-06:00 10.20.0.1 64167f000000|1230160957|sipt|0|00|    To: <sip:503>
<182>2022-12-30T16:09:57-06:00 10.20.0.1 64167f000000|1230160957|sipt|0|00|<<<<<<<< REG End of data sent

I was asked whether the log entries for those particular lines can be combined into a single document. I believe this is possible, but most examples cover logs where continuation lines begin with spaces or lack a timestamp, which is not the case here.

I have done some reading, worked through examples, and attempted to create a multiline codec entry, but I'm not sure I fully understand the logic, and I may be missing an obvious point.

10-external-syslog-input.conf

input {
    udp {
        port => 10514
        type => "external_syslogs"
        codec => multiline {
            pattern => "^<%{NONNEGINT}>.*([<>]{8} REG End of data sent|\s{4})"
            what => "previous"
        }
    }
}

I've found references where this work is done on the filtering side, but those seem to be for older versions. I haven't tried anything along those lines yet, but here is my filter configuration.

10-external-syslog-filter.conf

filter {
    if [type] == "external_syslogs" {
        grok {
          match => {
            "message" => [
              # Polycom device
              "^<%{NONNEGINT:syslog_pri}>%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:host} %{DATA:mac}\|%{DATA:device_timestamp}\|%{DATA:ID}\|%{NONNEGINT:event_class}\|%{NONNEGINT:missed_events}\|%{DATA:event}:*%{GREEDYDATA:syslog_message}",
              "^<%{NONNEGINT:syslog_pri}>%{SYSLOGTIMESTAMP:syslog_timestamp} \[%{MAC:mac}\] %{GREEDYDATA:syslog_message}",
              "^<%{NONNEGINT:syslog_pri}>%{SYSLOGTIMESTAMP:syslog_timestamp} %{YEAR} %{GREEDYDATA:syslog_message}"
              ]
            }
          remove_field => "message"
          remove_field => "syslog_pri"
          remove_field => ["[event][original]"]
        }
        date {
          match => [ "[system][syslog][timestamp]", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
        }
    }
}

Looking for guidance and/or advice. Links to documentation or articles are greatly appreciated as well.

Should that be three events (the first 8 lines as one, then a single line, then the last 8 lines as one)?

Hey, thanks for the reply.

Sorry, more context would have been smart.
A multiline entry starts with a line whose message begins with < or > repeated 8 times, and ends with a line whose message begins with the same 8 symbols followed by "REG End of data sent".
In the sample logging I provided I would expect:

  • 2 single-line entries
  • 1 multiline entry of 6 rows
  • 3 single-line entries
  • 1 multiline entry of 6 rows

I haven't dug into it enough to confirm whether the number of lines varies, but I figured the <> symbols would be unique enough.
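
As a rough illustration of that boundary rule, here is a plain-Ruby sketch (not Logstash configuration). The names `payload`, `marker?` and `MARKER` are hypothetical, and the sketch assumes the free-text part of each row is always the sixth pipe-delimited field, as in the sample above.

```ruby
# Hypothetical helper names; assumes the free text is the sixth
# "|"-separated field, as in the sample lines.
MARKER = /\A[<>]{8}/

# Return everything after the fifth "|" (nil if the row has fewer fields).
def payload(raw)
  raw.split("|", 6)[5]
end

# True when the payload opens or closes a multiline block.
def marker?(raw)
  text = payload(raw)
  !text.nil? && text.match?(MARKER)
end

header = "<182>2022-12-30T16:09:57-06:00 10.20.0.1 64167f000000|1230160957|"
start_line = header + "sipt|0|00|>>>>>>>> REG[1] Data Sent to UDP 10.0.0.1 on socket 189"
body_line  = header + "sipt|0|00|    To: <sip:503>"
plain_line = header + "dns|1|00|doDNSLookupForList for record A"

puts marker?(start_line)  # true
puts marker?(body_line)   # false
puts marker?(plain_line)  # false
```

Note that a row inside a block and an ordinary single-line row both come back `false`, so the marker test alone cannot say which of the two a given row is.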

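You cannot do that using a multiline codec, because "does the line contain <<<<" does not provide enough information to tell you whether you are inside or outside one of the multiline entries. You could do it using a ruby filter that remembers that state between events. Because the state lives in instance variables, events must be processed in order by a single worker, so you must set pipeline.workers to 1 and pipeline.ordered to true. The code below assumes an earlier filter has already stored the free-text part of the line (everything after the fifth |) in [@metadata][restOfLine] and the syslog timestamp in [@metadata][timestamp].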

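    ruby {
        code => '
            # State is kept in instance variables so it survives between events.
            @accumulating ||= false
            @data ||= ""

            line = event.get("[@metadata][restOfLine]")

            if @accumulating
                # Inside a block: append every line, including the closing marker.
                @data += line + " "
                if line =~ /^[<>]{8}/
                    # Closing marker: emit the combined text on this event.
                    @accumulating = false
                    event.set("data", @data)
                else
                    event.cancel
                end
            else
                if line =~ /^[<>]{8}/
                    # Opening marker: start collecting and drop this event.
                    @accumulating = true
                    @data = line
                    event.cancel
                else
                    # Ordinary single-line entry.
                    event.set("data", line)
                end
            end
        '
    }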
    date { match => [ "[@metadata][timestamp]", "YYYY-MM-dd'T'HH:mm:ssZZ" ] }

That will get you seven events like

{
   "message" => "<182>2022-12-30T16:09:57-06:00 10.20.0.1 64167f000000|1230160957|sip|0|00|Trying to send data to Destination '10.0.0.1' attempting..",
      "data" => "Trying to send data to Destination '10.0.0.1' attempting..",
....
}
{
   "message" => "<182>2022-12-30T16:09:57-06:00 10.20.0.1 64167f000000|1230160957|sipt|0|00|<<<<<<<< REG End of data sent",
      "data" => "<<<<<<<< REG[1] Data Sent to UDP 10.0.0.1 on socket 189\\n    REGISTER sip.fqdn\\n     Via: SIP/2.0/UDP\\n 10.20.0.1     From: \"RRichards\"     To: <sip:503> <<<<<<<< REG End of data sent ",
....
}
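
The accumulate-and-cancel logic of the ruby filter above can also be exercised outside Logstash. The following is a minimal plain-Ruby sketch of the same state machine. `Accumulator` and `feed` are hypothetical names rather than Logstash APIs, a `nil` return stands in for `event.cancel`, and the payloads are shortened versions of the sample rows.

```ruby
# Plain-Ruby simulation of the filter logic, for testing outside Logstash.
# Accumulator is a hypothetical name, not part of any Logstash API.
class Accumulator
  MARKER = /\A[<>]{8}/

  def initialize
    @accumulating = false
    @data = ""
  end

  # Feed one payload. Returns the finished text, or nil while a block is
  # still being collected (where the filter would call event.cancel).
  def feed(line)
    if @accumulating
      @data += line + " "
      return nil unless line.match?(MARKER)
      @accumulating = false
      @data
    elsif line.match?(MARKER)
      @accumulating = true
      @data = line
      nil
    else
      line
    end
  end
end

payloads = [
  "doDNSLookupForList for record A",
  "<<<<<<<< REG[1] Data Sent to UDP 10.0.0.1 on socket 189",
  "    REGISTER sip.fqdn",
  "    To: <sip:503>",
  "<<<<<<<< REG End of data sent",
  "Trying to send data"
]

acc = Accumulator.new
events = payloads.map { |p| acc.feed(p) }.compact
puts events.length  # 3
puts events
```

Because the object carries state from one call to the next, the result is only correct when the rows arrive in their original order, which is the reason for the single-worker, ordered-pipeline requirement.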

Thanks for that lead and the code. I'll look into that, test and follow up.

I appreciate your time!