Logstash doc_as_upsert => true sometimes fails to update a few fields

Hi,

I have come across a strange issue: when we send data to be updated with doc_as_upsert => true, a few fields are not updated, seemingly at random.
For example, if I have 1000 records which, after the insert, update the field "record_status" from PENDING to SUCCESS, the update is not applied for 4-5 of them. This causes inconsistency and also raises concerns about the reliability of the ELK stack that we are planning to use.

Logstash Version: logstash-6.2.2
Elasticsearch Version: elasticsearch-6.2.2

Logstash Config:

# Input stage: receive events from Beats shippers.
input {

	  # Listen for Beats connections on TCP port 5044.
	  beats {
		port => 5044
	  }

}

# Filter stage: split the raw log line into fields, extract and parse the
# embedded JSON payload, derive the Elasticsearch document id (es_id), and
# finally drop the helper fields. The stages below depend on running in this order.
filter {

    # Events tagged "karaf_logs": pipe-delimited line split into
    # tm, loglevel, queue_name, route_name, thread_info and content.
    if "karaf_logs" in [tags] {

		grok{
			match => ['message', "%{TIMESTAMP_ISO8601:tm} \| %{LOGLEVEL:loglevel}  \| %{GREEDYDATA:queue_name} \| %{GREEDYDATA:route_name} \| %{GREEDYDATA:thread_info} \| %{GREEDYDATA:content}"] 
			break_on_match => false
		}


    # Events tagged "k8s_logs": journal-style line split into
    # raw_date, tm1, hostname, tm2, loglevel and content.
    } else if "k8s_logs" in [tags] {

		grok{
			match => ['message', "%{GREEDYDATA:raw_date} %{TIME:tm1} %{GREEDYDATA:hostname} journal: %{TIME:tm2} \[.+\] %{LOGLEVEL:loglevel}  %{GREEDYDATA:content}"]
			break_on_match => false
		}


    }



	# Two patterns are applied to "content":
	#   type    - the text between the first "[" and the next "]"
	#   json_in - everything from the first "{" to the end of the line
	# break_on_match => false so that both patterns are tried.
	grok{
		match => ["content", '(?<type>(?<=\[).*?(?=\]))' ]
		match => ["content", '(?<json_in>({.*$))' ]
		break_on_match => false
	}

	# Parse the JSON text held in json_in into event fields; json_in itself
	# is listed for removal.
	json {
		source => "json_in"
		remove_field => "json_in"
	}



    # --------- Setup ES ID Field (This can only be setup after json filter hence not included in previous check)
    # Priority: EXCHANGE_ID, then user_reference, then exchange_id.
    # NOTE(review): the last branch does not check that exchange_id exists;
    # if it is missing, es_id presumably ends up as the literal text
    # "%{exchange_id}" - confirm and guard if such events can occur.


    if ([EXCHANGE_ID]) {
        mutate { add_field => { "es_id" => "%{EXCHANGE_ID}" } }
    } else {
        if [user_reference] {
            mutate { add_field => { "es_id" => "%{user_reference}" } }
        } else {
            mutate { add_field => { "es_id" => "%{exchange_id}" } }
        }
    }


    # ------- Remove unwanted fields
    # Drops the raw line, the intermediate grok captures and Beats metadata
    # so they are not sent to the outputs.

	mutate { 
        remove_field => "message"
        remove_field => "content"
        remove_field => "beat"
        remove_field => "raw_date"
        remove_field => "offset"
        remove_field => "prospector"
		remove_field => "tm1"
		remove_field => "tm2"
    }


}



# Output stage: print every event for debugging and upsert it into
# Elasticsearch keyed on es_id.
output {

    # Debug copy of each event on standard output.
    stdout { codec => rubydebug }

	# Every event is sent as an "update" to the document whose id is es_id;
	# doc_as_upsert => true makes the event the document body when that id
	# does not exist yet.
	# NOTE(review): events sharing an es_id are merged in the order they
	# reach Elasticsearch; if an earlier event is applied after a later one,
	# its fields (e.g. record_status) would presumably overwrite the newer
	# values - confirm event ordering (pipeline workers / batching).
	elasticsearch {
                        hosts => ["0.0.0.0:9200"]
			index => "logstash-data-monitor"
			action => "update"
			doc_as_upsert => true
			document_id => "%{[es_id]}"
	}

}

Sample data which updated correctly:

Insert

{
"host" => "k8s_v5_node1",
"loglevel" => "INFO",
"@timestamp" => 2018-05-25T07:17:47.629Z,
"type" => "KE-az-prc-mno-telco-572202",
"hostname" => "k8s_v5_node1",
"@version" => "1",
"user_reference" => "442E9608443C469B9C5BBD072182F224",
"source" => "/var/log/messages",
"tags" => [
[0] "k8s_logs",
[1] "beats_input_codec_plain_applied"
],
"exchange_start_time" => "Wed May 23 20:47:16 GMT 2018",
"es_id" => "442E9608443C469B9C5BBD072182F224",
"exchange_id" => "ID-az-prc-mno-2567439494-1vq68-36509-1527085265398-0-357",
"record_status" => "INITIATED"
}

Update:

{
"telco_bill_reference" => "0060034001",
"loglevel" => "INFO",
"transaction_amount" => "82127.00",
"telco_reference" => "XEN3KG0103",
"source" => "/var/log/messages",
"transaction_type" => "CBS_POSTING",
"host" => "k8s_v5_node1",
"credit_account" => "5029728001",
"customer_name" => "LOCAL MERCHANT 1",
"@timestamp" => 2018-05-25T07:17:47.629Z,
"telco_transaction_type" => "",
"type" => "KE-az-prc-mno-telco-572202",
"telco_short_code" => "572202",
"hostname" => "k8s_v5_node1",
"@version" => "1",
"user_reference" => "442E9608443C469B9C5BBD072182F224",
"cbs_reference" => "000ADDC181430526",
"tags" => [
[0] "k8s_logs",
[1] "beats_input_codec_plain_applied"
],
"debit_account" => "AS378601",
"es_id" => "442E9608443C469B9C5BBD072182F224",
"record_status" => "SUCCESS"
}

Sample data which didn't update correctly:

Insert:

{
"host" => "k8s_v5_node1",
"loglevel" => "INFO",
"@timestamp" => 2018-05-25T07:17:48.075Z,
"type" => "KE-az-prc-mno-telco-572202",
"hostname" => "k8s_v5_node1",
"@version" => "1",
"user_reference" => "F07C156EB57048A28645955EDB49E27A",
"source" => "/var/log/messages",
"tags" => [
[0] "k8s_logs",
[1] "beats_input_codec_plain_applied"
],
"exchange_start_time" => "Thu May 24 05:52:06 GMT 2018",
"es_id" => "F07C156EB57048A28645955EDB49E27A",
"exchange_id" => "ID-az-prc-mno-2567439494-1vq68-36509-1527085265398-0-722",
"record_status" => "INITIATED"
}

Update:

{
"telco_bill_reference" => "5029728001",
"loglevel" => "INFO",
"transaction_amount" => "235033.00",
"telco_reference" => "XEO8KI9009",
"source" => "/var/log/messages",
"transaction_type" => "CBS_POSTING",
"host" => "k8s_v5_node1",
"credit_account" => "5029728001",
"customer_name" => "MERCHANT 773",
"@timestamp" => 2018-05-25T07:17:48.076Z,
"telco_transaction_type" => "",
"type" => "KE-az-prc-mno-telco-572202",
"telco_short_code" => "572202",
"hostname" => "k8s_v5_node1",
"@version" => "1",
"user_reference" => "F07C156EB57048A28645955EDB49E27A",
"cbs_reference" => "000FTMP181440151",
"tags" => [
[0] "k8s_logs",
[1] "beats_input_codec_plain_applied"
],
"debit_account" => "AS378601",
"es_id" => "F07C156EB57048A28645955EDB49E27A",
"record_status" => "SUCCESS"
}

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.