Here is my logstash config file .
In the output plugin I have added upsert script it compare the ingestionHash value with old documentation ingestionHash value.
- If the document_id doesn't exist (new document ie 1st time we send a data to Elasticsearch), it will be created new document whatever we send
data from output plugin. - If the document_id exists and the particular field of this document value is the same, it won't be updated the document ie document will remains as it is.
- If the document with document_id exists and the field value is different, the entire document will be updated with the new field value.
but it not worked according to my requirement .. can anyone help me to resolve this issue.
input {
kafka {
}
}
filter {
mutate{
split => ["topicParts", "."]
add_field => { "dataType" => "%{[topicParts][2]}" }
add_field => { "ingestKeyHash" => "" }
}
fingerprint {
source => ["[meta][recordKeys]"]
method => "MURMUR3"
target => "ingestKeyHash"
}
}
output {
opensearch {
hosts => ["${ELASTIC_SEARCH_URL}:443"]
auth_type => {
type => 'aws_iam'
aws_access_key_id => '${LOGSTASH_USER_ACCESS_KEY}'
aws_secret_access_key => '${LOGSTASH_USER_SECRET_KEY}'
region => 'us-west-2'
}
ecs_compatibility => disabled
index => "edp-logstash-%{[@metadata][kafka][topic]}-%{+YYYY.MM}"
ssl_certificate_verification => true
document_id => "%{[@metadata][kafka][topic]}-%{[ingestKeyHash]}"
upsert => '
{
"script" : {
"source": "
if (ctx._source.meta.ingestionHash == params.ingestionHash) {
ctx.op = "none";
} else {
ctx.op = "update";
}
",
"lang": "painless",
"params": {
"ingestionHash": "%{[meta][ingestionHash]}"
}
}
}
'
doc_as_upsert => true
}
}
input data/ json looks like
{
"topicTimestamp": "2023-07-26T07:33:27.032Z",
"ingestKeyHash": 3171307072,
"collectionSource": "service.mds.ro",
"partition": "0",
"dataType": "avro",
"@version": "1",
"@timestamp": "2023-07-26T07:33:27.048Z",
"offset": "333",
"meta": {
"providerDealerId": "dealer1",
"dataType": null,
"ingestionHash": "421047889",
"providerName": "Drive DMS",
"subscriptionPartnerName": null,
"recordKeys": {
"roNum": "134825",
"providerDealerId": "3P12we",
"providerId": "a41a5ea7-d3a8-4371",
"roOpenDate": "2023-07-25"
}
}
}