Unable to index csv to elasticsearch (MalformedCSVError: Unclosed quoted field)

Hi,

I have been trying to index a csv file into Elasticsearch using Logstash, but due to an issue with the csv file I am unable to do so.
Sample data:

file,id,Last-Updated,SubmittedOn,Engg,Des
/path/to/file.c,id21,4/22/2018 4:36,2/7/2018 16:46,sam,"This is sample data 1.

Another sentence."

My conf file:

input {
  file {
    path => ["{path}/sample_csv.csv"]
    start_position => "beginning"
    sincedb_path => "nul"
  }
}

filter {
  csv {
    separator => ","
    autodetect_column_names => true
    autogenerate_column_names => false
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "sample_index"
  }
}

Error Message:

[2019-12-11T12:21:18,209][WARN ][logstash.filters.csv     ][main] Error parsing csv {:field=>"message", :source=>"/path/to/file.c,id21,4/22/2018 4:36,2/7/2018 16:46,sam,\"This is sample data 1.\r", :exception=>#<CSV::MalformedCSVError: Unclosed quoted field on line 1.>}

The trouble here is that the File Input is line-oriented, emitting each line as an event, which means that by the time the CSV filter gets the events, they are already separated and cannot be reliably reassembled.

Since the CSV filter needs to work with exactly one "row" of the CSV document at a time, continuing to use it means figuring out how to tell the File input how to parse CSVs, which is a bit of a chicken-and-egg problem.

Unfortunately I don't have a solution using the File input plugin, as even in mode => read, it still hands off data to the codec one line at a time.

If you were willing to use a different input (e.g., the stdin input for a one-off run where you pipe in the contents of the file), the following may get you on track.

input {
  stdin {
    codec => plain
  }
}
filter {
  # hack: ensure that the CSV library is loaded
  if false { csv {} }

  # take the contents of `message`, and parse it using Ruby's csv library
  ruby {
    code => "
      lines = CSV.parse(event.get('message'), :headers => true).map(&:to_hash)
      event.set('lines', lines)
    "
  }
  # split the result into many small events, each with one entry from the CSV
  split {
    field => "lines"
    target => "message"
  }

  # ... (other filters)
}
output {
  # outputs
}

usage:

bin/logstash -f pipeline.conf < sample.csv

Hi

Maybe you could use the multiline codec in your input plugin. That will give you one single event for the whole of your csv file.
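A rough, untested sketch of what that input could look like, reusing the file input from the original config. The pattern trick (matching every line so each one gets appended to the previous event) and the auto_flush_interval / max_lines values are my assumptions, so adjust as needed:

input {
  file {
    path => ["{path}/sample_csv.csv"]
    start_position => "beginning"
    sincedb_path => "nul"
    codec => multiline {
      # every line matches "^", and "what => previous" appends it to the
      # previous event, so the whole file accumulates into a single event
      pattern => "^"
      what => "previous"
      # flush the accumulated event once no new lines arrive for 2s
      auto_flush_interval => 2
      # raise the default 500-line cap in case the file is larger
      max_lines => 100000
    }
  }
}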

Then check what your message looks like and, based on this information, you could use the mutate filter's gsub option, or something similar, to remove the newline characters.

Then find a way to split the message again into separate events. Probably the split filter with terminator => "^/", followed by the csv filter to separate the data into fields, and then a mutate filter to add back the now-missing "/" at the beginning of your file field (see the sketch below).
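An untested sketch of that filter chain, just to make the idea concrete. The gsub replacement, the split terminator (as far as I know it is treated as a literal string rather than a regex, so I use " /" here instead of "^/"), and the column name file are assumptions based on the sample data above:

filter {
  # turn the embedded carriage returns / newlines into spaces so the
  # whole file becomes one long line
  mutate {
    gsub => ["message", "\r?\n", " "]
  }

  # split into one event per record; after the gsub above, every record
  # except the header starts with " /" (assumes " /" never appears
  # inside the quoted text)
  split {
    field => "message"
    terminator => " /"
  }

  # parse the comma-separated fields; the first event is the header row
  csv {
    separator => ","
    autodetect_column_names => true
  }

  # the split consumed the leading "/" of the path, so add it back
  mutate {
    replace => { "file" => "/%{file}" }
  }
}

You would probably still want to drop or tag the header event and double-check how the csv filter handles it with autodetect_column_names; I haven't verified that part.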

Now you should have one event per line and can proceed as usual with your other filters and the output.

Just a suggestion, haven't tried it myself.

Hope this helps.

Hi,

Sorry I couldn't respond earlier; I have been trying a workaround using Python. I will try what you suggested.

Thank you for the help :)

Hi, I ended up indexing the csv into Elasticsearch using Python, and couldn't spend much time on the approaches you suggested due to time constraints. Thank you for the responses though :)
