How does the upsert script work in Elasticsearch?

My Logstash config file is shown below.

In the output plugin I have added an upsert script that compares the incoming ingestionHash value with the ingestionHash value of the existing document. The behaviour I want is:

  1. If the document_id doesn't exist (a new document, i.e. the first time we send this data to Elasticsearch), a new document is created from whatever
    data the output plugin sends.
  2. If the document_id exists and the value of that field in the existing document is the same, the document is not updated, i.e. it remains as it is.
  3. If the document_id exists and the field value is different, the entire document is updated with the new field value.
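
To make the three cases concrete, here is a minimal Python sketch of the behaviour I expect. It is only a model, not part of the pipeline: a plain dict stands in for the index, and the names `apply_upsert`, `index` and the example document ID are hypothetical.

```python
def apply_upsert(index, doc_id, incoming):
    """Model the three intended cases.

    `index` is a plain dict standing in for the Elasticsearch index
    (an assumption for illustration only).
    Returns "created", "noop" or "updated".
    """
    existing = index.get(doc_id)

    # Case 1: no document with this ID yet, so store the incoming one.
    if existing is None:
        index[doc_id] = incoming
        return "created"

    old_hash = existing.get("meta", {}).get("ingestionHash")
    new_hash = incoming.get("meta", {}).get("ingestionHash")

    # Case 2: same ingestionHash, so leave the stored document untouched.
    if old_hash == new_hash:
        return "noop"

    # Case 3: different ingestionHash, so replace the whole document.
    index[doc_id] = incoming
    return "updated"


if __name__ == "__main__":
    index = {}
    doc_id = "some-topic-3171307072"  # hypothetical "<topic>-<ingestKeyHash>"

    first = {"meta": {"ingestionHash": "421047889"}, "offset": "333"}
    same_hash = {"meta": {"ingestionHash": "421047889"}, "offset": "334"}
    new_hash = {"meta": {"ingestionHash": "999"}, "offset": "335"}  # made-up hash

    print(apply_upsert(index, doc_id, first))      # created
    print(apply_upsert(index, doc_id, same_hash))  # noop
    print(apply_upsert(index, doc_id, new_hash))   # updated
```

In the second call the stored document keeps offset "333", because a matching ingestionHash means nothing is written at all.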

However, it does not work according to my requirement. Can anyone help me resolve this issue?

input {
    kafka {
           
    }
}
filter {
  mutate {
    split => ["topicParts", "."]
    add_field => { "dataType" => "%{[topicParts][2]}" }
    add_field => { "ingestKeyHash" => "" }
  }
  fingerprint {
    source => ["[meta][recordKeys]"]
    method => "MURMUR3"
    target => "ingestKeyHash"
  }
}


output {
  opensearch {
    hosts       => ["${ELASTIC_SEARCH_URL}:443"]
    auth_type => {
              type => 'aws_iam'
              aws_access_key_id => '${LOGSTASH_USER_ACCESS_KEY}'
              aws_secret_access_key => '${LOGSTASH_USER_SECRET_KEY}'
              region => 'us-west-2'
          }
    ecs_compatibility => disabled
    index       => "edp-logstash-%{[@metadata][kafka][topic]}-%{+YYYY.MM}"
    ssl_certificate_verification => true
    document_id => "%{[@metadata][kafka][topic]}-%{[ingestKeyHash]}"

    upsert => '
        {
            "script" : {
                "source": "
                    if (ctx._source.meta.ingestionHash == params.ingestionHash) {
                        ctx.op = "none";
                    } else {
                        ctx.op = "update";
                    }
                ",
                "lang": "painless",
                "params": {
                    "ingestionHash": "%{[meta][ingestionHash]}"
                }
            }
        }
    '

    doc_as_upsert => true
  }
}

The input data (JSON) looks like this:

{
  "topicTimestamp": "2023-07-26T07:33:27.032Z",
  "ingestKeyHash": 3171307072,
  "collectionSource": "service.mds.ro",
  "partition": "0",
  "dataType": "avro",
  "@version": "1",
  "@timestamp": "2023-07-26T07:33:27.048Z",
  "offset": "333",
  "meta": {
    "providerDealerId": "dealer1",
    "dataType": null,
    "ingestionHash": "421047889",
    "providerName": "Drive DMS",
    "subscriptionPartnerName": null,
    "recordKeys": {
      "roNum": "134825",
      "providerDealerId": "3P12we",
      "providerId": "a41a5ea7-d3a8-4371",
      "roOpenDate": "2023-07-25"
    }
  }
}

OpenSearch/OpenDistro are AWS-run products and differ from the original Elasticsearch and Kibana products that Elastic builds and maintains. You may need to contact them directly for further assistance.

