Hi there,
I'm using Logstash to receive events from Winlogbeat and forward them to Kafka, which passes them further downstream. To process those events correctly at the end of the pipeline, I need to:
Convert the JSON array to a supported type (for example: string).
Remove the whitespace contained in certain key names.
And, if possible, do both without knowing the input data in advance.
Here is an example of the data I receive :
{
"key1": {
"key2": "value1",
"key3": "value2",
"key4": [
"value3",
"value4"
]
},
"key5": {
"key6": "value5",
"key7": {
"key8 with space": "value6",
"key9 with space": "value7"
}
}
}
I would like to flatten the key4 array and remove the spaces in key8 and key9 fields.
Here is my current Logstash pipeline configuration:
input {
beats {
port => "5044"
}
}
filter {
mutate {
convert => {
"key4" => "string"
}
}
kv {
remove_char_key => " "
}
}
output {
stdout { codec => json}
}
But neither filter does anything.
I've also tried several variations using the "json" filter, the "split" filter and "mutate rename", but none of them helped.
I think I may not completely understand how Logstash processes the input data.
Thanks a lot in advance for your help.
For those interested, I've figured out how to proceed.
To convert the JSON array, I've used a grok filter to extract the values to a new root field and then a mutate filter to delete the original array :
grok {
break_on_match => false
match => {
"[key1][key4][0]" => "%{GREEDYDATA:key1_key4_0}"
"[key1][key4][1]" => "%{GREEDYDATA:key1_key4_1}"
}
}
<...>
mutate {
remove_field => [ "[key1][key4]" ]
}
However, I don't know how to do this without knowing the keys in advance.
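One possible way to avoid listing the keys is to walk the event recursively and build the root field names from the path. The sketch below is plain Ruby, run outside Logstash on a sample hash; `flatten_arrays` is a hypothetical helper name, and the underscore-joined naming simply mirrors the `key1_key4_0` fields used in the grok filter above.

```ruby
# Hypothetical helper (not a Logstash API): collect every array element
# under a flat, underscore-joined name such as "key1_key4_0".
def flatten_arrays(node, path = [], out = {})
  case node
  when Hash
    # Descend into nested hashes, remembering the path of keys so far.
    node.each { |k, v| flatten_arrays(v, path + [k], out) }
  when Array
    # Emit one entry per element, suffixed with its index.
    node.each_with_index { |v, i| out[(path + [i]).join("_")] = v }
  end
  out
end

sample = {
  "key1" => { "key2" => "value1", "key4" => ["value3", "value4"] }
}
flat = flatten_arrays(sample)
```

Inside a ruby filter, the resulting pairs could presumably be written back with `event.set(k, v)`, followed by removing the original array; I have not tested that part in a pipeline.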
And to delete spaces included in key fields, I've used a mutate filter :
filter {
mutate {
rename => { "[key5][key7][key8 with space]" => "[key5][key7][key8_without_space]"}
rename => { "[key5][key7][key9 with space]" => "[key5][key7][key9_without_space]"}
}
}
I've also tried to do it automatically without writing all the keys down with a ruby code filter :
ruby {
code => '
hash = event.to_hash
hash.each do |k,v|
k.strip!
event.set(k,v)
end
'
}
But that didn't work. I guess it's related to the fact that the key fields to rename are nested fields.
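Two things in that attempt can be checked in plain Ruby, outside Logstash: iterating a hash with `each` only visits the top-level keys, and `strip` only trims leading and trailing whitespace, so a space in the middle of a key is left alone. A minimal illustration using the sample data (the variable names are just for this example):

```ruby
sample = {
  "key5" => { "key7" => { "key8 with space" => "value6" } }
}

# Hash#each / Hash#keys do not recurse: only "key5" is seen here.
top_level = sample.keys

key = "key8 with space"
stripped  = key.strip        # trims the ends only, inner spaces stay
no_spaces = key.delete(" ")  # removes every space character
```

So even with recursion added, `strip!` would not have renamed these keys; something like `delete(" ")` or `gsub(" ", "")` is needed.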
You could do that using ruby....
ruby {
init => '
def doSomething(object, name, event)
#puts "Working on #{name}"
#Removed "if object" test since we need to process null valued fields
if object.kind_of?(Hash) and object != {}
object.each { |k, v| doSomething(v, "#{name}[#{k}]", event) }
elsif object.kind_of?(Array) and object != []
event.set(name, object.to_s)
else
lastElement = name.gsub(/^.*\[/, "").gsub(/\]$/, "")
if lastElement =~ / /
event.set(name.gsub(" ", ""), event.remove(name))
end
end
end
'
code => '
event.to_hash.each { |k, v|
doSomething(v, "[#{k}]", event)
}
'
}
which will produce
"key5" => {
"key6" => "value5",
"key7" => {
"key8withspace" => "value6",
"key9withspace" => "value7"
}
},
"key1" => {
"key4" => "[\"value3\", \"value4\"]",
"key2" => "value1",
"key3" => "value2"
}
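To try the same logic without starting Logstash, the transformation can be approximated on a plain Ruby hash. This is only a sketch: `normalize` is a hypothetical name, and it returns a new hash rather than calling `event.set` / `event.remove`, so it is not a drop-in replacement for the filter above.

```ruby
# Hypothetical stand-alone version of the recursive walk:
# removes spaces from every key and turns non-empty arrays into strings.
def normalize(node)
  case node
  when Hash
    node.each_with_object({}) do |(k, v), out|
      out[k.to_s.delete(" ")] = normalize(v)
    end
  when Array
    # Empty arrays are left untouched, as in the filter above.
    node.empty? ? node : node.to_s
  else
    node
  end
end

sample = {
  "key1" => { "key2" => "value1", "key4" => ["value3", "value4"] },
  "key5" => { "key7" => { "key8 with space" => "value6" } }
}
result = normalize(sample)
```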
OK, I will try this. Thank you !