Parsing multiple lines as multiple events using fields from first and last line

Hi everyone,

I am fairly new to Logstash and I am trying to perform a somewhat special multiline parse, but I have run out of ideas. I want to parse some log lines that need values found on both earlier AND later lines. The log looks like this:

...
2020-03-15 22:43:55,680 [Component] DEBUG another.different.package - $Info: Useless info
2020-03-15 22:43:55,680 [Component] DEBUG some.package - 
Table_1: Table_1_Field_1_name, Table_1_Field_2_name, Table_1_Field_3_name, Table_1_Field_4_name
Table_1: Table_1_Field_1_value_1, Table_1_Field_2_value_1, Table_1_Field_3_value_1, Table_1_Field_4_value_1

Table_2: 			Table_2_Field_1_name, Table_2_Field_2_name, Table_2_Field_3_name, Table_2_Field_4_name
Table_2: 			Table_2_Field_1_value_1, Table_2_Field_2_value_1, Table_2_Field_3_value_1, Table_2_Field_4_value_1
Table_2: 			Table_2_Field_1_value_2, Table_2_Field_2_value_2, Table_2_Field_3_value_2, Table_2_Field_4_value_2
Table_2: 			Table_2_Field_1_value_3, Table_2_Field_2_value_3, Table_2_Field_3_value_3, Table_2_Field_4_value_3

2020-03-15 22:43:55,681 [Component] DEBUG other.package - Useless info
2020-03-15 22:43:55,688 [Component] INFO  some.package - CONSUME API returned(E) errorString(Why was the transaction not successful)
...

If the transaction was successful, the last line of the previous example would look like this:

2020-03-15 22:43:55,688 [Component] INFO  some.package - CONSUME API returned(S) errorString()

Each table transaction must become a new event, and each event must use as its timestamp the one found just before the table transactions began. Each event must also report whether the transaction was successful (and, if not, why), information that only appears once all the table transactions are over, one log line later.

So, for the previous example, the events created would look like this:

{
  @timestamp: 2020-03-15 22:43:55,680,
  table: Table_1,
  Table_1_Field_1_name: Table_1_Field_1_value_1,
  Table_1_Field_2_name: Table_1_Field_2_value_1,
  Table_1_Field_3_name: Table_1_Field_3_value_1,
  Table_1_Field_4_name: Table_1_Field_4_value_1,
  success: 0,
  termination_message: Why was the transaction not successful
}

{
  @timestamp: 2020-03-15 22:43:55,680,
  table: Table_2,
  Table_2_Field_1_name: Table_2_Field_1_value_1,
  Table_2_Field_2_name: Table_2_Field_2_value_1,
  Table_2_Field_3_name: Table_2_Field_3_value_1,
  Table_2_Field_4_name: Table_2_Field_4_value_1,
  success: 0,
  termination_message: Why was the transaction not successful
}

{
  @timestamp: 2020-03-15 22:43:55,680,
  table: Table_2,
  Table_2_Field_1_name: Table_2_Field_1_value_2,
  Table_2_Field_2_name: Table_2_Field_2_value_2,
  Table_2_Field_3_name: Table_2_Field_3_value_2,
  Table_2_Field_4_name: Table_2_Field_4_value_2,
  success: 0,
  termination_message: Why was the transaction not successful
}

{
  @timestamp: 2020-03-15 22:43:55,680,
  table: Table_2,
  Table_2_Field_1_name: Table_2_Field_1_value_3,
  Table_2_Field_2_name: Table_2_Field_2_value_3,
  Table_2_Field_3_name: Table_2_Field_3_value_3,
  Table_2_Field_4_name: Table_2_Field_4_value_3,
  success: 0,
  termination_message: Why was the transaction not successful
}

I have tried using the aggregate filter, starting a new aggregation when a pattern matches the first line of the example and ending it when the last one is matched. This way I can hold on to the timestamp and also know whether the transaction was successful, but I don't know how to accumulate all the table transactions in the meantime and create several events out of them when end_of_task is set to true. I also don't think this is the best approach, as there is no way to derive a reliable task_id (I'm using the same task_id for everything).

I also know about the multiline filter, but I don't know how to use information from the ending line, or how to split the appended lines into multiple events.

I only really care about the table transactions.

Any information, ideas, or plug-in suggestions are more than welcome.

That is an interesting enough problem that I was willing to spend a couple of hours figuring out how to solve it. I would start with a multiline codec on the input

codec => multiline {
    pattern => 'CONSUME API returned'
    negate => true
    what => "next"
    auto_flush_interval => 1
}
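
For context, here is a minimal sketch of how that codec might sit inside a file input (the path and sincedb settings are placeholders for testing, not part of the original answer):

input {
    file {
        path => "/home/user/foo.txt"        # placeholder; point this at your log file
        start_position => "beginning"
        sincedb_path => "/dev/null"         # re-read from the start on every run, handy while testing
        codec => multiline {
            # anything that is NOT the 'CONSUME API returned' line is appended to
            # the following lines, so everything up to and including that line
            # becomes a single event
            pattern => 'CONSUME API returned'
            negate => true
            what => "next"
            auto_flush_interval => 1
        }
    }
}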

Then use grok to pull out the timestamp and termination message

    grok { match => { "message" => "(?<[@metadata][ts]>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) \[Component\] DEBUG some.package" } }
    date { match => [ "[@metadata][ts]", "ISO8601" ] }

    grok { match => { "message" => "(?m)CONSUME API returned(?<termination_message>.*)" } }

You will have to edit that second grok to be more specific, or do additional transformation on the resulting field. By default grok only matches against the first line of a multiline message; the (?m) in the second grok tells it to check all lines.
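
As one possible refinement (a sketch based only on the sample lines above, not part of the original answer; return_code is a field name I made up), the grok could capture the S/E flag and the errorString contents separately, and a conditional could map the flag onto the success field from the desired output:

    grok {
        match => {
            "message" => "(?m)CONSUME API returned\((?<return_code>[SE])\) errorString\((?<termination_message>[^)]*)\)"
        }
    }
    # map the S/E flag onto the numeric success field shown in the desired events
    if [return_code] == "S" {
        mutate { add_field => { "success" => "1" } }
    } else {
        mutate { add_field => { "success" => "0" } }
    }
    mutate { convert => { "success" => "integer" } }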

Next, get rid of all the lines that start with a timestamp, and split the message into an array of strings. The gsub pattern is a regexp, so it understands what \n means; the separator used by mutate+split is a literal string, so you need a literal newline in the configuration file.

    mutate { gsub => [ "message", "\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3} [^\n]+(\n|$)", "" ] }
    mutate { split => { "message" => "
" } }

The rest of the work is done in a ruby filter, which is pretty complicated, so I will break it down into parts with explanations. A ruby filter looks like this

ruby {
    init => "require 'csv'"
    remove_field => [ "message" ]
    code => '
    '
}

The remove_field option is only applied if the ruby code executes successfully. All of the code segments below need to be concatenated in that code option. At this point the [message] field contains

   "message" => [
    [0] "Table_1: Table_1_Field_1_name, Table_1_Field_2_name, Table_1_Field_3_name, Table_1_Field_4_name",
    [1] "Table_1: Table_1_Field_1_value_1, Table_1_Field_2_value_1, Table_1_Field_3_value_1, Table_1_Field_4_value_1",
    [2] "Table_2: \t\t\tTable_2_Field_1_name, Table_2_Field_2_name, Table_2_Field_3_name, Table_2_Field_4_name",
    [3] "Table_2: \t\t\tTable_2_Field_1_value_1, Table_2_Field_2_value_1, Table_2_Field_3_value_1, Table_2_Field_4_value_1",
    [4] "Table_2: \t\t\tTable_2_Field_1_value_2, Table_2_Field_2_value_2, Table_2_Field_3_value_2, Table_2_Field_4_value_2",
    [5] "Table_2: \t\t\tTable_2_Field_1_value_3, Table_2_Field_2_value_3, Table_2_Field_3_value_3, Table_2_Field_4_value_3"
],

so first we need to separate the table names from the rows. That is done using this code, which extracts and removes the leading table name from each row, then saves the rows of each table under their own key in a hash

            hashOfTableRows = {}
            event.get("message").each { |x|
                tableName = x.gsub(/^([^:]*): .*/, "\\1")
                # Create the empty array for this table
                hashOfTableRows[tableName] ||= []
                tableRow = x.gsub(/^[^:]*: [\t]*/, "")
                hashOfTableRows[tableName] << tableRow
            }

hashOfTableRows looks like this

 {
    "Table_2" => [
        [0] "Table_2_Field_1_name, Table_2_Field_2_name, Table_2_Field_3_name, Table_2_Field_4_name",
        [1] "Table_2_Field_1_value_1, Table_2_Field_2_value_1, Table_2_Field_3_value_1, Table_2_Field_4_value_1",
        [2] "Table_2_Field_1_value_2, Table_2_Field_2_value_2, Table_2_Field_3_value_2, Table_2_Field_4_value_2",
        [3] "Table_2_Field_1_value_3, Table_2_Field_2_value_3, Table_2_Field_3_value_3, Table_2_Field_4_value_3"
    ],
    "Table_1" => [
        [0] "Table_1_Field_1_name, Table_1_Field_2_name, Table_1_Field_3_name, Table_1_Field_4_name",
        [1] "Table_1_Field_1_value_1, Table_1_Field_2_value_1, Table_1_Field_3_value_1, Table_1_Field_4_value_1"
    ]
}

Next we parse each row of each table as a CSV

            hashOfTables = {}
            hashOfTableRows.each { |k, v|
                csvString = v.join("\n")
                csv = CSV.parse(csvString)
                hashOfTables[k] = csv
            }

That results in this (note the leading spaces in many fields; we will remove them later using .strip)

{
    "Table_2" => [
        [0] [
            [0] "Table_2_Field_1_name",
            [1] " Table_2_Field_2_name",
            [2] " Table_2_Field_3_name",
            [3] " Table_2_Field_4_name"
        ],
        [1] [
            [0] "Table_2_Field_1_value_1",
            [1] " Table_2_Field_2_value_1",
            [2] " Table_2_Field_3_value_1",
            [3] " Table_2_Field_4_value_1"
        ],
        [2] [
            [0] "Table_2_Field_1_value_2",
            [1] " Table_2_Field_2_value_2",
            [2] " Table_2_Field_3_value_2",
            [3] " Table_2_Field_4_value_2"
        ],
        [3] [
            [0] "Table_2_Field_1_value_3",
            [1] " Table_2_Field_2_value_3",
            [2] " Table_2_Field_3_value_3",
            [3] " Table_2_Field_4_value_3"
        ]
    ],
    "Table_1" => [
        [0] [
            [0] "Table_1_Field_1_name",
            [1] " Table_1_Field_2_name",
            [2] " Table_1_Field_3_name",
            [3] " Table_1_Field_4_name"
        ],
        [1] [
            [0] "Table_1_Field_1_value_1",
            [1] " Table_1_Field_2_value_1",
            [2] " Table_1_Field_3_value_1",
            [3] " Table_1_Field_4_value_1"
        ]
    ]
}

For each table we now take every row except the first and convert it into a hash, with the field names taken from the first row

            arrayOfTableRows = []
            hashOfTables.each { |k, v|
                arrayOfColumnNames = v.shift
                v.each_index { |i|
                    tableRow = {}
                    tableRow["table"] = k
                    v[i].each_index { |j|
                        columnName = arrayOfColumnNames[j].strip
                        columnValue = v[i][j].strip
                        tableRow[columnName] = columnValue
                    }
                    arrayOfTableRows << tableRow
                }
            }
            event.set("tables", arrayOfTableRows)

At that point [tables] looks like this:

             "tables" => [
    [0] {
        "Table_1_Field_4_name" => "Table_1_Field_4_value_1",
                       "table" => "Table_1",
        "Table_1_Field_1_name" => "Table_1_Field_1_value_1",
        "Table_1_Field_3_name" => "Table_1_Field_3_value_1",
        "Table_1_Field_2_name" => "Table_1_Field_2_value_1"
    },
    [1] {
                       "table" => "Table_2",
        "Table_2_Field_1_name" => "Table_2_Field_1_value_1",
        "Table_2_Field_4_name" => "Table_2_Field_4_value_1",
        "Table_2_Field_2_name" => "Table_2_Field_2_value_1",
        "Table_2_Field_3_name" => "Table_2_Field_3_value_1"
    },
    [2] {
                       "table" => "Table_2",
        "Table_2_Field_1_name" => "Table_2_Field_1_value_2",
        "Table_2_Field_4_name" => "Table_2_Field_4_value_2",
        "Table_2_Field_2_name" => "Table_2_Field_2_value_2",
        "Table_2_Field_3_name" => "Table_2_Field_3_value_2"
    },
    [3] {
                       "table" => "Table_2",
        "Table_2_Field_1_name" => "Table_2_Field_1_value_3",
        "Table_2_Field_4_name" => "Table_2_Field_4_value_3",
        "Table_2_Field_2_name" => "Table_2_Field_2_value_3",
        "Table_2_Field_3_name" => "Table_2_Field_3_value_3"
    }
],

That is the last piece of the ruby filter. Next, add

 split { field => "tables" }

and you will end up with 4 events, the last of which looks like

{
               "path" => "/home/user/foo.txt",
"termination_message" => "(E) errorString(Why was the transaction not successful)",
               "host" => "somewhere",
         "@timestamp" => 2020-03-16T02:43:55.680Z,
               "tags" => [
    [0] "multiline"
],
             "tables" => {
                   "table" => "Table_2",
    "Table_2_Field_1_name" => "Table_2_Field_1_value_3",
    "Table_2_Field_4_name" => "Table_2_Field_4_value_3",
    "Table_2_Field_2_name" => "Table_2_Field_2_value_3",
    "Table_2_Field_3_name" => "Table_2_Field_3_value_3"
},
           "@version" => "1"
}

If you want to move the contents of the [tables] field to the top level you can add a second ruby filter after the split filter, much like the sketch below.
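
A minimal sketch of such a filter, assuming you just want every key under [tables] copied to the top level and [tables] removed afterwards:

ruby {
    code => '
        tables = event.get("tables")
        if tables.is_a?(Hash)
            # copy every key/value pair of [tables] to the top level of the event
            tables.each { |k, v| event.set(k, v) }
            event.remove("tables")
        end
    '
}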


Hey Badger,

Thank you very much! Your solution works like a charm. You're great!

I just have one question: you mentioned grok only matches against the first line of a multiline message, so why is the first grok working? There are other lines before the line that marks the start of the table transactions (the one we're using to extract the transaction timestamp). To be clearer, there's this line I used in the example:

[Component] DEBUG another.different.package - $Info: Useless info

That line comes before the one we're using to extract the transaction timestamp (and there are more lines in the log file I'm parsing), so why is that first grok working? To my understanding, the timestamp I want to extract is not on the first line of the event.

Thanks again for your help!

In the sample data I used, the first line started with a timestamp. If the actual data does not, you might need to add (?m) to the first grok too.
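
For reference, that would just be the first grok shown earlier with a (?m) prefix added; nothing else changes:

    grok { match => { "message" => "(?m)(?<[@metadata][ts]>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) \[Component\] DEBUG some.package" } }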

Got it. Thanks!
