Grok for data

Logstash treats every event independently. Since the date appears only in your first line (the header), all the following events get the auto-generated value for the @timestamp field instead of the date from the header.

To get the same date in all your events, you first need to treat this log as a multiline log. This gives you a single event containing the header and all the other lines. You can then use filters to parse the first line and get the date, remove that line, and split the rest of the message into multiple events, each of which will have the correct date.

Assuming that your logs have this format and each group of events always begins with a header starting with #, you have something like this:

# snapshot,66472243,20220704061503
list_of_count(number 0000080, abort 0, onlist yes)
list_of_count(number 0000100, abort 0, onlist yes)
list_of_count(number 0000605, abort 0, onlist yes)
list_of_count(number 0000605, abort 0, onlist yes)
list_of_count(number 0000750, abort 0, onlist yes)
list_of_count(number 0000905, abort 0, onlist yes)
list_of_count(number 0006063, abort 0, onlist yes)

To parse it and have the information from the header added to every event, the following pipeline will do the job.

#
input {
    stdin {
        codec => multiline {
            pattern => '^#'
            auto_flush_interval => 5
            negate => true
            what => "previous"
        }
    }
}

filter {
    mutate {
        gsub => ["message", "\n",";"]
    }
    mutate {
        split => { 
            "message" => ";"
        }
    }
    dissect {
        mapping => {
            "[message][0]" => "# %{activity},%{val},%{time}"
        }
        remove_field => ["[message][0]"]
    }
    split {
        field => "message"
    }
    date {
        match => ["time", "yyyyMMddHHmmss"]
        timezone => "Europe/Paris"
    }
    dissect {
        mapping => {
            "message" => "%{}(%{type} %{numvalue}, %{status} %{statusval}, %{list} %{listval})"
        }
    }
}

The multiline codec will give you this message:

# snapshot,66472243,20220704061503\nlist_of_count(number 0000080, abort 0, onlist yes)\nlist_of_count(number 0000100, abort 0, onlist yes)\nlist_of_count(number 0000605, abort 0, onlist yes)\nlist_of_count(number 0000605, abort 0, onlist yes)\nlist_of_count(number 0000750, abort 0, onlist yes)\nlist_of_count(number 0000905, abort 0, onlist yes)\nlist_of_count(number 0006063, abort 0, onlist yes)

It's the header and the other lines in a single message, separated by newline characters (\n). The filters in the filter block will split this into multiple events.
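To make the grouping step concrete, here is a minimal Python sketch of what the multiline codec does with this configuration. It is only an illustration, not how Logstash implements it; the function name group_by_header is made up for the example.

```python
def group_by_header(lines, marker="#"):
    """Join each header line with the lines that follow it.

    Rough emulation of the multiline codec with negate => true and
    what => "previous": any line that does NOT start with the marker
    is attached to the previous block.
    """
    blocks = []
    for line in lines:
        if line.startswith(marker) or not blocks:
            blocks.append([line])       # a header opens a new block
        else:
            blocks[-1].append(line)     # other lines join the previous block
    # Each block becomes one message, with lines separated by a newline.
    return ["\n".join(block) for block in blocks]


if __name__ == "__main__":
    sample = [
        "# snapshot,66472243,20220704061503",
        "list_of_count(number 0000080, abort 0, onlist yes)",
        "list_of_count(number 0000100, abort 0, onlist yes)",
    ]
    for message in group_by_header(sample):
        print(repr(message))
```

Each returned string corresponds to one Logstash event before the filter block runs.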

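The first mutate will change the \n added by the multiline codec in the input to a ;. This is needed because the split option of the mutate filter does not work with \n, most likely because Logstash does not interpret escape sequences in config strings unless config.support_escapes is enabled, while gsub treats its pattern as a regular expression.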

The second mutate will split your event into an array where the first element is your header.

The first dissect will parse the first element of the array, [message][0], to get the fields activity, val and time. If this filter succeeds, it will also remove this element.

The split filter will now create a new event for each one of the items in the message field.
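The header parsing and the fan-out into separate events can be pictured with the following Python sketch. It assumes the header always has the form "# activity,val,time" as in the sample; the function name fan_out is hypothetical and the code only mimics the result of the dissect and split filters.

```python
def fan_out(message):
    """Turn one multiline message into one dict per data line.

    The first line is the header; its fields are copied onto every
    resulting event, similar to dissect + remove_field + split.
    """
    header, *rows = message.split("\n")
    if not header.startswith("# "):
        raise ValueError("message does not start with a header line")
    # Equivalent of the dissect mapping "# %{activity},%{val},%{time}"
    activity, val, time = header[2:].split(",", 2)
    return [
        {"activity": activity, "val": val, "time": time, "message": row}
        for row in rows
    ]


if __name__ == "__main__":
    msg = (
        "# snapshot,66472243,20220704061503\n"
        "list_of_count(number 0000080, abort 0, onlist yes)\n"
        "list_of_count(number 0000100, abort 0, onlist yes)"
    )
    for event in fan_out(msg):
        print(event)
```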

The date filter will parse your date and the second dissect will extract the rest of the fields.
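As a rough check of the last two steps, this Python sketch parses the same time format and extracts the same fields from one data line. The regular expression is my own approximation of the dissect mapping, not something Logstash uses, and the timezone handling of the date filter is left out here (the parsed value is a naive local time).

```python
import re
from datetime import datetime

# Approximation of the dissect mapping:
# "%{}(%{type} %{numvalue}, %{status} %{statusval}, %{list} %{listval})"
LINE_RE = re.compile(
    r"^[^(]*\((?P<type>[^ ]+) (?P<numvalue>[^,]+), "
    r"(?P<status>[^ ]+) (?P<statusval>[^,]+), "
    r"(?P<list>[^ ]+) (?P<listval>[^)]+)\)$"
)


def parse_time(value):
    """Parse the header time, matching the pattern yyyyMMddHHmmss."""
    return datetime.strptime(value, "%Y%m%d%H%M%S")


def parse_line(line):
    """Return the extracted fields as a dict, or None if the line does not match."""
    match = LINE_RE.match(line)
    return match.groupdict() if match else None


if __name__ == "__main__":
    print(parse_time("20220704061503"))
    print(parse_line("list_of_count(number 0000080, abort 0, onlist yes)"))
```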
