How can we remove duplicate events in Logstash?

I want to remove duplicate events based on a particular field of my input. I wrote logic like the following, but I got an error:
```
aggregate {
  task_id => "%{[meta][ingestionHash]}"
  code => "
    map['@metadata']['keep'] ||= event.get('[@metadata][first_event]') ? false : true
    event.set('[@metadata][first_event]', true)
  "
  end_of_task => true
}

if ![@metadata][keep] {
  drop { }
}
```

Welcome to the community

Usually, deduplication is done by the fingerprint plugin. Check the blog
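A minimal sketch of that approach, assuming `[meta][ingestionHash]` is the field you want to deduplicate on (the hash method and target field here are illustrative, not taken from your config):

```
filter {
  # Compute a stable hash of the chosen field; storing it under @metadata
  # keeps it out of the indexed document
  fingerprint {
    source => ["[meta][ingestionHash]"]
    method => "SHA256"
    target => "[@metadata][fingerprint]"
  }
}
```

The fingerprint is then typically used as the document ID in the output, so repeated events overwrite each other instead of creating duplicates.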

Can we filter out events based on some field value?

In my input I have a field named "ingestionHash", and based on this field I want to remove duplicate events. If two events have the same ingestionHash value, then I want to ignore the second event.
```
aggregate {
  task_id => "%{[meta][ingestionHash]}"
  code => "
    map['@metadata']['keep'] ||= event.get('[@metadata][first_event]') ? false : true
    event.set('[@metadata][first_event]', true)
  "
  end_of_task => true
}

if ![@metadata][keep] {
  drop { }
}
```

This is my code for removing duplicate events. Is it correct?

Why not use this field as the document_id in your elasticsearch output?

The deduplication needs to be done in Elasticsearch, not Logstash.
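As a sketch of that suggestion (the hosts and index values are placeholders; the field path assumes the sample event shown later in the thread):

```
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "my-index"
    # Events with the same ingestionHash get the same _id, so a duplicate
    # overwrites the first document instead of creating a second one
    document_id => "%{[meta][ingestionHash]}"
  }
}
```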

I do not use the aggregate filter, so I cannot tell whether your code is right or not, but even if it is right, I do not think it will work as you want.

The aggregate filter has a timeout, a time range within which it will aggregate events. If you receive two events with the same ingestionHash value but they arrive outside this time range, the aggregate filter will do nothing.

For example, if you receive an event with an ingestionHash value of abcd-1234 now and receive the same ingestionHash 15 minutes later, they will not be aggregated, because the aggregate filter's timeout will have already expired.
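For illustration only (not a recommended deduplication approach), the window can be stretched with the filter's `timeout` option, but any duplicate arriving after it expires still starts a fresh task; the `code` block here is a trivial counter, not your logic:

```
aggregate {
  task_id => "%{[meta][ingestionHash]}"
  code => "map['seen'] ||= 0; map['seen'] += 1"
  # Duplicates arriving more than 900 seconds after the first event
  # are treated as a brand-new task and will not be matched
  timeout => 900
}
```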


Instead of using document_id, is there any other way to remove duplicate events based on some field value? Because to create a document_id, I have to use the fingerprint filter plugin.
The following is my config file:
```
input {
}

filter {
  mutate {
    split => ["topicParts", "."]
    add_field => { "dataType" => "%{[topicParts][2]}" }
    add_field => { "ingestKeyHash" => "" }
  }
  fingerprint {
    source => ["[meta][recordKeys]"]
    method => "MURMUR3"
    target => "ingestKeyHash"
  }
}

output {
  opensearch {
    hosts => ["${ELASTIC_SEARCH_URL}:443"]
    auth_type => {
      type => 'aws_iam'
      aws_access_key_id => '${LOGSTASH_USER_ACCESS_KEY}'
      aws_secret_access_key => '${LOGSTASH_USER_SECRET_KEY}'
      region => 'us-west-2'
    }
    ecs_compatibility => disabled
    index => "logstash-%{[@metadata][kafka][topic]}-%{+YYYY.MM}"
    ssl_certificate_verification => true
    document_id => "%{[@metadata][kafka][topic]}-%{[ingestKeyHash]}"
  }
}
```

Input data:

```
{
  "topicTimestamp": "2023-07-26T07:33:27.032Z",
  "ingestKeyHash": 3171307072,
  "collectionSource": "service.mds.ro",
  "partition": "0",
  "dataType": "avro",
  "@version": "1",
  "@timestamp": "2023-07-26T07:33:27.048Z",
  "offset": "333",
  "meta": {
    "providerDealerId": "dealer1",
    "dataType": null,
    "ingestionHash": "421047889",
    "providerName": "Drive DMS",
    "subscriptionPartnerName": null,
    "recordKeys": {
      "roNum": "134825",
      "providerDealerId": "3P12we",
      "providerId": "a41a5ea7",
      "roOpenDate": "2023-07-25"
    }
  }
}
```

Note: `ingestKeyHash` and `ingestionHash` are two different fields.

OpenSearch/OpenDistro are AWS run products and differ from the original Elasticsearch and Kibana products that Elastic builds and maintains. You may need to contact them directly for further assistance.

(This is an automated response from your friendly Elastic bot. Please report this post if you have any suggestions or concerns.)

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.