Can I use an existing field in the event to create the columns for a CSV filter?

I've been struggling with this for far too long, so I'm giving up and asking for help. I am being sent data via a Kafka topic which contains a field called 'headers', which is a CSV string of column headers, and a field called 'data', which is a CSV string of the corresponding data. What I want to achieve is to use the header string as the 'columns' option in a CSV filter.
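For illustration, using the header names from the output further down (the data values here are invented), an incoming event looks something like:

{
  "headers": "serverIP,statsType,adjacencyName,...",
  "data": "192.168.0.1,adjacency,adj1,..."
}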

I am trying to use this but so far unsuccessfully:

csv {
    source => "Data"
    columns => "%{Headers}"
    target => "data"
}

I have also tried using

mutate {
    split => { "headers" => "," }
}

first, to create an array of headers, but this doesn't seem to work either: in the output I end up with two separate fields, one for the headers:

    "Headers" => [
            [  0] "serverIP",
            [  1] "statsType",
            [  2] "adjacencyName",
            [  3] "adjStatAccountName",
            [  4] "adjStatSrcActiveCalls",
            [  5] "adjStatDstActiveCalls",
            [  6] "adjStatSrcActivatingCalls"
    ]

and one for the data:

"data" => [
             "column168" => "0",
             "column245" => "0",
              "column18" => "0",
             "column209" => "4294967295",
             "column244" => "0",
             "column153" => "0 / 10000 %"
]

In reality, the headers and corresponding data strings have several hundred fields in them, so I don't really want to have to write them all out in the Logstash filter.

Can anyone suggest a way to build the columns array so that I can pass it to the CSV filter and get a proper key => value pair result?


I would do that in a ruby filter:

input { generator { count => 1 lines => [ '{ "headers": "a,b,c,d", "data": "1,2,3,4" }', '{ "headers": "d,e,f,g", "data": "9,8,7,6" }' ] } }
filter {
    json { source => "message" remove_field => [ "message" ] }
    ruby {
        init => "
            require 'csv'
        "
        code => '
            data = event.get("data")
            headers = event.get("headers")
            if headers and data
                # :headers accepts a String, which CSV parses into a header row
                table = CSV.parse(data, :headers => headers)
                dataAsHash = table.map { |row| row.to_hash }
                # Either keep everything under a single field ...
                # event.set("dataAsHash", dataAsHash[0])
                # ... or promote each column to its own field:
                dataAsHash[0].each { |k, v| event.set(k, v) }
            end
        '
    }
}

will get you

{
"@timestamp" => 2020-08-08T16:56:42.797Z,
      "data" => "1,2,3,4",
         "a" => "1",
         "b" => "2",
         "c" => "3",
         "d" => "4",
   "headers" => "a,b,c,d",
...
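The trick here is that Ruby's CSV library accepts a String for the :headers option and parses it into a header row itself, so the raw header string can be passed straight through. A minimal sketch you can run outside Logstash to verify (plain Ruby, not a Logstash config):

require 'csv'

headers = "a,b,c,d"   # raw header string, exactly as it arrives in the event
data    = "1,2,3,4"   # corresponding data string

# :headers accepts a String and parses it with the same separator
table = CSV.parse(data, :headers => headers)
p table.map(&:to_hash)   # => [{"a"=>"1", "b"=>"2", "c"=>"3", "d"=>"4"}]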

You might want to add

remove_field => [ "data", "headers" ]

to the ruby filter so that those fields are removed if there are no ruby exceptions.
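That is, as an option on the ruby filter itself (a sketch, with the code body elided):

ruby {
    code => '...'
    remove_field => [ "data", "headers" ]
}

remove_field is a common filter option and is only applied when the filter succeeds, so the original strings are preserved whenever the ruby code raises an exception.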


Thank you! Worked perfectly.

I did try using Ruby to stitch the header string and data string together, but alas that didn't work. This, though, is perfect. Thanks.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.