I have a file structured so that the actual events follow their metadata.
For example, the file contents look like this:
Columns = Name|Age|Gender
Delimiter = |
John|23|M
Jane|25|F
Columns = Country,State
Delimiter = ,
Canada,Ontario
USA,Nevada
I want to parse the file so that the output looks something like this:
{
"message": "John|23|M",
"Name": "John",
"Age": "23",
"Gender": "M"
},
{
"message": "Canada,Ontario",
"Country": "Canada",
"State": "Ontario"
}
Basically, I want the data extracted using a dynamic set of columns and a dynamic delimiter.
Badger
February 23, 2024, 8:27pm
2
There are no plugins that can do this other than ruby. The following will handle that file. Additional code could be borrowed from the csv filter to support more options and add error handling.
ruby {
    init => '
        require "csv"   # the code below uses CSV.parse_line

        @columns = []
        @delimiter = ""
        @quote_char = %q["]
        @columnsPrefix = "Columns = "
        @delimiterPrefix = "Delimiter = "
    '
    code => '
        m = event.get("message")
        if m.start_with?(@columnsPrefix)
            # Remember the column names from the metadata line, then drop the event
            @columnsString = m.delete_prefix(@columnsPrefix)
            event.cancel
        elsif m.start_with?(@delimiterPrefix)
            # Remember the delimiter and split the saved column names with it
            @delimiter = m.delete_prefix(@delimiterPrefix)
            @columns = @columnsString.split(@delimiter)
            event.cancel
        elsif m == "[DATA]" or m =~ /^\s*$/
            # Ignore blank lines (and an optional [DATA] marker)
            event.cancel
        else
            # Parse the data line with the current delimiter and set one field per column
            values = CSV.parse_line(m, :col_sep => @delimiter, :quote_char => @quote_char)
            values.each_index { |x|
                event.set(@columns[x], values[x])
            }
        end
    '
}
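If it helps to see the core idea outside Logstash, here is a small standalone Ruby sketch of what the code block does for a single data line (the delimiter and column names are hard-coded here only for illustration; in the filter they are learned from the metadata lines):

require "csv"

# Values taken from the sample file above
delimiter = "|"
columns   = "Name|Age|Gender".split(delimiter)

line   = "John|23|M"
values = CSV.parse_line(line, :col_sep => delimiter, :quote_char => %q["])

# Pair each column name with its value, as event.set does in the filter
event = Hash[columns.zip(values)]
puts event.inspect   # => {"Name"=>"John", "Age"=>"23", "Gender"=>"M"}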
Rios (Rios)
February 25, 2024, 3:03pm
3
Well, it is possible to do something similar with multi-match grok.
v1:
csv-sample.txt
Columns = John|23|M
Columns = Jane|25|F
Delimiter = |
Columns = Canada,Ontario
Columns = USA,Nevada
Delimiter = ,
input {
    file {
        path => "/path/csv-sample.txt"
        start_position => beginning
        sincedb_path => "/dev/null" # NUL on Windows
    }
}
filter {
    if [message] =~ /^Delimiter =/ {
        drop {}
    }
    grok {
        break_on_match => true
        match => {
            "message" => [ "^Columns = %{DATA:Name}\|%{INT:Age}\|%{WORD:Gender}", "^Columns = %{DATA:Country}\,%{DATA:State}.$" ]
        }
    }
    mutate {
        gsub => [
            "message", "^Columns = ", "",
            "message", "^Delimiter = ", "",
            "message", "(\r|\n)", ""
        ]
    }
    mutate { remove_field => [ "log", "event", "host", "@version", "@timestamp" ] }
}
output {
    stdout { codec => rubydebug {} }
}
Result:
{
"Age" => "23",
"message" => "John|23|M",
"Name" => "John",
"Gender" => "M"
}
{
"Country" => "USA",
"State" => "Nevada",
"message" => "USA,Nevada"
}
{
"Country" => "Canada",
"State" => "Ontario",
"message" => "Canada,Ontario"
}
{
"Age" => "25",
"message" => "Jane|25|F",
"Name" => "Jane",
"Gender" => "F"
}
And yes, Badger will send me to The International Criminal Court, but it's possible.
The problem is to identify the names of the columns from the contents of the file itself.
match => {
    "message" => [ "^Columns = %{DATA:Name}\|%{INT:Age}\|%{WORD:Gender}", "^Columns = %{DATA:Country}\,%{DATA:State}.$" ]
}
Here you are hard-coding the names of the columns, so this approach will only work if the column names are already known.
Excellent approach, thanks for the insight. There is one glitch in the solution, though: Logstash uses multiple pipeline workers to process the input, so for this code to work we have to turn on the ordering setting and set the pipeline workers to 1.
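For reference, that would look something like the sketch below in logstash.yml (or as the equivalent per-pipeline keys in pipelines.yml), assuming a recent Logstash version where the relevant settings are pipeline.workers and pipeline.ordered:

# logstash.yml (or the equivalent keys in pipelines.yml)
pipeline.workers: 1     # a single worker, so the stateful ruby filter sees every event
pipeline.ordered: true  # preserve event order within the pipeline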
Rios (Rios)
February 28, 2024, 6:38am
6
As always, posts are written by Scrum, not by SDLC.
Anyway, I am so glad you finally have the solution.
system (system)
Closed
March 27, 2024, 6:38am
7
This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.