Parsing multiple lines as multiple events using fields from first and last line

That is an interesting enough problem that I was willing to spend a couple of hours figuring out how to solve it. I would start with a multiline codec on the input

codec => multiline {
    pattern => 'CONSUME API returned'
    negate => true
    what => "next"
    auto_flush_interval => 1
}
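To make the grouping concrete, here is a rough plain-Ruby simulation of what negate => true with what => "next" does: every line that does not match the pattern is held and joined to the line after it, so an event ends on the line that does match. The group_lines helper and the sample lines are made up for illustration, and this is not the codec's actual implementation.

```ruby
# Rough simulation of multiline grouping with negate => true, what => "next".
# group_lines and the sample lines below are hypothetical.
def group_lines(lines, pattern)
  events = []
  buffer = []
  lines.each do |line|
    buffer << line
    # A line that matches the pattern closes the current event.
    if line =~ pattern
      events << buffer.join("\n")
      buffer = []
    end
  end
  # Leftover lines stand in for what auto_flush_interval would flush.
  events << buffer.join("\n") unless buffer.empty?
  events
end

lines = [
  "first line of transaction A",
  "Table_1: a, b",
  "CONSUME API returned (E) failed",
  "first line of transaction B",
  "CONSUME API returned OK"
]
events = group_lines(lines, /CONSUME API returned/)
puts events.length
```

Each element of events is one multi-line message, ending with its "CONSUME API returned" line.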

Then use grok to pull out the timestamp and termination message

    grok { match => { "message" => "(?<[@metadata][ts]>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) \[Component\] DEBUG some.package" } }
    date { match => [ "[@metadata][ts]", ISO8601 ] }

    grok { match => { "message" => "(?m)CONSUME API returned(?<termination_message>.*)" } }

You will have to edit that second grok to be more specific, or do additional transformation on the resulting field. By default a . in a grok pattern does not match a newline, so .* stops at the end of a line. The (?m) in the second grok lets it continue across line breaks.
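The effect of (?m) can be seen in plain Ruby, whose regexp syntax I am assuming behaves the same way as the engine grok uses for this flag. The sample message is made up.

```ruby
# Made-up two-line message, only to show what (?m) changes.
message = "CONSUME API returned (E) first\nsecond line"

# Without (?m), .* stops at the first newline.
without_m = message[/CONSUME API returned(?<rest>.*)/, "rest"]

# With (?m), . also matches newlines, so .* runs to the end of the string.
with_m = message[/(?m)CONSUME API returned(?<rest>.*)/, "rest"]

puts without_m.inspect
puts with_m.inspect
```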

Next, get rid of all the lines that start with a timestamp, and split the message into an array of strings. grok and mutate's gsub understand what \n means, because they use regexps. mutate's split does not, so you need a literal newline in the configuration file.

    mutate { gsub => [ "message", "\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3} [^\n]+(\n|$)", "" ] }
    mutate { split => { "message" => "
" } }
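As a sanity check, the same two steps can be tried in plain Ruby on a made-up message. This is only a sketch of what the two mutate filters are expected to do, not their implementation.

```ruby
# Made-up multiline message with two timestamped lines and two table rows.
message = [
  "2020-03-16 02:43:55,680 [Component] DEBUG some.package start",
  "Table_1: a, b",
  "Table_1: 1, 2",
  "2020-03-16 02:43:55,900 [Component] DEBUG some.package CONSUME API returned (E) x"
].join("\n")

# Same regexp as the gsub above: a timestamp, the rest of the line, and its newline.
timestamped_line = /\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3} [^\n]+(\n|$)/

# Remove the timestamped lines, then split what is left on newlines.
rows = message.gsub(timestamped_line, "").split("\n")
puts rows.inspect
```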

The rest of the work is done in a ruby filter, which is pretty complicated, so I will break it down into parts with explanations. A ruby filter looks like this

ruby {
    init => "require 'csv'"
    remove_field => [ "message" ]
    code => '
    '
}

The remove_field option is only applied if the ruby code executes successfully. All of the code segments below need to be concatenated in that code option. At this point the [message] field contains

   "message" => [
    [0] "Table_1: Table_1_Field_1_name, Table_1_Field_2_name, Table_1_Field_3_name, Table_1_Field_4_name",
    [1] "Table_1: Table_1_Field_1_value_1, Table_1_Field_2_value_1, Table_1_Field_3_value_1, Table_1_Field_4_value_1",
    [2] "Table_2: \t\t\tTable_2_Field_1_name, Table_2_Field_2_name, Table_2_Field_3_name, Table_2_Field_4_name",
    [3] "Table_2: \t\t\tTable_2_Field_1_value_1, Table_2_Field_2_value_1, Table_2_Field_3_value_1, Table_2_Field_4_value_1",
    [4] "Table_2: \t\t\tTable_2_Field_1_value_2, Table_2_Field_2_value_2, Table_2_Field_3_value_2, Table_2_Field_4_value_2",
    [5] "Table_2: \t\t\tTable_2_Field_1_value_3, Table_2_Field_2_value_3, Table_2_Field_3_value_3, Table_2_Field_4_value_3"
],

so first we need to separate table names and rows. That is done using this code, which extracts and removes the leading table name from each row, then saves the rows of each table in a separate array, keyed by table name in a hash

            hashOfTableRows = {}
            event.get("message").each { |x|
                tableName = x.gsub(/^([^:]*): .*/, "\\1")
                # Create the empty array for this table
                hashOfTableRows[tableName] ||= []
                tableRow = x.gsub(/^[^:]*: [\t]*/, "")
                hashOfTableRows[tableName] << tableRow
            }
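If you want to try that step outside logstash, here is a standalone sketch with made-up rows. It uses split and lstrip instead of the two gsub calls, which is a substitution on my part. It assumes every row contains ": ", and lstrip removes leading spaces as well as tabs.

```ruby
# Made-up rows in the same "Table: fields" shape as the message array.
rows = [
  "Table_1: f1, f2",
  "Table_1: v1, v2",
  "Table_2: \t\t\tg1, g2",
  "Table_2: \t\t\th1, h2"
]

rows_by_table = {}
rows.each do |row|
  # Split on the first ": " only, so any later colons stay in the row.
  table_name, rest = row.split(": ", 2)
  # Create the array for this table on first use, then append the row.
  (rows_by_table[table_name] ||= []) << rest.lstrip
end

puts rows_by_table.inspect
```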

hashOfTableRows looks like this

 {
    "Table_2" => [
        [0] "Table_2_Field_1_name, Table_2_Field_2_name, Table_2_Field_3_name, Table_2_Field_4_name",
        [1] "Table_2_Field_1_value_1, Table_2_Field_2_value_1, Table_2_Field_3_value_1, Table_2_Field_4_value_1",
        [2] "Table_2_Field_1_value_2, Table_2_Field_2_value_2, Table_2_Field_3_value_2, Table_2_Field_4_value_2",
        [3] "Table_2_Field_1_value_3, Table_2_Field_2_value_3, Table_2_Field_3_value_3, Table_2_Field_4_value_3"
    ],
    "Table_1" => [
        [0] "Table_1_Field_1_name, Table_1_Field_2_name, Table_1_Field_3_name, Table_1_Field_4_name",
        [1] "Table_1_Field_1_value_1, Table_1_Field_2_value_1, Table_1_Field_3_value_1, Table_1_Field_4_value_1"
    ]
}

Next we parse each row of each table as a CSV

            hashOfTables = {}
            hashOfTableRows.each { |k, v|
                csvString = v.join("\n")
                csv = CSV.parse(csvString)
                hashOfTables[k] = csv
            }
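A small standalone example of the CSV step, with made-up field names, shows where the leading spaces come from: CSV.parse splits on the comma only, so the space after each comma stays in the field.

```ruby
require 'csv'

# Two made-up rows joined by a newline, as in the loop above.
csv_string = ["name_1, name_2", "value_1, value_2"].join("\n")

# CSV.parse returns an array of rows, each row an array of fields.
parsed = CSV.parse(csv_string)

# Stripping every field removes the space that followed each comma.
stripped = parsed.map { |row| row.map(&:strip) }

puts parsed.inspect
puts stripped.inspect
```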

That results in this (note the leading spaces in many fields; we will remove them later using .strip)

{
    "Table_2" => [
        [0] [
            [0] "Table_2_Field_1_name",
            [1] " Table_2_Field_2_name",
            [2] " Table_2_Field_3_name",
            [3] " Table_2_Field_4_name"
        ],
        [1] [
            [0] "Table_2_Field_1_value_1",
            [1] " Table_2_Field_2_value_1",
            [2] " Table_2_Field_3_value_1",
            [3] " Table_2_Field_4_value_1"
        ],
        [2] [
            [0] "Table_2_Field_1_value_2",
            [1] " Table_2_Field_2_value_2",
            [2] " Table_2_Field_3_value_2",
            [3] " Table_2_Field_4_value_2"
        ],
        [3] [
            [0] "Table_2_Field_1_value_3",
            [1] " Table_2_Field_2_value_3",
            [2] " Table_2_Field_3_value_3",
            [3] " Table_2_Field_4_value_3"
        ]
    ],
    "Table_1" => [
        [0] [
            [0] "Table_1_Field_1_name",
            [1] " Table_1_Field_2_name",
            [2] " Table_1_Field_3_name",
            [3] " Table_1_Field_4_name"
        ],
        [1] [
            [0] "Table_1_Field_1_value_1",
            [1] " Table_1_Field_2_value_1",
            [2] " Table_1_Field_3_value_1",
            [3] " Table_1_Field_4_value_1"
        ]
    ]
}

For each table we are now going to take each row except the first, and convert it into a hash, where the field names are taken from the first row

            arrayOfTableRows = []
            hashOfTables.each { |k, v|
                arrayOfColumnNames = v.shift
                v.each_index { |i|
                    tableRow = {}
                    tableRow["table"] = k
                    v[i].each_index { |j|
                        columnName = arrayOfColumnNames[j].strip
                        columnValue = v[i][j].strip
                        tableRow[columnName] = columnValue
                    }
                    arrayOfTableRows << tableRow
                }
            }
            event.set("tables", arrayOfTableRows)
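The same row-to-hash conversion can be written more compactly with zip. This is an alternative sketch on made-up data, not the code from the filter above. The table name "Table_1" is hard-coded here only to keep the example short.

```ruby
# One made-up parsed table: a header row followed by two data rows.
table = [
  ["name_1", " name_2"],
  ["a", " b"],
  ["c", " d"]
]

# Destructuring separates the header from the data without mutating table.
header, *data_rows = table
column_names = header.map(&:strip)

records = data_rows.map do |row|
  # zip pairs each column name with the value in the same position.
  { "table" => "Table_1" }.merge(column_names.zip(row.map(&:strip)).to_h)
end

puts records.inspect
```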

At that point [tables] looks like this:

             "tables" => [
    [0] {
        "Table_1_Field_4_name" => "Table_1_Field_4_value_1",
                       "table" => "Table_1",
        "Table_1_Field_1_name" => "Table_1_Field_1_value_1",
        "Table_1_Field_3_name" => "Table_1_Field_3_value_1",
        "Table_1_Field_2_name" => "Table_1_Field_2_value_1"
    },
    [1] {
                       "table" => "Table_2",
        "Table_2_Field_1_name" => "Table_2_Field_1_value_1",
        "Table_2_Field_4_name" => "Table_2_Field_4_value_1",
        "Table_2_Field_2_name" => "Table_2_Field_2_value_1",
        "Table_2_Field_3_name" => "Table_2_Field_3_value_1"
    },
    [2] {
                       "table" => "Table_2",
        "Table_2_Field_1_name" => "Table_2_Field_1_value_2",
        "Table_2_Field_4_name" => "Table_2_Field_4_value_2",
        "Table_2_Field_2_name" => "Table_2_Field_2_value_2",
        "Table_2_Field_3_name" => "Table_2_Field_3_value_2"
    },
    [3] {
                       "table" => "Table_2",
        "Table_2_Field_1_name" => "Table_2_Field_1_value_3",
        "Table_2_Field_4_name" => "Table_2_Field_4_value_3",
        "Table_2_Field_2_name" => "Table_2_Field_2_value_3",
        "Table_2_Field_3_name" => "Table_2_Field_3_value_3"
    }
],

That is the last piece of the ruby filter. Next, add

 split { field => "tables" }

and you will end up with 4 events, the last of which looks like

{
               "path" => "/home/user/foo.txt",
"termination_message" => "(E) errorString(Why was the transaction not successful)",
               "host" => "somewhere",
         "@timestamp" => 2020-03-16T02:43:55.680Z,
               "tags" => [
    [0] "multiline"
],
             "tables" => {
                   "table" => "Table_2",
    "Table_2_Field_1_name" => "Table_2_Field_1_value_3",
    "Table_2_Field_4_name" => "Table_2_Field_4_value_3",
    "Table_2_Field_2_name" => "Table_2_Field_2_value_3",
    "Table_2_Field_3_name" => "Table_2_Field_3_value_3"
},
           "@version" => "1"
}
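In effect, split makes one copy of the event per element of [tables], and in each copy replaces the array with that single element. A plain-Ruby approximation, with a hash standing in for the event and made-up field values, would be:

```ruby
# A hash standing in for the event before the split filter (made-up values).
event = {
  "termination_message" => "(E) errorString",
  "tables" => [
    { "table" => "Table_1", "f" => "1" },
    { "table" => "Table_2", "f" => "2" }
  ]
}

# One new event per array element; every other field is copied unchanged.
split_events = event["tables"].map do |row|
  event.merge("tables" => row)
end

puts split_events.length
puts split_events.last.inspect
```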

If you want to move the contents of the [tables] field to the top level you can add a second ruby filter after the split filter, which would look much like this.
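A minimal sketch of what the code in that second filter might do follows. The FakeEvent class is a hypothetical stand-in so the snippet runs outside logstash. Inside a ruby filter you would use only the middle part, where event.get, event.set and event.remove are the usual event API calls.

```ruby
# Hypothetical stand-in for a logstash event, only for running this outside logstash.
class FakeEvent
  def initialize(fields)
    @fields = fields
  end

  def get(name)
    @fields[name]
  end

  def set(name, value)
    @fields[name] = value
  end

  def remove(name)
    @fields.delete(name)
  end

  def to_hash
    @fields
  end
end

event = FakeEvent.new({
  "termination_message" => "(E) errorString",
  "tables" => { "table" => "Table_2", "Table_2_Field_1_name" => "Table_2_Field_1_value_3" }
})

# This part is what would go in the code option of the second ruby filter.
tables = event.get("tables")
if tables.is_a?(Hash)
  # Copy each entry to the top level, then drop the container field.
  tables.each { |name, value| event.set(name, value) }
  event.remove("tables")
end

puts event.to_hash.inspect
```

Note that a top-level field with the same name as one of the columns would be overwritten, so check for collisions in your own data.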
