I'm reading lines from a log that, after converting to JSON need to be split based on a field & then I want to generate fingerprint using some fields & drop the event if the fields are not found.
But I'm facing issues:
- the fingerprint generated is the same for all records ('b6589fc6ab0dc82cf12099d1c2d40ab994e8410c' which decodes to 0 - i think this is because of message.source).
- And I still get some records like
{'message': {'recordkey': 'b6589fc6ab0dc82cf12099d1c2d40ab994e8410c', 'source': '0'}}which should ideally get dropped.
my.conf
input {
pipeline { address => "kafka-output" }
}
filter {
json {
source => "message"
target => "message_deserialized"
}
ruby {
init => "require 'base64'
require 'zlib'
require 'stringio'"
code => 'event.set("[message_deserialized][message_json_decoded]", Zlib::GzipReader.new(StringIO.new(Base64.decode64(event.get("[message_deserialized][message_json]")))).read)' }
json {
source => "[message_deserialized][message_json_decoded]"
target => "[message_deserialized][message_json_decoded_deserialized]"
}
mutate {
remove_field => [ "message", "[message_deserialized][message_json]", "[message_deserialized][message_json_decoded]" ]
}
mutate {
rename => { "[message_deserialized][message_json_decoded_deserialized]" => "[message_deserialized][message_json]" }
}
mutate {
rename => { "message_deserialized" => "message" }
}
split {
field => "[message][message_json]"
}
prune {
whitelist_names => [ "message" ]
}
mutate {
add_field => { "[message][source]" => 0 }
}
fingerprint {
source => ["[message][message_json][message_timestamp]", "[message][message_json][message_body]", "[message][message_json][sender_name]", "[message][current_devic]", "[message][source]"]
}
mutate {
rename => { "[message][current_devic]" => "[message][device_id]" }
rename => { "fingerprint" => "[message][recordkey]" }
rename => { "[message][curr_dt]" => "[message][created_at]" }
rename => { "[message][batch_datetim]" => "[message][batch_datetime]" }
rename => { "[message][message_json][sender_name]" => "[message][message_json][address]" }
rename => { "[message][message_json][message_timestamp]" => "[message][message_json][received_at]" }
remove_field => ["[message][sync_final_background]"]
}
if ![message][message_json][received_at] and ![message][message_json][message_body] and ![message][message_json][address] and ![message][device_id] and ![message][source] { drop {} }
}
# kafka dev
output {
stdout { codec => rubydebug }
kafka {
bootstrap_servers => "kafka:9092"
codec => json
topic_id => "mytopic"
}
}