Fingerprint & record drop not working as expected

I'm reading lines from a log file. After converting each line to JSON, I need to split the event based on a field, then generate a fingerprint from several fields and drop the event if those fields are not found.

But I'm facing issues:

  • the fingerprint generated is the same for all records ('b6589fc6ab0dc82cf12099d1c2d40ab994e8410c', which is the hash of "0" — I think this comes from message.source).
  • And I still get records like {'message': {'recordkey': 'b6589fc6ab0dc82cf12099d1c2d40ab994e8410c', 'source': '0'}}, which should ideally get dropped.

my.conf

input {
  pipeline { address => "kafka-output" }
}

filter {
  json {
    source => "message"
    target => "message_deserialized"
  }

  ruby {
    init => "require 'base64'
             require 'zlib'
             require 'stringio'"
    code => '
      compressed = Base64.decode64(event.get("[message_deserialized][message_json]"))
      event.set("[message_deserialized][message_json_decoded]",
                Zlib::GzipReader.new(StringIO.new(compressed)).read)
    '
  }

  json {
    source => "[message_deserialized][message_json_decoded]"
    target => "[message_deserialized][message_json_decoded_deserialized]"
  }

  mutate {
    remove_field => [ "message", "[message_deserialized][message_json]", "[message_deserialized][message_json_decoded]" ]
  }

  mutate {
    rename => { "[message_deserialized][message_json_decoded_deserialized]" => "[message_deserialized][message_json]" }
  }
  
  mutate {
    rename => { "message_deserialized" => "message" }
  }

  split {
    field => "[message][message_json]"
  }

  prune {
    whitelist_names => [ "message" ]
  }
  
  mutate {
    add_field => { "[message][source]" => 0 }
  }

  fingerprint {
    source => ["[message][message_json][message_timestamp]", "[message][message_json][message_body]", "[message][message_json][sender_name]", "[message][current_devic]", "[message][source]"]
  }

  mutate {
    rename => { "[message][current_devic]" => "[message][device_id]" }
    rename => { "fingerprint" => "[message][recordkey]" }
    rename => { "[message][curr_dt]" => "[message][created_at]" }
    rename => { "[message][batch_datetim]" => "[message][batch_datetime]" }
    rename => { "[message][message_json][sender_name]" => "[message][message_json][address]" }
    rename => { "[message][message_json][message_timestamp]" => "[message][message_json][received_at]" }
    remove_field => ["[message][sync_final_background]"]
  }

  if ![message][message_json][received_at] and ![message][message_json][message_body] and ![message][message_json][address] and ![message][device_id] and ![message][source] { drop {} }

}

# kafka dev
output {
  stdout { codec => rubydebug }

  kafka {
    bootstrap_servers => "kafka:9092"
    codec => json
    topic_id => "mytopic"
  }
}

If you supply multiple fields in the source option, and do not set concatenate_sources or concatenate_all_fields, then the filter hashes each field in turn and overwrites the target field.

mutate { add_field => { "[message][source]" => 0 } }

Your fingerprint will always be the hash of the last field in the source option, so it is always the hash of "0".
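You can confirm this outside Logstash: the fingerprint filter defaults to SHA-1, and the SHA-1 of the string "0" is exactly the value showing up in every record. A quick Ruby check:

```ruby
require 'digest'

# SHA-1 of the string "0" -- the value the fingerprint filter produces
# when the last source field hashed is [message][source] = 0
puts Digest::SHA1.hexdigest("0")
# => b6589fc6ab0dc82cf12099d1c2d40ab994e8410c
```

Setting `concatenate_sources => true` on the fingerprint filter makes it hash all of the source fields together, so each record gets a distinct fingerprint.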


Thanks a lot! That worked.
Can you please help with issue 2, the records not getting dropped correctly?

This conditional will never be true as you are adding the field message.source a couple of lines above, so this field will always be present.
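One minimal sketch of a fix (untested against your pipeline): leave the always-present [message][source] out of the condition, so the drop only depends on the fields that can actually be missing:

```
if ![message][message_json][received_at]
    and ![message][message_json][message_body]
    and ![message][message_json][address]
    and ![message][device_id] {
  drop {}
}
```

Alternatively, move the drop conditional above the mutate that adds [message][source]. Keep `and` if you only want to drop events where all of these fields are absent; use `or` if any single missing field should cause the drop.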


Oh, got it thanks.