CSV parsing meaning get rid of logstash csv plugin?

Well I have an issue which is coming with a solution :slight_smile: but I would like to know if someone has a better idea

I am runnging logstash csv plugin 3.0.8
I had to treat unlimited different kind of CSV with different headers so I could not used the CSV columns array definition of the logstash csv plugin.
I was delighted when I saw the autodetect_column_names option and then totally depressed as it is only working once (for the first CSV that is processed).

As far as I understood, dont spend time on making the official logstash csv plugin work unless

  • you are treating only 1 kind of csv with fix columns
  • you have the luxuary to put the worker to 1
  • you have the luxuary to create 1 CSV filter by type of CSV

I was looking for a sate full solution, and the only one which can work is to have the CSV file sent atomically, i.e. sent in multiline.
The most scary and cool thing about logstash is the ruby filter, it is hard to understand if you are not familiar with Ruby (as it was for me), but it provides freedom to cope with most of the things that plugins are not providing.

the 2 good things about this design :

  • you can remove the only one worker configuration away
  • well, it works

the 2 bad things is that you need

  • to code your own ruby implementation of the CSV parsing
  • change filebeat configuration to send your CSV files in multiline

I spent quite some time to find an applicable solution for my issue, hence I hope the code below or the logic of it can be reused by someone.

filter {

# CSV format example but coud be hundreds columns and lines
# Works as long as the CSV file is sent in 1 block (using multiline)
# Hour,Cell1,Cell2,
# 201909300000,100.00,50.00,
# 201909300100,100.00,60.00,
# 201909301700,100.00,70.00,


    ruby {
      code => '

      lines = event.get("message").split("\n")
      # for DEBUG
      # puts lines
      header = lines[0].split(",")
      arrayOfEvents = Array.new()

      # lines[0] is header, line[n] is empty
      for i in 1..lines.length-1 do
        line = lines[i].split(",")

        # line[0] is timestamp, line[n] is empty
        for j in 1..line.length-1 do
          value = line[j]
           # convert all fields which are of type "100" or "99.0909" to float
          elsif value =~ /\A[+-]?(\d+\.\d+)?\Z/ || value =~ /\A\d+\Z/
            value = value.to_f
          end

          arrayOfEvents.push({
                      "key" => header[j],
                      "value" => value,
                      "log_ts" => line[0]
          })
        end
      end
      # for debug
      # puts arrayOfEvents
      event.set("event",arrayOfEvents)
      '
    }

    # split the event array in multiple events
    split {
        field => "event"
    }

    mutate {
      rename => {
        "[event][key]" => "da_key"
        "[event][value]" => "da_value"
        "[event][log_ts]" => "log_ts"
      }
    }

    date {
        match => [ "log_ts", "yyyyMMddHHmm" ]
        target => "@logtimestamp"
    }

    mutate {
      remove_field => ["fields", "beat", "input_type", "offset", "@version", "tags", "message", "event", "log_ts" ]
    }

}

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.