Shift value in CSV condition

Hi

How can I shift the values of some fields, for example CLLI and SWREL, out of an "if" condition
so that they appear in the output together with the other data in one event?

Expected output:

{
"NDCFLXDA" => "0",
"@version" => "1",
"NDCFLXDC" => "0",
"STATUS" => "K",
"NMDCLFLR" => "0",
"@timestamp" => 2023-02-24T13:24:14Z,
"SUSRECVD" => "0",
"NEARMGIH" => "0",
"LINK" => "B ",
"NDCLFALP" => "0",
"DRLKINHB" => "0",
"FARMGINH" => "0",
"NDCFLABN" => "0",
"path" => "/opt/data/input/new_test1.csv",
"DRDCLFLR" => "0",
"DRFEPRO" => "0",
"NDCFLXER" => "0",
"NMFEPRO" => "0",
"SUSTRAN" => "0",
"PCRN1N2EXC" => "0",
"NDCLFSYNC" => "0",
"NMLCLPRO" => "0",
"DRLCLPRO" => "0",
"LSN" => "hlr91ip",
"SURCVERR" => "0",
"host" => "0.0.0.0",
"LOC" => "1211",
"LNKTYPE" => "IPVL",
"NDCLFINTR" => "0",
"message" => "K,hlr91ip,1211,B ,IPVL,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0",
"CLLI" => "H91",
"SWREL" => "SW03"
}

Part of my configuration is below:

input {
    file {
        mode => read
        path => "/opt/data/input/new_test1.csv"
        sincedb_path => "/dev/null"
        start_position => beginning
        file_completed_action => "log"
        file_completed_log_path => "/opt/data/logstash_files/fin_eir.log"
    }
}


filter {
    if [message] =~ /^$/ { drop {} }

    ruby { code => 'event.set("[@metadata][fields]", 1 + event.get("message").count(","))' }

    if [@metadata][fields] == 11 {
        csv {
            separator => ","
            columns => ["CLLI","SWREL","RPTDATE","RPTIME","TZ","RPTTYPE","RPTPD","IVALDATE","IVALSTART","IVALEND","NUMENTIDS"]
            skip_empty_columns => true
            skip_header => true
        }

        ruby {
            code => "
                event.set('event_timestamp', event.get('RPTDATE') + ' ' + event.get('RPTIME'))
            "
        }

        date {
            match => ["event_timestamp", "MM dd yyyy HH:mm:ss", "yyyy-MM-dd HH:mm:ss", "MM dd yyyy HH:mm:ss", "ISO8601"]
            timezone => "Europe/Paris"
            target => "@timestamp"
        }

        ruby { code => '@@metadata = event.get("@timestamp")' }
        drop {}

        mutate {
            remove_field => ["RPTDATE","RPTIME","RPTTYPE","RPTPD","NUMENTIDS","message","IVALEND"]
        }
    } else {
        csv {
            separator => ","
            autodetect_column_names => true
        }

        ruby { code => 'event.set("@timestamp", @@metadata)' }
    }
}




output {
    stdout { codec => rubydebug }
}

Can you provide more context? What do you mean by shift? It is not clear what you want to do.

OK, so from the first block of the "if" condition I need to get the values of the columns "CLLI" and "SWREL":

if [@metadata][fields] == 11 {
    csv {
        separator => ","
        columns => ["CLLI","SWREL","RPTDATE","RPTIME","TZ","RPTTYPE","RPTPD","IVALDATE","IVALSTART","IVALEND","NUMENTIDS"]
        skip_empty_columns => true
        skip_header => true
    }

    ruby {
        code => "
            event.set('event_timestamp', event.get('RPTDATE') + ' ' + event.get('RPTIME'))
        "
    }

    date {
        match => ["event_timestamp", "MM dd yyyy HH:mm:ss", "yyyy-MM-dd HH:mm:ss", "MM dd yyyy HH:mm:ss", "ISO8601"]
        timezone => "Europe/Warsaw"
        target => "@timestamp"
    }

    ruby { code => '@@metadata = event.get("@timestamp")' }
    drop {}

    mutate {
        remove_field => ["RPTDATE","RPTIME","RPTTYPE","RPTPD","NUMENTIDS","message","IVALEND"]
    }
}

and put them into this block:

    csv {
        separator => ","
        autodetect_column_names => true
    }

    ruby { code => 'event.set("@timestamp", @@metadata)' }
}

so that I get all of the data in one event. I hope it is clearer now, @leandrojmp.

For example, I have one file with this content:

"CLLI","SWREL","RPTDATE","RPTIME","TZ","RPTTYPE","RPTPD","IVALDATE","IVALSTART","IVALEND","NUMENTIDS"
"stp06","EAGLE 47.0.0.0.0-79.13.0","2023-02-24","14:24:14","CET ","AVAILABILITY MEASUREMENTS ON LINK","LAST","2023-02-24","14:00:00","14:15:00",285

"STATUS","LSN","LOC","LINK","LNKTYPE","NEARMGIH","FARMGINH","NMDCLFLR","DRDCLFLR","SURCVERR","DRLKINHB","NDCFLABN","NDCLFSYNC","NDCFLXDA","NDCFLXER","NDCFLXDC","NDCLFALP","NDCLFINTR","NMFEPRO","NMLCLPRO","DRFEPRO","DRLCLPRO","SUSRECVD","SUSTRAN","PCRN1N2EXC"
K,eip61sccip,1207,A  ,IPVL,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
K,eip05sccip,1207,B  ,IPVL,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
K,hlr41ip,1208,B  ,IPVL,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
K,hlr41ip,1211,A  ,IPVL,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
K,hlr91ip,1211,B  ,IPVL,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0

The first part of my code gets the timestamp from the string. The second part autodetects the column names, but in this case I also need to join in the values from the first part of the CSV.

So currently I get exactly this output:

{
      "NDCFLXDA" => "0",
      "@version" => "1",
      "NDCFLXDC" => "0",
        "STATUS" => "K",
      "NMDCLFLR" => "0",
    "@timestamp" => 2023-02-24T13:24:14Z,
      "SUSRECVD" => "0",
      "NEARMGIH" => "0",
          "LINK" => "A  ",
      "NDCLFALP" => "0",
      "DRLKINHB" => "0",
      "FARMGINH" => "0",
      "NDCFLABN" => "0",
          "path" => "/opt/data/input/new_test1.csv",
      "DRDCLFLR" => "0",
       "DRFEPRO" => "0",
      "NDCFLXER" => "0",
       "NMFEPRO" => "0",
       "SUSTRAN" => "0",
    "PCRN1N2EXC" => "0",
     "NDCLFSYNC" => "0",
      "NMLCLPRO" => "0",
      "DRLCLPRO" => "0",
           "LSN" => "eip61sccip",
      "SURCVERR" => "0",
          "host" => "0.0.0.0",
           "LOC" => "1207",
       "LNKTYPE" => "IPVL",
     "NDCLFINTR" => "0",
       "message" => "K,eip61sccip,1207,A  ,IPVL,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0"
}
{
      "NDCFLXDA" => "0",
      "@version" => "1",
      "NDCFLXDC" => "0",
        "STATUS" => "K",
      "NMDCLFLR" => "0",
    "@timestamp" => 2023-02-24T13:24:14Z,
      "SUSRECVD" => "0",
      "NEARMGIH" => "0",
          "LINK" => "B  ",
      "NDCLFALP" => "0",
      "DRLKINHB" => "0",
      "FARMGINH" => "0",
      "NDCFLABN" => "0",
          "path" => "/opt/data/input/new_test1.csv",
      "DRDCLFLR" => "0",
       "DRFEPRO" => "0",
      "NDCFLXER" => "0",
       "NMFEPRO" => "0",
       "SUSTRAN" => "0",
    "PCRN1N2EXC" => "0",
     "NDCLFSYNC" => "0",
      "NMLCLPRO" => "0",
      "DRLCLPRO" => "0",
           "LSN" => "eip05sccip",
      "SURCVERR" => "0",
          "host" => "0.0.0.0",
           "LOC" => "1207",
       "LNKTYPE" => "IPVL",
     "NDCLFINTR" => "0",
       "message" => "K,eip05sccip,1207,B  ,IPVL,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0"
}
{
      "NDCFLXDA" => "0",
      "@version" => "1",
      "NDCFLXDC" => "0",
        "STATUS" => "K",
      "NMDCLFLR" => "0",
    "@timestamp" => 2023-02-24T13:24:14Z,
      "SUSRECVD" => "0",
      "NEARMGIH" => "0",
          "LINK" => "B  ",
      "NDCLFALP" => "0",
      "DRLKINHB" => "0",
      "FARMGINH" => "0",
      "NDCFLABN" => "0",
          "path" => "/opt/data/input/new_test1.csv",
      "DRDCLFLR" => "0",
       "DRFEPRO" => "0",
      "NDCFLXER" => "0",
       "NMFEPRO" => "0",
       "SUSTRAN" => "0",
    "PCRN1N2EXC" => "0",
     "NDCLFSYNC" => "0",
      "NMLCLPRO" => "0",
      "DRLCLPRO" => "0",
           "LSN" => "hlr41ip",
      "SURCVERR" => "0",
          "host" => "0.0.0.0",
           "LOC" => "1208",
       "LNKTYPE" => "IPVL",
     "NDCLFINTR" => "0",
       "message" => "K,hlr41ip,1208,B  ,IPVL,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0"
}
{
      "NDCFLXDA" => "0",
      "@version" => "1",
      "NDCFLXDC" => "0",
        "STATUS" => "K",
      "NMDCLFLR" => "0",
    "@timestamp" => 2023-02-24T13:24:14Z,
      "SUSRECVD" => "0",
      "NEARMGIH" => "0",
          "LINK" => "A  ",
      "NDCLFALP" => "0",
      "DRLKINHB" => "0",
      "FARMGINH" => "0",
      "NDCFLABN" => "0",
          "path" => "/opt/data/input/new_test1.csv",
      "DRDCLFLR" => "0",
       "DRFEPRO" => "0",
      "NDCFLXER" => "0",
       "NMFEPRO" => "0",
       "SUSTRAN" => "0",
    "PCRN1N2EXC" => "0",
     "NDCLFSYNC" => "0",
      "NMLCLPRO" => "0",
      "DRLCLPRO" => "0",
           "LSN" => "hlr41ip",
      "SURCVERR" => "0",
          "host" => "0.0.0.0",
           "LOC" => "1211",
       "LNKTYPE" => "IPVL",
     "NDCLFINTR" => "0",
       "message" => "K,hlr41ip,1211,A  ,IPVL,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0"
}
{
      "NDCFLXDA" => "0",
      "@version" => "1",
      "NDCFLXDC" => "0",
        "STATUS" => "K",
      "NMDCLFLR" => "0",
    "@timestamp" => 2023-02-24T13:24:14Z,
      "SUSRECVD" => "0",
      "NEARMGIH" => "0",
          "LINK" => "B  ",
      "NDCLFALP" => "0",
      "DRLKINHB" => "0",
      "FARMGINH" => "0",
      "NDCFLABN" => "0",
          "path" => "/opt/data/input/new_test1.csv",
      "DRDCLFLR" => "0",
       "DRFEPRO" => "0",
      "NDCFLXER" => "0",
       "NMFEPRO" => "0",
       "SUSTRAN" => "0",
    "PCRN1N2EXC" => "0",
     "NDCLFSYNC" => "0",
      "NMLCLPRO" => "0",
      "DRLCLPRO" => "0",
           "LSN" => "hlr91ip",
      "SURCVERR" => "0",
          "host" => "0.0.0.0",
           "LOC" => "1211",
       "LNKTYPE" => "IPVL",
     "NDCLFINTR" => "0",
       "message" => "K,hlr91ip,1211,B  ,IPVL,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0"
}

but I need to add the "CLLI" and "SWREL" values mentioned above to every one of these events.

I'm sorry, but it is not clear what the issue is and what you want to do.

You want to parse the fields CLLI and SWREL and add them to the following events?

I don't think this is possible, or at least I don't know of an easy way to do it.

Logstash is event based: every event is independent of the others. If you want information from previous events you normally use the aggregate filter, but I'm not sure whether you can use it in your case or whether it would help.

If the values of CLLI and SWREL are static, a simple add_field would work.
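As a minimal sketch of that static case, assuming the values really are fixed and are the ones shown in the expected output above ("H91" and "SW03" are taken from there, not read from the file), a mutate filter placed in the else branch after the csv filter could add them:

```logstash
# Sketch only: hard-coded values, assumed never to change between files.
mutate {
    add_field => {
        "CLLI"  => "H91"
        "SWREL" => "SW03"
    }
}
```

This does not read anything from the first CSV block, so it only helps if every file carries the same CLLI and SWREL.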

Also, you have two different CSV formats in the same file, is this right?

Yes, exactly. There are two kinds of CSV header in one file, so I need to grab a few fields from the first block (clone or copy) and use them in the output of the second CSV block.

Logstash is not intended to support a use case like this. You can probably do it using class variables (like your @@metadata), but you may need to set --pipeline.workers 1 and --pipeline.ordered true so that the lines are processed one at a time, in file order.
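A rough sketch of the class-variable approach, extending the two ruby filters already in the question. The names @@clli and @@swrel are hypothetical, and this assumes a single worker with ordered processing, so that the 11-column row is always seen before the rows that need its values:

```logstash
# Sketch only. @@clli and @@swrel are made-up names for illustration.

# Inside the "if [@metadata][fields] == 11" branch, before drop {}:
ruby {
    code => '
        @@metadata = event.get("@timestamp")
        @@clli     = event.get("CLLI")
        @@swrel    = event.get("SWREL")
    '
}

# Inside the else branch, after the autodetecting csv filter:
ruby {
    code => '
        event.set("@timestamp", @@metadata)
        event.set("CLLI", @@clli)
        event.set("SWREL", @@swrel)
    '
}
```

With the sample file above this should put "stp06" and "EAGLE 47.0.0.0.0-79.13.0" on every data row. If a data row is processed before any 11-column row has been seen, the class variables are not yet defined and the ruby filter will raise an error for that event, which is one more reason the ordering settings matter.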
