Can I use an existing field in the event to create the columns for a CSV filter?

I've been struggling with this for far too long, so I'm giving up and asking for help. I am being sent data via a Kafka topic. Each event contains a field called 'headers', which is a CSV string of column headers, and a field called 'data', which is a CSV string of the corresponding data. What I want is to use the header string as the 'columns' option in a CSV filter.

I am trying to use this but so far unsuccessfully:

csv {
        source => "Data"
        columns => "%{Headers}"
        target => "data"
      }

I have tried using,

mutate {
      split => { "headers" => ","}
     }

first, to create an array of headers, but this doesn't seem to work either as in the output I end up with 2 arrays, one of headers:

    "Headers" => [
            [  0] "serverIP",
            [  1] "statsType",
            [  2] "adjacencyName",
            [  3] "adjStatAccountName",
            [  4] "adjStatSrcActiveCalls",
            [  5] "adjStatDstActiveCalls",
            [  6] "adjStatSrcActivatingCalls"
    ]

and one for the data:

"data" => [
             "column168" => "0",
             "column245" => "0",
              "column18" => "0",
             "column209" => "4294967295",
             "column244" => "0",
             "column153" => "0 / 10000 %"
]

In reality, the headers and corresponding data strings have several hundred fields in them, so I don't really want to have to write them all out in the logstash filter.

Can anyone suggest a way that I can build the columns array in such a way that I can then pass it to the CSV filter to be used so that I get a proper key => value pair result?

I would do that in a ruby filter

input { generator { count => 1 lines => [ '{ "headers": "a,b,c,d", "data": "1,2,3,4" }', '{ "headers": "d,e,f,g", "data": "9,8,7,6" }' ] } }
filter {
    json { source => "message" remove_field => [ "message" ] }
    ruby {
        init => "
            require 'csv'
        "
        code => '
            data = event.get("data")
            headers = event.get("headers")
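            if headers and data
                # Parse the data string, using the headers string as the column names
                table = CSV.parse(data, headers: headers)
                dataAsHash = table.map { |row| row.to_hash }
                # event.set("dataAsHash", dataAsHash[0])
                #   or
                # Guarded so that events with empty data do not raise an exception
                dataAsHash[0].each { |k, v| event.set(k, v) } if dataAsHash[0]
            end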
        '
    }
}

will get you

{
"@timestamp" => 2020-08-08T16:56:42.797Z,
      "data" => "1,2,3,4",
         "a" => "1",
         "b" => "2",
         "c" => "3",
         "d" => "4",
   "headers" => "a,b,c,d",
...

You might want to add

remove_field => [ "data", "headers" ]

to the ruby filter, so that those fields are removed only when the ruby code runs without raising an exception.
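
If you want to check the parsing step outside of logstash first, here is a minimal standalone sketch of the same idea in plain Ruby. The helper name csv_strings_to_hash is hypothetical (it is not part of logstash or the csv library), and the sketch assumes each event carries exactly one row of data. It relies on CSV.parse accepting a String for the :headers option, in which case that string is parsed as the header row.

```ruby
require 'csv'

# Hypothetical helper, for illustration only: pair a CSV header string
# with a single-row CSV data string and return a Hash of name => value.
def csv_strings_to_hash(headers, data)
  # Nothing to pair up if either string is missing.
  return {} if headers.nil? || data.nil?

  # With headers: set to a String, CSV parses it as the header row
  # and returns a CSV::Table of the data rows.
  row = CSV.parse(data, headers: headers).first

  # An empty data string yields no rows at all.
  row ? row.to_h : {}
end

result = csv_strings_to_hash("a,b,c,d", "1,2,3,4")
result.each { |name, value| puts "#{name} => #{value}" }
```

Using the csv library rather than splitting both strings on "," means that quoted values which themselves contain commas should stay in one column.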

Thank you! Worked perfectly.

I did try to use Ruby to stitch the header string and data string together, but alas that didn't work. This, though, is perfect. Thanks.