Grok for data

Can anyone help me build a grok pattern for the data below?
It's important that the timestamp is taken from the first line of the document (20220704061503),
and that the interesting columns are extracted: number 0000080, abort 0, onlist yes.
Input data:

# snapshot,66472243,20220704061503
list_of_count(number 0000080, abort 0, onlist yes)
list_of_count(number 0000100, abort 0, onlist yes)
list_of_count(number 0000605, abort 0, onlist yes)
list_of_count(number 0000605, abort 0, onlist yes)
list_of_count(number 0000750, abort 0, onlist yes)
list_of_count(number 0000905, abort 0, onlist yes)
list_of_count(number 0006063, abort 0, onlist yes)

This should parse your data.

input {

  generator {
        lines => [
          "# snapshot,66472243,20220704061503",
          "list_of_count(number 0000080, abort 0, onlist yes)",
          "list_of_count(number 0000100, abort 0, onlist yes)",
          "list_of_count(number 0000605, abort 0, onlist yes)",
          "list_of_count(number 0000605, abort 0, onlist yes)",
          "list_of_count(number 0000750, abort 0, onlist yes)",
          "list_of_count(number 0000905, abort 0, onlist yes)",
          "list_of_count(number 0006063, abort 0, onlist yes)"
        ]
        count => 1
  }

} # input

filter {

    grok {
      break_on_match => true
      match => {
        "message" => [
          "%{DATA:count}\(%{DATA:type} %{INT:numvalue}, %{DATA:status} %{INT:statusval:int}, %{DATA:list} %{DATA:listval}\)",
          "# %{DATA:activity},%{DATA:val},%{GREEDYDATA:time}"
        ]
      }
    }

} #filter

output {
  
    stdout { codec => rubydebug{} }
	
} # output
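For anyone who wants to sanity-check the patterns outside Logstash: the two grok expressions above are roughly equivalent to the following regular expressions (a Python sketch; `DATA` maps to a lazy `.*?`, `INT` to digits, `GREEDYDATA` to `.*`; the `:int` conversion on `statusval` is not represented here):

```python
import re

# Rough regex equivalents of the two grok patterns above.
row = re.compile(
    r"(?P<count>.*?)\((?P<type>.*?) (?P<numvalue>\d+), "
    r"(?P<status>.*?) (?P<statusval>\d+), (?P<list>.*?) (?P<listval>.*)\)"
)
header = re.compile(r"# (?P<activity>.*?),(?P<val>.*?),(?P<time>.*)")

m = row.match("list_of_count(number 0000080, abort 0, onlist yes)")
print(m.group("count"), m.group("numvalue"))  # list_of_count 0000080

h = header.match("# snapshot,66472243,20220704061503")
print(h.group("time"))  # 20220704061503
```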

Thanks, I will try it this way.

Do you know how I can transform the date into the standard timestamp format? I've tried
%{TIMESTAMP_ISO8601:time}, but it doesn't give the expected results.


filter {

    grok {
      break_on_match => true
      match => {
        "message" => [
          "%{DATA:count}\(%{DATA:type} %{INT:numvalue}, %{DATA:status} %{INT:statusval:int}, %{DATA:list} %{DATA:listval}\)",
          "# %{DATA:activity},%{DATA:val},%{GREEDYDATA:time}"
        ]
      }
    }

        mutate{
                convert => { "time" => "integer" }
                add_field => { "starttime1" =>  "%{time}00" }
                convert => { "starttime1" => "integer" }
        }
        date{
                match => ["starttime1","yyyyMMddHHmmss"]
                timezone => "Europe/Paris"
                target => "@timestamp"
        }




} #filter

but it replies with _dateparsefailure:


{
         "count" => "list_of_count",
          "type" => "number",
        "status" => "abort",
       "listval" => "yes",
    "starttime1" => "%{time}00",
          "list" => "onlist",
      "sequence" => 0,
      "@version" => "1",
          "tags" => [
        [0] "_dateparsefailure"
    ],
    "@timestamp" => 2022-07-11T16:24:28.138691Z,
      "numvalue" => "0006063",
     "statusval" => 0,
          "host" => "0.0.0.0",
       "message" => "list_of_count(number 0006063, abort 0, onlist yes)"

OK, it was fixed by:


        mutate{
                convert => { "time" => "string" }
        }
        date{
                match => ["time","YYYYMMddHHmmss"]
                timezone => "Europe/Paris"
                target => "@timestamp"
        }
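As a quick cross-check of the format string: the Joda-Time pattern `yyyyMMddHHmmss` corresponds to `%Y%m%d%H%M%S` in Python's strptime, so the header value parses like this (illustrative sketch):

```python
from datetime import datetime

# "20220704061503" is yyyyMMddHHmmss in Joda-Time terms.
ts = datetime.strptime("20220704061503", "%Y%m%d%H%M%S")
print(ts.isoformat())  # 2022-07-04T06:15:03
```

Note that uppercase `YYYY` in Joda-Time is the week-year, not the calendar year, so lowercase `yyyy` is the safer choice, especially around year boundaries.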

but the second data row still gets the system timestamp in @timestamp. Why?

{
      "sequence" => 0,
      "@version" => "1",
          "time" => "20220704061503",
           "val" => "66472243",
    "@timestamp" => 2022-07-04T04:15:03Z,
      "activity" => "snapshot",
          "host" => "0.0.0.0",
       "message" => "# snapshot,66472243,20220704061503"
}
{
         "count" => "list_of_count",
          "type" => "number",
        "status" => "abort",
       "listval" => "yes",
          "list" => "onlist",
      "sequence" => 0,
      "@version" => "1",
    **"@timestamp" => 2022-07-11T16:35:27.977463Z,**
      "numvalue" => "0000080",
     "statusval" => 0,
          "host" => "0.0.0.0",
       "message" => "list_of_count(number 0000080, abort 0, onlist yes)"
}

Why doesn't

  date {
    match => ["time","yyyyMMddHHmmss"]
    timezone => "Europe/Paris"
    target => "@timestamp"
  }

work as a global value for every event in that single run, after setting the target?

The first line contains the date; the other lines don't.
@timestamp is always added to every event.
When you use the date plugin it is overwritten; otherwise Logstash uses its own time value to set @timestamp.

        date{
                match => ["time","YYYYMMddHHmmss"]
                timezone => "Europe/Paris"
                target => "@timestamp"
        }

So how can I manipulate this timestamp when I need to add it to the other lines as well?

It will be added automatically by Logstash. If you want to change the value, use the date plugin.

==========================
Below you can see the content of the pipeline and, at the bottom, its output. When I overwrite the timestamp with the date plugin, it is changed only for the first line.

vi pipeline_eir.yml

input {

  generator {
        lines => [
          "# snapshot,66472243,20220704061503",
          "list_of_count(number 0000080, abort 0, onlist yes)",
          "list_of_count(number 0000100, abort 0, onlist yes)",
          "list_of_count(number 0000605, abort 0, onlist yes)",
          "list_of_count(number 0000605, abort 0, onlist yes)",
          "list_of_count(number 0000750, abort 0, onlist yes)",
          "list_of_count(number 0000905, abort 0, onlist yes)",
          "list_of_count(number 0006063, abort 0, onlist yes)"
        ]
        count => 1
  }

} # input

filter {

    grok {
      break_on_match => true
      match => {
        "message" => [
          "%{DATA:count}\(%{DATA:type} %{INT:numvalue}, %{DATA:status} %{INT:statusval:int}, %{DATA:list} %{DATA:listval}\)",
          "# %{DATA:activity},%{DATA:val},%{GREEDYDATA:time}"
        ]
      }
    }



    date {
        match => ["time","yyyyMMddHHmmss"]
        timezone => "Europe/Paris"
        target => "@timestamp"
    }



} #filter

output {

    stdout { codec => rubydebug{} }

} # output

Output:

[INFO ] 2022-07-11 18:08:27.891 [Agent thread] agent - Pipelines running {:count=>1, :running_pipelines=>[:eir], :non_running_pipelines=>[]}
{
      "sequence" => 0,
      "@version" => "1",
          "time" => "20220704061503",
           "val" => "66472243",
    "@timestamp" => 2022-07-04T04:15:03Z,
      "activity" => "snapshot",
          "host" => "0.0.0.0",
       "message" => "# snapshot,66472243,20220704061503"
}
{
         "count" => "list_of_count",
          "type" => "number",
        "status" => "abort",
       "listval" => "yes",
          "list" => "onlist",
      "sequence" => 0,
      "@version" => "1",
    "@timestamp" => 2022-07-11T18:08:27.892362Z,
      "numvalue" => "0000080",
     "statusval" => 0,
          "host" => "0.0.0.0",
       "message" => "list_of_count(number 0000080, abort 0, onlist yes)"
}
{
         "count" => "list_of_count",
          "type" => "number",
        "status" => "abort",
       "listval" => "yes",
          "list" => "onlist",
      "sequence" => 0,
      "@version" => "1",
    "@timestamp" => 2022-07-11T18:08:27.892628Z,
      "numvalue" => "0000100",
     "statusval" => 0,
          "host" => "0.0.0.0",
       "message" => "list_of_count(number 0000100, abort 0, onlist yes)"
}

I don't know what's wrong...

For Logstash, every event is independent, and you only have the date information in your first event; all the following events will have the auto-generated value for the @timestamp field, not the same value as the first event.

To have the same date in all your events, you first need to treat this log as a multiline log. This will give you one event containing the header and all the other lines; you can then use some filters to parse the first line to get the date, remove it, and split the rest of the message into multiple events, which will all have the correct date.

Assuming that your logs have this format and different events always have a header starting with #, you have something like this:

# snapshot,66472243,20220704061503
list_of_count(number 0000080, abort 0, onlist yes)
list_of_count(number 0000100, abort 0, onlist yes)
list_of_count(number 0000605, abort 0, onlist yes)
list_of_count(number 0000605, abort 0, onlist yes)
list_of_count(number 0000750, abort 0, onlist yes)
list_of_count(number 0000905, abort 0, onlist yes)
list_of_count(number 0006063, abort 0, onlist yes)

To parse it and have the information from the header added to every event, the following pipeline will do the job.

input {
    stdin {
        codec => multiline {
            pattern => '#'
            auto_flush_interval => 5
            negate => true
            what => "previous"
        }
    }
}

filter {
    mutate {
        gsub => ["message", "\n",";"]
    }
    mutate {
        split => { 
            "message" => ";"
        }
    }
    dissect {
        mapping => {
            "[message][0]" => "# %{activity},%{val},%{time}"
        }
        remove_field => ["[message][0]"]
    }
    split {
        field => "message"
    }
    date {
        match => ["time", "yyyyMMddHHmmss"]
        timezone => "Europe/Paris"
    }
    dissect {
        mapping => {
            "message" => "%{}(%{type} %{numvalue}, %{status} %{statusval}, %{list} %{listval})"
        }
    }
}

The multiline codec will give you this message:

# snapshot,66472243,20220704061503\nlist_of_count(number 0000080, abort 0, onlist yes)\nlist_of_count(number 0000100, abort 0, onlist yes)\nlist_of_count(number 0000605, abort 0, onlist yes)\nlist_of_count(number 0000605, abort 0, onlist yes)\nlist_of_count(number 0000750, abort 0, onlist yes)\nlist_of_count(number 0000905, abort 0, onlist yes)\nlist_of_count(number 0006063, abort 0, onlist yes)

It's the header and the other events in the same line, with a literal \n between them; the filters in the filter block will split this into multiple events.

The first mutate will change the literal \n added by the multiline codec in the input to a ;. This is needed because the split option of the mutate filter does not work with \n for some reason.

The second mutate will split your event into an array where the first element is your header.

The first dissect will parse the first element of the array, [message][0], to get the fields activity, val, and time; if this filter succeeds, it will also remove that element.

The split filter will now create a new event for each one of the items in the message field.

The date filter will parse your date and the second dissect will extract the rest of the fields.
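The whole chain can be sketched outside Logstash like this (Python, illustrative only; the steps mirror the filters above):

```python
from datetime import datetime

# Sketch of what the filter chain does: split the multiline message,
# parse the header once, and attach its timestamp to every row event.
raw = ("# snapshot,66472243,20220704061503\n"
       "list_of_count(number 0000080, abort 0, onlist yes)\n"
       "list_of_count(number 0000100, abort 0, onlist yes)")

lines = raw.split("\n")                            # mutate gsub + split
_, val, time = lines[0].lstrip("# ").split(",")    # dissect on [message][0]
stamp = datetime.strptime(time, "%Y%m%d%H%M%S")    # date filter

events = [{"message": line, "@timestamp": stamp.isoformat()}
          for line in lines[1:]]                   # split filter
print(len(events), events[0]["@timestamp"])  # 2 2022-07-04T06:15:03
```

Every row event ends up carrying the header's timestamp, which is exactly what the Logstash pipeline achieves.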


Many thanks @leandrojmp, I will try it this way and get back with results.

@leandrojmp, I have just one last question regarding this topic.
If I have the input data as:

# snapshot,66472243,20220704061503
list_of_count(number 0000080, abort 0, onlist yes)
list_of_count(number 0000100, abort 0, onlist yes)
list_of_count(number 0000605, abort 0, onlist yes)
list_of_count(number 0000605, abort 0, onlist yes)
list_of_count(number 0000750, abort 0, onlist yes)
list_of_count(number 0000905, abort 0, onlist yes)
list_of_count(number 0006063, abort 0, onlist yes)
# 20220704061503

how can I ignore the warning below:

Dissector - Dissector mapping, pattern not found {"field"=>"[message][0]", "pattern"=>"# %{activity},%{num_of_snapshot},%{time}", "event"=>{"@t9910Z, "host"=>"0.0.0.0", "path"=>"/opt/data/input/list_of-1301-a_20220704061503", "tags"=>["_dissectfailure"], "@version"=>"1", "message"=>["# 20220704061503"]}}

This happens because dissect expects that all messages starting with # have the same format, which is not the case here; this will also be an issue for the multiline input.

As I said, the pipeline I shared assumes that your events have this format:

# snapshot,66472243,20220704061503
list_of_count(number 0000080, abort 0, onlist yes)
list_of_count(number 0000100, abort 0, onlist yes)
list_of_count(number 0000605, abort 0, onlist yes)
list_of_count(number 0000605, abort 0, onlist yes)
list_of_count(number 0000750, abort 0, onlist yes)
list_of_count(number 0000905, abort 0, onlist yes)
list_of_count(number 0006063, abort 0, onlist yes)
# anotherevent,66472243,20220704061503
list_of_count(number 0000080, abort 0, onlist yes)
list_of_count(number 0000100, abort 0, onlist yes)
list_of_count(number 0000605, abort 0, onlist yes)
list_of_count(number 0000605, abort 0, onlist yes)
list_of_count(number 0000750, abort 0, onlist yes)
list_of_count(number 0000905, abort 0, onlist yes)
list_of_count(number 0006063, abort 0, onlist yes)

If they do not have this format but instead the one you shared now, ending with another # line, then the pipeline won't work as expected and will need some changes in the multiline part.

Since this topic is already marked as solved, if you have issues changing the multiline part to work with your events, I suggest that you open a new topic and share the FULL event and more than one sample message as they appear in your files.

This is important, @INS: Logstash and Filebeat have to know where messages start and end. Show a few lines and others will help you parse them, whether single-line or multiline. Mask or replace restricted data with similar values; the plugins don't care about that.

@Rios
One file contains example data like this, and I need to parse file by file with such a date. In the meantime it turned out that the plain grok approach cannot work because of the badly placed date, as explained above. That's why leandro proposed the multiline codec, which turns out to be a good technique, but at the very end of the file I also have one line (# 20220704061503) that I have to skip completely; it is unnecessary. Please check the issue with the multiline input:

# snapshot,66472243,20220704061503
list_of_count(number 0000080, abort 0, onlist yes)
list_of_count(number 0000100, abort 0, onlist yes)
list_of_count(number 0000605, abort 0, onlist yes)
list_of_count(number 0000605, abort 0, onlist yes)
list_of_count(number 0000750, abort 0, onlist yes)
list_of_count(number 0000905, abort 0, onlist yes)
list_of_count(number 0006063, abort 0, onlist yes)
# 20220704061503
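One possible way around it, assuming real headers always carry three comma-separated fields while the trailer is a bare # plus timestamp, is to discard the trailer before parsing. A minimal sketch of the filtering idea (Python, illustrative only):

```python
# Illustrative sketch only: drop a trailing "# <timestamp>" marker,
# assuming real headers always have three comma-separated fields.
lines = [
    "# snapshot,66472243,20220704061503",
    "list_of_count(number 0000080, abort 0, onlist yes)",
    "# 20220704061503",
]
kept = [line for line in lines
        if not (line.startswith("#") and line.count(",") != 2)]
print(len(kept))  # 2
```

In Logstash itself, the analogous step would be a conditional `drop {}` on events whose message matches something like `^# \d+$`, but where exactly it fits depends on the multiline setup, so treat this as a starting point.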