How to prevent duplicate documents with null values when using the fingerprint filter

My CSV file =>
name,surname,age,email,phone
Harry,Potter,18,NULL,NULL
Harry,Potter,NULL,harrypotter@gmail.com,+955555555
Harry,Potter,NULL,harrypotter@gmail.com,NULL
Harry,Potter,NULL,NULL,+955555555

When I try to detect and remove duplicate documents with the
fingerprint method, it creates a new document for each row.

filter {
    fingerprint {
        key => "1234ABCD"
        method => "MD5"
        source => ["name","surname","age","email","phone"]
        target => "[@metadata][generated_id]"
    }
}
output {
    stdout { codec => dots }
    elasticsearch {
        index => "null_problem_fingerprint"
        document_id => "%{[@metadata][generated_id]}"
        action => 'update'
    }
}

If I specify only the name and surname fields for the source, as in the code block below,
this time it does not read the other rows after reading the first row.

filter {
    fingerprint {
        key => "1234ABCD"
        method => "MD5"
        source => ["name","surname"]
        target => "[@metadata][generated_id]"
    }
}
output {
    stdout { codec => dots }
    elasticsearch {
        index => "null_problem_fingerprint"
        document_id => "%{[@metadata][generated_id]}"
        action => 'update'
    }
}

Dear friends, please help me! I want to see just one document, like this:

Harry,Potter,18,harrypotter@gmail.com,+955555555

Hi @Busra_Duygu, welcome to the community.

I think you need to concatenate the sources:

concatenate_sources => true

I also think perhaps you want to use doc_as_upsert; see here:

doc_as_upsert => true
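
Roughly, with both options in place the filter and output would look something like this (a sketch based on your config above; keep your own hosts and index settings):

filter {
    fingerprint {
        key => "1234ABCD"
        method => "MD5"
        source => ["name","surname","age","email","phone"]
        target => "[@metadata][generated_id]"
        concatenate_sources => true
    }
}
output {
    elasticsearch {
        index => "null_problem_fingerprint"
        document_id => "%{[@metadata][generated_id]}"
        action => "update"
        doc_as_upsert => true
    }
}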


First of all, thank you very much for replying :slight_smile: but it again added all the rows to Elasticsearch as separate documents.

filter {
      fingerprint {
        key => "1234ABCD"
        method => "UUID"
        source => ["name","surname","age","email","phone"]
        target => "[@metadata][generated_id]"
        concatenate_sources => "true"
      }
}

output {
    stdout { codec => dots }
    elasticsearch {
        index => "null_problem_fingerprint"
        document_id => "%{[@metadata][generated_id]}"
        doc_as_upsert => "true"
    }
}

First, try:

    method => "SHA1"

I don't think you want UUID
"If set to UUID , a UUID will be generated. The result will be random and thus not a consistent hash."

Also, when I look at your input data, each row IS unique across all the source values, so I would expect each row to become a new unique document in the results.

When you specified only

    source => ["name","surname"]

what was the actual result? I would expect it to be only the LAST ROW, since all rows match the fingerprint criteria, so each row's update overwrites the previous one.

So a better question (and perhaps I should have asked this first): with the 4 input rows you show, what do you expect / want the output to be?


Don't forget pipeline.ordered and pipeline.workers.


Ahh thanks @Badger

So is the following correct in logstash.yml?

pipeline.ordered: true (or auto, if workers is set to 1)

pipeline.workers: 1
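
For reference, the same two settings can also be set per pipeline in pipelines.yml instead of globally in logstash.yml (a sketch; the pipeline id and config path are placeholders):

- pipeline.id: null_problem_fingerprint
  path.config: ".../null_problem_finger.conf"
  pipeline.workers: 1
  pipeline.ordered: auto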

Also, when I look at your input data, each row IS unique across all the source values, so I would expect each row to become a new unique document in the results.

Yes, each line in the CSV file is a unique record, but they are all information about the same person.
For example, in bill of lading data a firm makes more than one export or import. The first record has the company's address but no phone number; the second record has the phone number but not the address. I want to extract only the company information from the bill of lading data. There are two documents for the same company, but I want to see only one document, and I want it to contain both the phone number and the address.

The content of the document I want to see is as follows:
Harry,Potter,18,harrypotter@gmail.com,+955555555

I added these to the logstash.yml file:

pipeline.ordered: auto
pipeline.workers: 1

but it still didn't work. The config is like this:

filter {
     aggregate {
        task_id => "%{name}"
        code => "map['sql_duration'] = 0"
        map_action => "create"
      }

       fingerprint {
         key => "1234ABCD"
         method => "SHA1"
         source => ["name","surname"]
         target => "[@metadata][generated_id]"
         concatenate_sources => "true"
       }
}
output {
     stdout { codec => dots }
     elasticsearch {
         index => "null_problem_fingerprint"
         document_id => "%{[@metadata][generated_id]}"
         doc_as_upsert => "true"
     }
}

The result is like this:

{
  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "null_problem_fingerprint",
        "_type" : "_doc",
        "_id" : "e6ed065d89c16d449ff08e93418e589d3e217256",
        "_score" : 1.0,
   "_source" : {
          "name" : "Harry",
          "surname" : "Potter",
          "@timestamp" : "2021-07-08T14:23:05.629Z",
          "age" : "18",
          "phone" : "+955555555",
          "@version" : "1",
          "email" : null
        }
      }
    ]
  }
}

There is no reason to use a key for a fingerprint filter. It will happily use a plain hash rather than an HMAC if you do not set a key.

doc_as_upsert will update fields on the document with fields from the event. If those fields are null then they will still get overwritten. You need to remove fields from the event if you do not want them to be set on the document. So for

Harry,Potter,NULL,harrypotter@gmail.com,NULL

you need to delete the age and phone number fields before sending them to elasticsearch.
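
With doc_as_upsert, each event is sent to Elasticsearch as a partial update, roughly like this for the row above (a sketch of what the output plugin generates; the id is a placeholder):

POST null_problem_fingerprint/_update/<generated_id>
{
  "doc" : {
    "name" : "Harry",
    "surname" : "Potter",
    "age" : null,
    "email" : "harrypotter@gmail.com",
    "phone" : null
  },
  "doc_as_upsert" : true
}

The null age and phone in the "doc" body overwrite whatever values the existing document already has, which is why those fields need to be removed from the event first.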


I removed the key parameter as you said, but I couldn't figure out how to remove the phone number and age fields from the event. Could you please elaborate a little more?

@Badger
I did what you said: I ran the config below to delete the fields with null values while loading the rows from the CSV file into the first index. Then I tried again to prevent the duplicate documents by using the fingerprint method together with doc_as_upsert => "true", but without success.

null_problem.conf:

input{
    file { 
      path => ".../null_problem.csv"
      start_position => "beginning"
      sincedb_path => "NUL" 
    }
}
filter{
    csv{        
        autodetect_column_names => "true"
        separator => ","
        skip_header => "true"
        columns => ["name","surname","age","email","phone"]
    }
    if [age] and [email] and [phone] == "" {
      drop { }
    }
    mutate { 
        remove_field =>["path", "host", "message", "@version", "@timestamp", "trade_date"]
    }
}
output{
    elasticsearch { 
        hosts => "http://localhost:9200"
        index => "null_problem"
        document_type => "_doc"
    }
    stdout {}
}

null_problem_finger.conf:

input {
  elasticsearch {
    hosts => "localhost"
    index => "null_problem"
    query => '{ "sort": [ "_doc" ] }'
  }
}
filter {
    fingerprint {
      method => "SHA1"
      source => ["name","surname"]
      target => "[@metadata][generated_id]"
      concatenate_sources => "true"
    }
}
output {
    stdout { codec => dots }
    elasticsearch {
        index => "null_problem_fingerprint"
        document_id => "%{[@metadata][generated_id]}"
        doc_as_upsert => "true"
    }
}

That says 'if the age field exists, and the email field exists, and the phone field is equal to "", then delete the event'. It's probably not what you want. If the CSV literally contains the string "NULL", then what you want is

if [email] == "NULL" { mutate { remove_field => [ "email" ] } }

etc.
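
Spelled out for the other optional columns in this CSV, the same pattern would be:

if [age] == "NULL" { mutate { remove_field => [ "age" ] } }
if [phone] == "NULL" { mutate { remove_field => [ "phone" ] } }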

@Badger
I tried the code block below to delete the empty fields, and it did delete them, but I still couldn't get the result I wanted.

ruby {
    code => "
        # Collect the field reference (e.g. [email]) of every field in the event,
        # recursing into nested hashes.
        def walk_hash(parent, path, hash)
            path << parent if parent
            hash.each do |key, value|
                walk_hash(key, path, value) if value.is_a?(Hash)
                @paths << (path + [key]).map {|p| '[' + p + ']' }.join('')
            end
            path.pop
        end
        @paths = []
        walk_hash(nil, [], event.to_hash)
        # Remove every field whose value is nil or empty (empty string, array, etc.).
        @paths.each do |path|
            value = event.get(path)
            event.remove(path) if value.nil? || (value.respond_to?(:empty?) && value.empty?)
        end
        "
}

@Badger
To achieve this, I first loaded the CSV file into the null_problem index, then I created an index called null_problem_fingerprint to consolidate these duplicate documents with the fingerprint method, but I was unsuccessful.

null_problem index=>

input{
    file { 
      path => ".../null_problem.csv"
      start_position => "beginning"
      sincedb_path => "NUL" 
    }
}
filter{
    csv{        
        autodetect_column_names => "true"
        separator => ","
        skip_header => "true"
        columns => ["name","surname","age","email","phone"]
    }
    mutate { 
        remove_field =>["path", "host", "message", "@version", "@timestamp", "trade_date"]
    }
    ruby {
        code => "
            def walk_hash(parent, path, hash)
                path << parent if parent
                hash.each do |key, value|
                walk_hash(key, path, value) if value.is_a?(Hash)
                @paths << (path + [key]).map {|p| '[' + p + ']' }.join('')
                end
                path.pop
            end
            @paths = []
            walk_hash(nil, [], event.to_hash)
            @paths.each do |path|
                value = event.get(path)
                event.remove(path) if value.nil? || (value.respond_to?(:empty?) && value.empty?)
            end
            "
    }
}
output{
    elasticsearch { 
        hosts => "http://localhost:9200"
        index => "null_problem"
        document_type => "_doc"
    }
    stdout {}
}

null_problem_fingerprint index =>

input {
  elasticsearch {
    hosts => "localhost"
    index => "null_problem"
    query => '{ "sort": [ "_doc" ] }'
  }
}
filter{  
    fingerprint {
    method => "SHA1"
    source => ["name","surname","age","email","phone"]
    target => "[@metadata][generated_id]"
    concatenate_sources => "true"   
  }
  mutate { 
        remove_field =>["path", "host", "message", "@version", "@timestamp", "trade_date"]
  }
}
output {
    stdout { codec => dots }
    elasticsearch {
        index => "null_problem_fingerprint"
        document_id => "%{[@metadata][generated_id]}"
        doc_as_upsert => "true"
        action => "update"
    }
}

I deleted the fields with null values with the Ruby code block, but after applying the fingerprint I still could not reach the desired output. Please help me!

Actually the CSV file is exactly like this:

My CSV file =>
name,surname,age,email,phone
Busra,Duygu,99,,05555555555
Busra,Duygu,,busraduygu@gmail.com,
Busra,Duygu,99,,
Busra,Duygu,,,

I wrote NULL above for better understanding.

This works for me.

The problem is that you are trying to use all the fields for the fingerprint, but all the fields do not exist on every row, so it does not make sense to use all the columns for the fingerprint; you should only use the ones available on every row. The only fields that exist on every row are name and surname. So you might have a collision if two people have the same name and surname.

input {
    file { 
        path => "/Users/sbrown/workspace/sample-data/discuss/fingerprint.csv"
        start_position => "beginning"
        sincedb_path => "/dev/null" 
    }
}

filter {
    csv {        
        autodetect_column_names => "true"
        separator => ","
        skip_header => "true"
        columns => ["name","surname","age","email","phone"]
    }
    mutate { 
        remove_field =>["path", "host", "message", "@version", "@timestamp", "trade_date"]
    }

    if ![email] { mutate { remove_field => [ "email" ] } }
    if ![phone] { mutate { remove_field => [ "phone" ] } }
    if ![age] { mutate { remove_field => [ "age" ] } }

    fingerprint {
      method => "SHA1"
      source => ["name","surname"]
      target => "fingerprint"
      concatenate_sources => "true"   
  }

}


output {
    elasticsearch { 
        hosts => "http://localhost:9200"
        index => "null_problem"
        document_type => "_doc"
        document_id => "%{fingerprint}"
        action => 'update'
        doc_as_upsert => true
    }
  stdout {codec => rubydebug}

}

GET null_problem/_search

{
  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "null_problem",
        "_type" : "_doc",
        "_id" : "fda96a9008c6ef62e4cf346636cf97e57112519d",
        "_score" : 1.0,
        "_source" : {
          "surname" : "Duygu",
          "fingerprint" : "fda96a9008c6ef62e4cf346636cf97e57112519d",
          "name" : "Busra",
          "age" : "99",
          "phone" : "05555555555",
          "email" : "busraduygu@gmail.com"
        }
      }
    ]
  }
}

@stephenb Thank you very much for all your help :hugs: :hugs: :hugs:, it worked for me too :ok_hand:
