This is the input I am using for logstash.
ItemId AssetId ItemName Comment
11111 07 ABCDa XYZa
11112 07 ABCDb XYZb
11113 07 ABCDc XYZc
11114 07 ABCDd XYZd
11115 07 ABCDe XYZe
11116 07 ABCDf XYZf
11117 07 ABCDg XYZg
Date Time rows columns
19-05-2020 13:03 2 2
19-05-2020 13:03 2 2
19-05-2020 13:03 2 2
19-05-2020 13:03 2 2
19-05-2020 13:03 2 2
I need to remove the first 8 lines from the CSV, make the next line the column header, and parse the remaining lines as usual. Is there a way to do that in Logstash?
I would use a multiline codec to combine each group of lines. Hopefully something like
pattern => "^\d"
negate => false
what => "previous"
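A minimal file input using that codec might look like the following (the path, sincedb_path, and auto_flush_interval are illustrative assumptions, not part of the question):

```
input {
  file {
    path => "/tmp/test.csv"          # hypothetical path to the file above
    start_position => "beginning"
    sincedb_path => "/dev/null"      # re-read from the start on every run; handy for testing
    codec => multiline {
      pattern => "^\d"               # data lines start with a digit...
      negate => false
      what => "previous"             # ...and get appended to the preceding event
      auto_flush_interval => 2       # flush the final group once input goes quiet
    }
  }
}
```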
would work, so that you get two events. The first being
"ItemId AssetId ItemName Comment\n11111 07 ABCDa XYZa\n11112 07 ABCDb XYZb\n11113 07 ABCDc XYZc\n11114 07 ABCDd XYZd\n11115 07 ABCDe XYZe\n11116 07 ABCDf XYZf\n11117 07 ABCDg XYZg"
You may need to clean up the field separators. Use a character class rather than \s+ here, so that the newlines between lines survive for the split step below:

mutate { gsub => [ "message", "[ \t]+", " " ] }
Then tag each event
if [message] =~ /^Item/ {
    mutate { add_field => { "[@metadata][format]" => "format1" } }
} else {
    mutate { add_field => { "[@metadata][format]" => "format2" } }
}
Then use mutate+split to split [message] into an array of lines, and a split filter to turn that array into multiple events. You can then use csv filters for the two formats:
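That step might look like this. Note that "\n" inside a Logstash config string is only interpreted as a newline when config.support_escapes is enabled in logstash.yml; otherwise put a literal newline between the quotes:

```
mutate { split => { "message" => "\n" } }   # [message] becomes an array of lines
split { field => "message" }                # one event per array entry
```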
if [@metadata][format] == "format1" {
    csv { separator => " " columns => [ "ItemId", "AssetId", "ItemName", "Comment" ] ... }
    if [ItemId] == "ItemId" { drop {} }
} else {
    csv { separator => " " columns => [ "Date", "Time", "rows", "columns" ] ... }
    if [Date] == "Date" { drop {} }
}
There are other ways of handling the headers (autodetect_column_names, for example), but then you need pipeline.workers to be 1 and pipeline.ordered to be true, so that the header row is guaranteed to be processed first.
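With ordering guaranteed, the csv filter inside each format's branch could be reduced to something like this (a sketch only; autodetect caches the column names from the first row it sees, which is why event order matters):

```
# requires pipeline.workers: 1 and pipeline.ordered: true in the pipeline settings
csv {
  separator => " "
  autodetect_column_names => true   # take column names from the first row
}
```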