CSV plugin cooperating with multiple patterns

Hi @Badger, I need to continue the topic below.

Referring to the above, I have a question: how can I mark, in this code, a place with a different kind of pattern/CSV? (Also, the count of records is not regular; sometimes there are more, sometimes fewer.)

The second case is that I need to grab a timestamp from the first line of such a file:

# snapshot,65767220,20220601044503

I tried this under one of the methods and hit an issue, because all messages starting with # get tried against the same format (at the least there will be an issue for the multiline input).

An example:


# snapshot,65767220,20220601044503
# Network Elements

0097,s,n,2719,,s,,,,3,,p,

C313,s,n,4767,,s,,y,,,,,

# DN Blocks

224135896,224135897,,,,,,,,,,,
224135896,224135897,,,,,,,,,,,


# numbers;

00000801163158,0,n,n,y
00000801163158,0,n,n,y

I would consider consuming the file with a multiline codec that collects all the lines for each '# Type', then tag the events with the type and use a split filter to break each line into its own event.

Suppose we have a file

# Snapshot
# ABC
a,b,c
d,e,f
# Type2
foo,1,2,3
bar,4,5,6
# DN Blocks
224135896,224135897,,,,,,,,,,,
224135896,224135897,,,,,,,,,,,

and we use this input/filter configuration

input {
    file {
        path => "/home/user/foo.txt"
        sincedb_path => "/dev/null"
        start_position => beginning
        codec => multiline { pattern => "^#" negate => true what => previous auto_flush_interval => 2 multiline_tag => "" }
    }
}
filter {
    mutate { remove_field => [ "[event]", "log" ] }
    if "# Snap" in [message] {
        mutate { add_field => { "eventType" => "Header" } }
    } else if "# ABC" in [message] {
        mutate { add_field => { "eventType" => "ABC" } }
        split { field => "message" }
        if [message] !~ /^#/ {
            csv { columns => [ "c1", "c2", "c3" ] }
        }
    } else if "# Type2" in [message] {
        mutate { add_field => { "eventType" => "Type2" } }
        split { field => "message" }
    } else {
        mutate { add_field => { "eventType" => "Unrecognized" } }
    }
}

We will get

{
            "c3" => "c",
            "c1" => "a",
       "message" => "a,b,c",
            "c2" => "b",
    "@timestamp" => 2022-07-18T17:47:47.452450Z,
     "eventType" => "ABC"
}
{
            "c3" => "f",
            "c1" => "d",
       "message" => "d,e,f",
            "c2" => "e",
    "@timestamp" => 2022-07-18T17:47:47.452450Z,
     "eventType" => "ABC"
}

as well as

{
       "message" => "# DN Blocks\n224135896,224135897,,,,,,,,,,,\n224135896,224135897,,,,,,,,,,,",
    "@timestamp" => 2022-07-18T17:47:49.939492Z,
     "eventType" => "Unrecognized"
}

Obviously you will have to add either csv or dissect for each line type.
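For example, the Type2 branch could use dissect instead of csv. A minimal sketch (the field names name, v1, v2, v3 are made up for illustration, since we don't know what the columns mean):

    } else if "# Type2" in [message] {
        mutate { add_field => { "eventType" => "Type2" } }
        split { field => "message" }
        if [message] !~ /^#/ {
            # dissect splits on the literal delimiters given in the mapping
            dissect { mapping => { "message" => "%{name},%{v1},%{v2},%{v3}" } }
        }
    }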

Many thanks @Badger. In the meantime I'm trying to extract the timestamp from the first line, but something goes wrong:

filter {
    mutate { remove_field => [ "[event]", "log" ] }
    if "# snapshot" in [message] {
        dissect {
            mapping => {
                "[message]" => "# %{activity},%{val},%{time}"
            }
            remove_field => ["[message]"]
        }
        date {
            match => ["time", "yyyyMMddHHmmss"]
            timezone => "Europe/Paris"
        }
    }
}

What goes wrong? What is the value of [time], what is the value of [@timestamp], is there a parse failure tag on the event?

Hmm, I don't know how to apply the timestamp from that first row to all of the other events.
As you can see it ends up on the first message, but I need the same timestamp on the others.

{
      "activity" => "snapshot",
          "path" => "/opt/data/input/test_npdb.txt",
      "@version" => "1",
          "time" => "20220601044503",
          "host" => "0.0.0.0",
    "@timestamp" => 2022-06-01T02:45:03Z,
           "val" => "65767220"
}
{
          "NSDN" => nil,
            "PT" => "1",
      "@version" => "1",
          "CGBL" => nil,
            "SP" => nil,
            "DN" => "C0108",
           "VMS" => nil,
          "CDBL" => "-2",
            "RN" => nil,
          "path" => "/opt/data/input/test_npdb.txt",
           "ASD" => nil,
          "IMSI" => nil,
           "GRN" => nil,
            "ST" => nil,
          "host" => "0.0.0.0",
     "eventType" => "DNs",
    "@timestamp" => 2022-07-19T18:43:33.343119Z,
       "message" => "C0108,,1,,,,,,,,,-2"
}

I've tried the code below, but got the same results as above:

filter {
    mutate { remove_field => [ "[event]", "log" ] }
    if "# snapshot" in [message] {
        dissect {
            mapping => {
                "[message]" => "# %{activity},%{val},%{[@metadata][timestamp]}"
            }
            remove_field => ["[message]"]
        }
        date {
            match => ["[@metadata][timestamp]", "yyyyMMddHHmmss"]
            timezone => "Europe/Paris"
        }
        # mutate { add_field => { "eventType" => "Header" } }
    } else if "# Network Elements" in [message] {
        mutate { add_field => { "eventType" => "Network Elements" } }
        split { field => "message" }
        if [message] !~ /^#/ {
            csv { columns => ["ID","Type","PCType","PC","GC","RI","SSN","CCGT","NTT","NNAI","NNP","DA","SR"] }
            date {
                match => ["[@metadata][timestamp]", "yyyyMMddHHmmss"]
                timezone => "Europe/Paris"
            }
        }
    }
}

You could save it in a ruby filter and add it to the non-snapshot lines. An example is here. You will need pipeline.workers 1 and pipeline.ordered true. This kind of ruby solution tends to be fragile.

Great, I will try to do it.

{
          "host" => "0.0.0.0",
          "path" => "/opt/data/input/test2.txt",
      "activity" => "snapshot",
           "val" => "65767220",
      "@version" => "1",
          "time" => "20220601044503",
    "@timestamp" => 2022-06-01T02:45:03Z
}
{
          "host" => "0.0.0.0",
       "message" => "0097,s,n,2719,,s,,,,3,,p,",
          "CCGT" => nil,
      "metadata" => {},
           "NTT" => nil,
           "NNP" => nil,
      "@version" => "1",
            "DA" => "p",
       "SRFIMSI" => nil,
            "PC" => "2719",
          "tags" => [
        [0] "_dateparsefailure"
    ],
        "PCType" => "n",
          "path" => "/opt/data/input/test2.txt",
           "SSN" => nil,
          "NNAI" => "3",
     "eventType" => "Network Elements",
    "@timestamp" => 2022-07-19T21:10:17.456544Z,
          "Type" => "s",
            "RI" => "s",
            "GC" => nil,
            "ID" => "0097"
}


All of the pipeline code:

input {
    file {
        mode => read
        path => "/opt/data/input/test2.txt"
        sincedb_path => "/dev/null"
        start_position => beginning
        file_completed_action => "log"
        file_completed_log_path => "/opt/data/logstash_files/fin_eir.log"
        codec => multiline { pattern => "^#" negate => true what => previous auto_flush_interval => 2 multiline_tag => "" }
    }
}
filter {
    mutate { remove_field => [ "[event]", "log" ] }
    if "# snapshot" in [message] {
         dissect {
            mapping => {
                "[message]" => "# %{activity},%{val},%{time}"
            }
            remove_field => ["[message]"]
        }
        date {
                match => ["time", "yyyyMMddHHmmss"]
                timezone => "Europe/Paris"
            }
        ruby {
            init => '
                @@collectingMetadata = false
            '
            code => '
                unless @@collectingMetadata
                    @@metadata = {}
                    @@collectingMetadata = true
                end
                @@metadata[event.get("time")]
            '
        }

         # mutate { add_field => { "eventType" => "Header" } }
    } else if "# Network Elements" in [message] {
        mutate { add_field => { "eventType" => "Network Elements" } }
        split { field => "message" }
        if [message] !~ /^#/ {
            csv { columns => ["ID","Type","PCType","PC","GC","RI","SSN","CCGT","NTT","NNAI","NNP","DA","SRF"] }
        }
        ruby {
            code => '
                event.set("metadata", @@metadata)
            '
        }
        date {
            match => ["metadata", "yyyyMMddHHmmss"]
            timezone => "Europe/Paris"
        }
    } else if "# DNs" in [message] {
        mutate { add_field => { "eventType" => "DNs" } }
        split { field => "message" }
        if [message] !~ /^#/ {
            csv { columns => ["DN","IMSI","PT","SP","RN","VMS","GRN","ASD","ST","NSDN","CGBL","CDBL"] }

        }
    } else if "# DN Blocks" in [message] {
        mutate { add_field => { "eventType" => "DN Block" } }
        split { field => "message" }
        if [message] !~ /^#/ {
            csv { columns => ["BDN","EDN","PT","SP","RN","VMS","GRN","ASD","ST","NSDN","CGBL","CDBL"] }
        }
    } else {
        mutate { add_field => {"eventType" => "numbers"}}
        split { field => "message" }
        if [message] !~ /^#/ {
            csv { columns => ["IMEI","SVN","WHITE","GRAY","BLACK"] }
        }
    }
}

output {
    stdout { codec => rubydebug }
}


@Badger, you can try it on this sample data [test2.txt]:

# snapshot,65767220,20220601044503
# Network Elements

0097,s,n,2719,,s,,,,3,,p,

C313,s,n,4767,,s,,y,,,,,

# DN Blocks

224135896,224135897,,,,,,,,,,,
224135896,224135897,,,,,,,,,,,


# numbers;

00000801163158,0,n,n,y
00000801163158,0,n,n,y

Plus pipelines.yml:

- pipeline.id: test
  path.config: "/usr/share/logstash/pipeline/pipeline_test.yml"
  pipeline.workers: 1
  pipeline.batch.size: 2
  pipeline.batch.delay: 50
  pipeline.ordered: true

I would use

ruby { code => '@@metadata = event.get("@timestamp")' }

for the snapshot lines, and

ruby { code => 'event.set("@timestamp", @@metadata)' }

for the others.
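A minimal sketch of how those slot into the earlier filter (this assumes the snapshot line is always processed first, which is what pipeline.workers 1 and pipeline.ordered true are needed to guarantee):

filter {
    if "# snapshot" in [message] {
        dissect { mapping => { "message" => "# %{activity},%{val},%{time}" } }
        date { match => ["time", "yyyyMMddHHmmss"] timezone => "Europe/Paris" }
        # remember the snapshot timestamp for the events that follow
        ruby { code => '@@metadata = event.get("@timestamp")' }
    } else {
        # reuse the saved snapshot timestamp on every other event
        ruby { code => 'event.set("@timestamp", @@metadata)' }
    }
}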


Thanks, it works as well :wink:

When I'm processing a file of around 800 MB it takes too long with 1 worker.

I think I will try to split such files into multiple smaller files by pattern (in Python). That should be more efficient than Logstash's performance.
BTW, it's interesting what the limits of
max_lines => ?
max_bytes => ?
should be when I use 16 GB of memory per Logstash instance.
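For reference, those limits are options of the multiline codec itself. A sketch of where they would go (the values here are placeholders, not tuned recommendations; the defaults are 500 lines and "10 MiB" per multiline event):

codec => multiline {
    pattern => "^#"
    negate => true
    what => previous
    auto_flush_interval => 2
    max_lines => 500000       # default is 500 lines per multiline event
    max_bytes => "100 MiB"    # default is "10 MiB"
}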
