Special characters in data preventing CSV import using Logstash 6.6.2

Hi all, I am trying to import a CSV file exported from a ServiceNow ticketing system into Elasticsearch using Logstash 6.6.2. It works if I use manually entered test data, but as soon as I use actual data it fails. I suspect this may be due to the strange characters included in the CSV data (such as * / . :), but I am struggling to find a way of resolving it. I would be happy to automatically remove all special characters from the problem fields, as they are not required.
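For reference, something like the following mutate filter is roughly what I had in mind for stripping the unwanted characters (untested, and the two field names are just examples of the free-text fields I suspect):

filter {
  # Untested sketch: replace anything outside a basic whitelist of characters
  # with a space in the suspect free-text fields
  mutate {
    gsub => [
      "Description", "[^A-Za-z0-9 .,:/()-]", " ",
      "Comments and Work notes", "[^A-Za-z0-9 .,:/()-]", " "
    ]
  }
}
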
My conf file is below; any help appreciated.

input {
  file {
    id => "newtestv2"
    path => "C:/elk6/logstash-6.6.2/bin/import/inc-*.csv"
    start_position => "beginning"
    type => "csv"
    mode => "read"
  }
}

filter {
  csv {
    separator => ","
    columns => [ "Number","Short description","Priority","State","Assignment Group","Assigned to","Closure Reason","Atos Primary Category","Comments and Work notes","Created","Description","Duration","Impact","Time worked","Updated","Made SLA","Closed","Additional comments","Created by","Escalation","Resolution Description","SLA due","Tags","Updated by","Updates","Work notes list","Work notes","Active","Activity due","Contact type","Reassignment count","Service offering","Resolution Code","Resolve time","Service Impact","Severity","Operating System","Incident Type","Environment","Closed Date","Closed by","Asset ID","Resolved by","Resolved","Team Name","On-Hold Reason","Longest Team Assignment","Incident State","Category","Incident Reassignment","Additional assignee list","Group list" ]
    skip_header => "true"
    convert => {
      "Duration" => "integer"
      "Time worked" => "integer"
      "Longest Team Assignment" => "integer"
    }
  }

  # Date / time fields

  # @timestamp field
  date {
    match => [ "Created", "dd/MM/yyyy HH:mm" ]
    timezone => "Europe/London"
  }
  date {
    match => [ "Created", "dd/MM/yyyy HH:mm" ]
    target => "Created"
    timezone => "Europe/London"
  }

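  # Open tickets can have empty Closed/Resolved/Updated values; fall back to a
  # 1900 sentinel date so those documents still get a valid date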
  if ![Closed] {
    mutate {
      replace => { "Closed" => "1900-01-01T00:00:00Z" }
    }
  } else {
    date {
      match => [ "Closed", "dd/MM/yyyy HH:mm" ]
      target => "Closed"
      timezone => "Europe/London"
    }
  }
  if ![Resolved] {
    mutate {
      replace => { "Resolved" => "1900-01-01T00:00:00Z" }
    }
  } else {
    date {
      match => [ "Resolved", "dd/MM/yyyy HH:mm" ]
      target => "Resolved"
      timezone => "Europe/London"
    }
  }
  if ![Updated] {
    mutate {
      replace => { "Updated" => "1900-01-01T00:00:00Z" }
    }
  } else {
    date {
      match => [ "Updated", "dd/MM/yyyy HH:mm" ]
      target => "Updated"
      timezone => "Europe/London"
    }
  }
}
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    action => "index"
    index => "test"
    # Ensures updates overwrite existing documents rather than duplicating them
    document_id => "%{[Number]}"
  }

  stdout {
    # To debug use rubydebug, or the dots codec for limited output
    codec => rubydebug
  }
}

The Logstash log output is below:
[2019-03-19T16:38:19,152][WARN ][logstash.codecs.plain ] Received an event that has a different character encoding than you configured. {:text=>"IF FTF \x96 DELETE ALL QUESTIONS BELOW (ENSURE YOU ASSIGN ALL FTF TICKETS TO YOURSELF & INCLUDE RESOLUTION)", :expected_charset=>"UTF-8"}
[2019-03-19T16:38:19,158][WARN ][logstash.codecs.plain ] Received an event that has a different character encoding than you configured. {:text=>"if no \x96 ensure you obtain alternative contact/mobile", :expected_charset=>"UTF-8"}
[2019-03-19T16:38:19,585][WARN ][logstash.filters.csv ] Error parsing csv {:field=>"message", :source=>"INC001009178,Windows 10 - Access to eForms (not authorized),3 - Medium,Closed,Accenture.PortedApps.eForms,name,Resolved,Application,"2018-06-23 19:14:46 BST - (Additional comments)", :exception=>#<CSV::MalformedCSVError: Unclosed quoted field on line 1.>}
[2019-03-19T16:38:19,750][WARN ][logstash.filters.csv ] Error parsing csv {:field=>"message", :source=>"",14/06/2018 23:58,"1. Full description of incident and troubleshooting attempted: User and 15 colleague are all experiencing the same issue with being unable to use eForms. They all have access. ", :exception=>#<CSV::MalformedCSVError: Unclosed quoted field on line 1.>}
[2019-03-19T16:38:19,765][WARN ][logstash.filters.csv ] Error parsing csv {:field=>"message", :source=>"", :exception=>#<NoMethodError: undefined method `each_index' for nil:NilClass>}

Hi Mark,

If the fields in your CSV data contain newlines / carriage returns, the standard single-line codec used by the file input won't handle them. You can add a multiline codec, though:

input {
  file {
    id => "newtestv2"
    path => "C:/elk6/logstash-6.6.2/bin/import/inc-*.csv"
    start_position => "beginning"
    type => "csv"
    mode => "read"
    # Assume any line not starting with a quote (") is a continuation of the previous line
    codec => multiline {
      pattern => '^"'
      negate => "true"
      what => "previous"
      # UTF-8 is the default, but some characters in ServiceNow records are not valid UTF-8
      charset => "ISO-8859-1"
    }
  }
}

Thanks,

David

Hi David, unfortunately that does not fix the import issue.

[2019-03-19T17:24:44,831][WARN ][logstash.filters.csv ] Error parsing csv {:field=>"message", :source=>"",14/06/2018 23:58,description,643771,5 - None,5053,30/06/2018 00:04,FALSE,30/06/2018 00:04,add1,testing,-5,res1,,,system,16,,note1,FALSE,,Phone,1,eFORMS Mobile,Information/advice given,643771,None,3 - Low,Windows 10,Incident,Production,29/06/2018,,testing,name,22/06/2018 10:38,PortedApps.eForms,,1266489,,Application,Referred onto next group,,\r", :exception=>#<CSV::MalformedCSVError: Unclosed quoted field on line 1.>}

Hi Mark,

It seems to be the same error. My example assumed that every new record started with a quoted string, and that any line without a quote at the start was part of the previous record. If you adjust the regular expression to detect new records in your CSV file it should work. For example, if every new record starts with an incident number like INC0123456, then this should work:

  # Assume any line not starting with INC is a continuation of the previous line
  codec => multiline {
    pattern => '^INC'
    negate => "true"
    what => "previous"
  }

If a new line is present inside a field (for example, a comments field) it's unlikely to start with INC, and the codec will then join it to the previous line for the CSV parser to process.
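For example, a record like this (made-up data) whose Description field contains a newline arrives as two physical lines, and the codec reassembles them into one event before the csv filter runs:

INC0123457,"User cannot open eForms.
Affects 15 colleagues in the same team.",3 - Medium,Closed,...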

Cheers,

David

Thanks David, I have updated my config as below, but now the CSV file disappears when I place it in the importing folder, no data is imported, and no entries are created in the Logstash log!

input {
  file {
    id => "incident-servicenow-mps"
    path => "C:/elk/importing/inc-*.csv"
    start_position => "beginning"
    type => "csv"
    mode => "read"
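    # NB: in read mode the file input deletes each file once it has been read
    # (file_completed_action defaults to "delete"), which is why the CSV vanishes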
    # Assume any line not starting with INC is a continuation of the previous line
    codec => multiline {
      pattern => '^INC'
      negate => "true"
      what => "previous"
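      # If events never appear, the codec may still be holding the final record
      # in its buffer; the codec's auto_flush_interval option forces a flush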
      charset => "ISO-8859-1"
    }
  }
}

filter {
  csv {
    separator => ","
    columns => [ "Number","Short description","Priority","State","Assignment Group","Assigned to","Closure Reason","Atos Primary Category","Comments and Work notes","Created","Description","Duration","Impact","Time worked","Updated","Made SLA","Closed","Additional comments","Created by","Escalation","Resolution Description","SLA due","Tags","Updated by","Updates","Work notes list","Work notes","Active","Activity due","Contact type","Reassignment count","Service offering","Resolution Code","Resolve time","Service Impact","Severity","Operating System","Incident Type","Environment","Closed Date","Closed by","Asset ID","Resolved by","Resolved","Team Name","On-Hold Reason","Longest Team Assignment","Incident State","Category","Incident Reassignment","Additional assignee list","Group list" ]
    skip_header => "true"
    convert => {
      "Duration" => "integer"
      "Time worked" => "integer"
      "Longest Team Assignment" => "integer"
    }
  }

  # Date / time fields

  # @timestamp field
  date {
    match => [ "Created", "dd/MM/yyyy HH:mm" ]
    timezone => "Europe/London"
  }
  date {
    match => [ "Created", "dd/MM/yyyy HH:mm" ]
    target => "Created"
    timezone => "Europe/London"
  }

  if ![Closed] {
    mutate {
      replace => { "Closed" => "1900-01-01T00:00:00Z" }
    }
  } else {
    date {
      match => [ "Closed", "dd/MM/yyyy HH:mm" ]
      target => "Closed"
      timezone => "Europe/London"
    }
  }
  if ![Resolved] {
    mutate {
      replace => { "Resolved" => "1900-01-01T00:00:00Z" }
    }
  } else {
    date {
      match => [ "Resolved", "dd/MM/yyyy HH:mm" ]
      target => "Resolved"
      timezone => "Europe/London"
    }
  }
  if ![Updated] {
    mutate {
      replace => { "Updated" => "1900-01-01T00:00:00Z" }
    }
  } else {
    date {
      match => [ "Updated", "dd/MM/yyyy HH:mm" ]
      target => "Updated"
      timezone => "Europe/London"
    }
  }
}
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    action => "index"
    index => "test"
    # Ensures updates overwrite existing documents rather than duplicating them
    document_id => "%{[Number]}"
  }

  stdout {
    # To debug use rubydebug, or the dots codec for limited output
    codec => rubydebug
  }
}

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.