Found duplicate records in Elasticsearch

Hello,

We have set up a logs pipeline as below:
filebeat -> kafka -> logstash -> elasticsearch

  1. Filebeat runs on many servers and ships events to a centralized Kafka cluster, into one topic with 20 partitions.
  2. We have 2 Logstash servers, each reading events from Kafka with 10 consumer threads. Both Logstash servers belong to the same consumer group (group name is 'logstash').
  3. Logstash writes to the Elasticsearch servers.

When I search the logs in Kibana, we see duplicates. Some events appear 2, 3, or 5 times, and it is random. Please let us know how to fix the issue.

Logstash Input Plugin config

input {
   kafka {
     bootstrap_servers => "kafka-server:9092"
     group_id => "logstash"
     topics => ["filebeat","jmeter"]
     codec => "json"
     consumer_threads => 10
   }
}

Logstash output config

output {
        if [fields][log_type] == "jtl" or [fields][log_type] == "jmeter.log" {
                elasticsearch {
                        hosts => ["server1", "server2", "server3", "server4"]
                        manage_template => false
                        index => "jmeter-%{+YYYY.ww}"
                        document_type => "log"
                        #document_id => "%{[@metadata][fingerprint]}"
                }
        } else {
                elasticsearch {
                        hosts => ["server1", "server2", "server3", "server4"]
                        manage_template => false
                        index => "filebeat-%{+YYYY.MM.dd}"
                        document_type => "log"
                        #document_id => "%{[@metadata][fingerprint]}"
                }
        }
}

Log filtered from Kibana

Time                              _id                   offset     message                               beat.hostname  source
November 24th 2017, 22:27:01.660  AV_vJW6qCJvg6CObUTjK  1,183,688  DEBU[29692031] Calling GET /version   appserver1     /HOST/var/log/upstart/docker.log
November 24th 2017, 22:27:01.660  1950307853            1,183,759  DEBU[29692033] Calling GET /info      appserver1     /HOST/var/log/upstart/docker.log
November 24th 2017, 22:27:01.660  AV_vKOUJCJvg6CObVNT7  1,183,759  DEBU[29692033] Calling GET /info      appserver1     /HOST/var/log/upstart/docker.log
November 24th 2017, 22:27:01.660  AV_vK54rCJvg6CObV5pW  1,183,759  DEBU[29692033] Calling GET /info      appserver1     /HOST/var/log/upstart/docker.log
November 24th 2017, 22:27:01.660  AV_vJWG2cd_DiBBbhmf_  1,183,759  DEBU[29692033] Calling GET /info      appserver1     /HOST/var/log/upstart/docker.log

What is the load on the Elasticsearch cluster? Which version are you using? What is the specification of your Elasticsearch cluster?

Logstash is designed to retry indexing documents until it has received confirmation that they have been indexed correctly. This allows it to offer an at-least-once delivery guarantee as long as persistent queues are configured. If the Elasticsearch cluster is under heavy load and requests time out, Logstash can end up retrying a request while the data from the first attempt is still being processed, which results in duplicates.
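
The usual way to make those retries harmless is to give each event a deterministic document id, so a retried bulk request overwrites the same document instead of creating a new one. A minimal sketch of what that could look like for the filebeat output, reusing the host placeholders from your config and assuming [@metadata][fingerprint] has been populated by a fingerprint filter (as your commented-out document_id lines suggest):

output {
        elasticsearch {
                hosts => ["server1", "server2", "server3", "server4"]
                manage_template => false
                index => "filebeat-%{+YYYY.MM.dd}"
                document_type => "log"
                # With a deterministic id, a retried bulk request updates the same
                # document instead of indexing a new one, so retries become idempotent.
                document_id => "%{[@metadata][fingerprint]}"
        }
}

Note that indexing with explicit ids is somewhat slower than letting Elasticsearch auto-generate them, so expect some overhead.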

Hi Christian,

We have the following configuration
Elasticsearch:

  1. We are using Elasticsearch version 5.2. Number of nodes: 4 (each node is on a dedicated server with 32 GB RAM / 8 cores).
  2. Heap size of each node: 16 GB
  3. OS: Ubuntu 14.04
  4. We have configured 1 replica shard.
  5. Average load on the Elasticsearch cluster: 100 events/sec. The data is log data.

I tried using the fingerprint filter to generate an id for each event in Logstash, but it slowed down indexing drastically.

        fingerprint {
                source => "message"
                target => "[@metadata][fingerprint]"
                method => "MURMUR3"
        }

Thanks
Phaniraj

Is there anything in the logs indicating that Elasticsearch has been having problems? What is the full output of the cluster stats API?

When selecting a hash identifier, you can improve performance by choosing one that is fast and Lucene friendly. The MURMUR3 hash is fast, but only generates a 32-bit value, which may lead to hash collisions unless data volumes are low. To get around this you could create a document id that starts with the event timestamp and is followed by the hash. This keeps the ids Lucene friendly and reduces the risk of collisions, since the hash only needs to separate events that share the same timestamp.
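
As a sketch of that approach, applied to the filebeat output (the jmeter branch would look the same; the exact timestamp format in the id is just an example and can be made coarser or finer):

filter {
        fingerprint {
                source => "message"
                target => "[@metadata][fingerprint]"
                method => "MURMUR3"
        }
}

output {
        elasticsearch {
                hosts => ["server1", "server2", "server3", "server4"]
                manage_template => false
                index => "filebeat-%{+YYYY.MM.dd}"
                document_type => "log"
                # Prefix the 32-bit MURMUR3 hash with the event timestamp so ids are
                # roughly time-ordered (Lucene friendly) and the hash only has to
                # distinguish events that share the same millisecond.
                document_id => "%{+YYYY.MM.dd.HH.mm.ss.SSS}-%{[@metadata][fingerprint]}"
        }
}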

Hi Christian,

Here is the output of cluster stats

{
	"_nodes": {
		"total": 4,
		"successful": 4,
		"failed": 0
	},
	"cluster_name": "elasticsearch",
	"timestamp": 1511769464003,
	"status": "green",
	"indices": {
		"count": 37,
		"shards": {
			"total": 362,
			"primaries": 181,
			"replication": 1.0,
			"index": {
				"shards": {
					"min": 2,
					"max": 10,
					"avg": 9.783783783783784
				},
				"primaries": {
					"min": 1,
					"max": 5,
					"avg": 4.891891891891892
				},
				"replication": {
					"min": 1.0,
					"max": 1.0,
					"avg": 1.0
				}
			}
		},
		"docs": {
			"count": 110860826,
			"deleted": 1206145
		},
		"store": {
			"size_in_bytes": 132050125018,
			"throttle_time_in_millis": 0
		},
		"fielddata": {
			"memory_size_in_bytes": 40600,
			"evictions": 0
		},
		"query_cache": {
			"memory_size_in_bytes": 9809064,
			"total_count": 25503,
			"hit_count": 573,
			"miss_count": 24930,
			"cache_size": 209,
			"cache_count": 428,
			"evictions": 219
		},
		"completion": {
			"size_in_bytes": 0
		},
		"segments": {
			"count": 2804,
			"memory_in_bytes": 208212154,
			"terms_memory_in_bytes": 141231788,
			"stored_fields_memory_in_bytes": 52845736,
			"term_vectors_memory_in_bytes": 0,
			"norms_memory_in_bytes": 2263488,
			"points_memory_in_bytes": 7519750,
			"doc_values_memory_in_bytes": 4351392,
			"index_writer_memory_in_bytes": 71333616,
			"version_map_memory_in_bytes": 698517,
			"fixed_bit_set_memory_in_bytes": 0,
			"max_unsafe_auto_id_timestamp": 1511654475482,
			"file_sizes": {}
		}
	},
	"nodes": {
		"count": {
			"total": 4,
			"data": 4,
			"coordinating_only": 0,
			"master": 3,
			"ingest": 4
		},
		"versions": ["5.2.0"],
		"os": {
			"available_processors": 64,
			"allocated_processors": 64,
			"names": [{
				"name": "Linux",
				"count": 4
			}],
			"mem": {
				"total_in_bytes": 267042099200,
				"free_in_bytes": 5925228544,
				"used_in_bytes": 261116870656,
				"free_percent": 2,
				"used_percent": 98
			}
		},
		"process": {
			"cpu": {
				"percent": 44
			},
			"open_file_descriptors": {
				"min": 672,
				"max": 696,
				"avg": 684
			}
		},
		"jvm": {
			"max_uptime_in_millis": 326948381,
			"versions": [{
				"version": "1.8.0_144",
				"vm_name": "Java HotSpot(TM) 64-Bit Server VM",
				"vm_version": "25.144-b01",
				"vm_vendor": "Oracle Corporation",
				"count": 1
			}, {
				"version": "1.8.0_45",
				"vm_name": "Java HotSpot(TM) 64-Bit Server VM",
				"vm_version": "25.45-b02",
				"vm_vendor": "Oracle Corporation",
				"count": 3
			}],
			"mem": {
				"heap_used_in_bytes": 15049318920,
				"heap_max_in_bytes": 55381065728
			},
			"threads": 795
		},
		"fs": {
			"total_in_bytes": 12155158941696,
			"free_in_bytes": 11998286778368,
			"available_in_bytes": 11501763821568,
			"spins": "true"
		},
		"plugins": [],
		"network_types": {
			"transport_types": {
				"netty4": 4
			},
			"http_types": {
				"netty4": 4
			}
		}
	}
}

It looks like one of the nodes is running a newer JVM than the others. It also looks like not all nodes have a 16 GB heap configured. If this is correct, could it perhaps be causing increased heap pressure on one of the nodes? Do you have monitoring in place so you can look at heap usage and GC across the nodes?

Hi Christian,

We have fixed the JVM version mismatch on one of the nodes. We have also made sure that each node is allocated 16 GB of heap space.

We have set up monitoring using Telegraf and will observe.

Thanks
Phaniraj
