Extremely high load on Elasticsearch 1.7.2 and 2.1

Recently I encountered a strange situation. I use ES for storing Nginx logs.

I use my own code to pull logs from Kafka and push them to ES. The indexing speed is about 10K to 20K documents per second, and it works very well, saving about 700 million logs per day. Every day I create a new index. 700M logs occupy about 1.7TB of space and I have 8TB, so I keep 4-5 days of logs for analysis. This stable situation lasted for more than 10 days.
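For reference, the rotation is nothing special: drop the oldest daily index once it falls out of the retention window, and bulk-index into the current day's index. Roughly like this (the host, dates and file name below are just placeholders for my setup):

  # drop the oldest daily index to stay inside the 4-5 day retention window
  curl -XDELETE 'http://localhost:9200/logstash-kafka-11.25'

  # my own code does the equivalent of bulk-indexing into today's index
  # (batch.json is newline-delimited bulk actions + documents)
  curl -XPOST 'http://localhost:9200/logstash-kafka-11.30/_bulk' --data-binary @batch.json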

But starting one day last week, after I added a new date type field and turned the URL field from 'not_analyzed' into an analyzed 'string' field, I saw the cluster's bulk indexing speed occasionally drop dramatically.
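Concretely, the change as applied to the newly created daily indices looked roughly like this (the field names and type name here are placeholders, not my real ones):

  # before this change, "url" was { "type": "string", "index": "not_analyzed" }
  curl -XPUT 'http://localhost:9200/logstash-kafka-12.01/_mapping/logs' -d '{
    "properties": {
      "url":           { "type": "string" },
      "response_date": { "type": "date" }
    }
  }'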

I have two machines in this cluster. I noticed that one machine's load would occasionally spike up to 50.0, with CPU above 90%, while the other machine's CPU dropped to less than 10%. This alternates between the two machines. During the busy periods, my ES client sometimes receives timeout errors.

I did some investigation during the busy periods. The /_nodes/hot_threads API gave me some information.
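I captured it while the load was high with something like this (localhost is just my node):

  curl -s 'http://localhost:9200/_nodes/hot_threads?threads=3'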

91.9% (459.5ms out of 500ms) cpu usage by thread 'elasticsearch[nginx2][[logstash-kafka-11.29][17]: Lucene Merge Thread #67]'
 2/10 snapshots sharing following 12 elements
  org.apache.lucene.codecs.lucene50.Lucene50DocValuesConsumer.addNumericField(Lucene50DocValuesConsumer.java:80)

After that, I upgraded ES from 1.7.2 to 2.1.0. It still gets the high load sometimes.
What is the cause of this?
Is it because I turned the URL field from not_analyzed to an analyzed string, so that Lucene has to do much more work when merging index data?
How can I optimize the performance, or at least make it smoother?

Machine info: 16-core CPU, 16GB RAM, SSD disks.
Index mapping: all fields are not_analyzed except the URL field; there are two date type fields.
Other settings: refresh_interval => 2m, num_of_shards => 20, num_of_replicas => 0, flush_threshold_ops => 5000
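In other words, each daily index is created with settings roughly like the following (the index name is a placeholder; flush_threshold_ops here is what I take to be the translog setting):

  curl -XPUT 'http://localhost:9200/logstash-kafka-12.05' -d '{
    "settings": {
      "number_of_shards": 20,
      "number_of_replicas": 0,
      "refresh_interval": "2m",
      "translog.flush_threshold_ops": 5000
    }
  }'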

I did one more experiment.
I changed the URL field back to not_analyzed and removed the date field, so everything is back to the previous, normal setup.
Based on this, I don't think the cause is the mapping change.

I found some other evidence that there may be a bug here.
The merges only happen on one of the machines; they are not distributed to the other machine.
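The numbers below come from each node's stats, i.e. something like the following, then reading the merges section per node:

  curl -s 'http://localhost:9200/_nodes/stats/indices?pretty'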

The following node stats (the merges section) are from the busy node.

  {
    "current": 14,
    "current_docs": 22386014,
    "current_size": "46.8gb",
    "current_size_in_bytes": 50308269402,
    "total": 244,
    "total_time": "11.1h",
    "total_time_in_millis": 40022410,
    "total_docs": 6511454,
    "total_size": "15.5gb",
    "total_size_in_bytes": 16714559156,
    "total_stopped_time": "0s",
    "total_stopped_time_in_millis": 0,
    "total_throttled_time": "23.9m",
    "total_throttled_time_in_millis": 1438730,
    "total_auto_throttle": "144.8mb",
    "total_auto_throttle_in_bytes": 151883659
  }

The following is from the idle node:

  {
    "current": 0,
    "current_docs": 0,
    "current_size": "0b",
    "current_size_in_bytes": 0,
    "total": 29048,
    "total_time": "58.1m",
    "total_time_in_millis": 3488507,
    "total_docs": 15009082,
    "total_size": "41.7gb",
    "total_size_in_bytes": 44829713758,
    "total_stopped_time": "0s",
    "total_stopped_time_in_millis": 0,
    "total_throttled_time": "25.1m",
    "total_throttled_time_in_millis": 1511631,
    "total_auto_throttle": "145.4mb",
    "total_auto_throttle_in_bytes": 152567606
  }

The busy node has taken all the merge jobs by itself.

This means that your node is busy writing doc values for a numeric field to disk. Can you paste the entire stack trace so that we can look for potential bottlenecks?

The following is the busy node's hot_threads information. The forum has a 5000-character limit, so I will post the 3 threads in 3 replies. This is the hottest one.

87.1% (435.7ms out of 500ms) cpu usage by thread 'elasticsearch[nginx-2][[logstash-kafka-12.01][6]: Lucene Merge Thread #32]'
 4/10 snapshots sharing following 22 elements
   sun.nio.ch.FileDispatcherImpl.write0(Native Method)
   sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
   sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
   sun.nio.ch.IOUtil.write(IOUtil.java:65)
   sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:210)
   java.nio.channels.Channels.writeFullyImpl(Channels.java:78)
   java.nio.channels.Channels.writeFully(Channels.java:101)
   java.nio.channels.Channels.access$000(Channels.java:61)
   java.nio.channels.Channels$1.write(Channels.java:174)
   org.apache.lucene.store.FSDirectory$FSIndexOutput$1.write(FSDirectory.java:271)
   java.util.zip.CheckedOutputStream.write(CheckedOutputStream.java:73)
   java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
   org.apache.lucene.store.OutputStreamIndexOutput.writeBytes(OutputStreamIndexOutput.java:53)
   org.apache.lucene.store.RateLimitedIndexOutput.writeBytes(RateLimitedIndexOutput.java:73)
   org.apache.lucene.store.DataOutput.copyBytes(DataOutput.java:278)
   org.apache.lucene.codecs.lucene50.Lucene50CompoundFormat.write(Lucene50CompoundFormat.java:91)
   org.apache.lucene.index.IndexWriter.createCompoundFile(IndexWriter.java:4680)
   org.apache.lucene.index.IndexWriter.mergeMiddle(IndexWriter.java:4152)
   org.apache.lucene.index.IndexWriter.merge(IndexWriter.java:3664)
   org.apache.lucene.index.ConcurrentMergeScheduler.doMerge(ConcurrentMergeScheduler.java:588)
   org.elasticsearch.index.engine.ElasticsearchConcurrentMergeScheduler.doMerge(ElasticsearchConcurrentMergeScheduler.java:94)
   org.apache.lucene.index.ConcurrentMergeScheduler$MergeThread.run(ConcurrentMergeScheduler.java:626)
 6/10 snapshots sharing following 14 elements
   java.lang.Object.wait(Native Method)
   java.lang.Object.wait(Object.java:460)
   org.apache.lucene.index.MergeRateLimiter.maybePause(MergeRateLimiter.java:163)
   org.apache.lucene.index.MergeRateLimiter.pause(MergeRateLimiter.java:94)
   org.apache.lucene.store.RateLimitedIndexOutput.checkRate(RateLimitedIndexOutput.java:78)
   org.apache.lucene.store.RateLimitedIndexOutput.writeBytes(RateLimitedIndexOutput.java:72)
   org.apache.lucene.store.DataOutput.copyBytes(DataOutput.java:278)
   org.apache.lucene.codecs.lucene50.Lucene50CompoundFormat.write(Lucene50CompoundFormat.java:91)
   org.apache.lucene.index.IndexWriter.createCompoundFile(IndexWriter.java:4680)
   org.apache.lucene.index.IndexWriter.mergeMiddle(IndexWriter.java:4152)
   org.apache.lucene.index.IndexWriter.merge(IndexWriter.java:3664)
   org.apache.lucene.index.ConcurrentMergeScheduler.doMerge(ConcurrentMergeScheduler.java:588)
   org.elasticsearch.index.engine.ElasticsearchConcurrentMergeScheduler.doMerge(ElasticsearchConcurrentMergeScheduler.java:94)
   org.apache.lucene.index.ConcurrentMergeScheduler$MergeThread.run(ConcurrentMergeScheduler.java:626)

This is the second one.

83.4% (416.7ms out of 500ms) cpu usage by thread 'elasticsearch[nginx-2][bulk][T#12]'
 2/10 snapshots sharing following 17 elements
   org.apache.lucene.index.DocumentsWriter.updateDocument(DocumentsWriter.java:436)
   org.apache.lucene.index.IndexWriter.updateDocument(IndexWriter.java:1475)
   org.apache.lucene.index.IndexWriter.addDocument(IndexWriter.java:1254)
   org.elasticsearch.index.engine.InternalEngine.innerCreateNoLock(InternalEngine.java:448)
   org.elasticsearch.index.engine.InternalEngine.innerCreate(InternalEngine.java:390)
   org.elasticsearch.index.engine.InternalEngine.create(InternalEngine.java:362)
   org.elasticsearch.index.shard.IndexShard.create(IndexShard.java:531)
   org.elasticsearch.index.engine.Engine$Create.execute(Engine.java:810)
   org.elasticsearch.action.support.replication.TransportReplicationAction.executeIndexRequestOnPrimary(TransportReplicationAction.java:1073)
   org.elasticsearch.action.bulk.TransportShardBulkAction.shardIndexOperation(TransportShardBulkAction.java:338)
   org.elasticsearch.action.bulk.TransportShardBulkAction.shardOperationOnPrimary(TransportShardBulkAction.java:131)
   org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryPhase.performOnPrimary(TransportReplicationAction.java:579)
   org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryPhase$1.doRun(TransportReplicationAction.java:452)
   org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
   java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
   java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
   java.lang.Thread.run(Thread.java:745)
 4/10 snapshots sharing following 15 elements
   org.apache.lucene.index.IndexWriter.addDocument(IndexWriter.java:1254)
   org.elasticsearch.index.engine.InternalEngine.innerCreateNoLock(InternalEngine.java:448)
   org.elasticsearch.index.engine.InternalEngine.innerCreate(InternalEngine.java:390)
   org.elasticsearch.index.engine.InternalEngine.create(InternalEngine.java:362)
   org.elasticsearch.index.shard.IndexShard.create(IndexShard.java:531)
   org.elasticsearch.index.engine.Engine$Create.execute(Engine.java:810)
   org.elasticsearch.action.support.replication.TransportReplicationAction.executeIndexRequestOnPrimary(TransportReplicationAction.java:1073)
   org.elasticsearch.action.bulk.TransportShardBulkAction.shardIndexOperation(TransportShardBulkAction.java:338)
   org.elasticsearch.action.bulk.TransportShardBulkAction.shardOperationOnPrimary(TransportShardBulkAction.java:131)
   org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryPhase.performOnPrimary(TransportReplicationAction.java:579)
   org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryPhase$1.doRun(TransportReplicationAction.java:452)
   org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
   java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
   java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
   java.lang.Thread.run(Thread.java:745)
 4/10 snapshots sharing following 13 elements
   org.elasticsearch.index.engine.InternalEngine.innerCreate(InternalEngine.java:377)
   org.elasticsearch.index.engine.InternalEngine.create(InternalEngine.java:362)
   org.elasticsearch.index.shard.IndexShard.create(IndexShard.java:531)
   org.elasticsearch.index.engine.Engine$Create.execute(Engine.java:810)
   org.elasticsearch.action.support.replication.TransportReplicationAction.executeIndexRequestOnPrimary(TransportReplicationAction.java:1073)
   org.elasticsearch.action.bulk.TransportShardBulkAction.shardIndexOperation(TransportShardBulkAction.java:338)
   org.elasticsearch.action.bulk.TransportShardBulkAction.shardOperationOnPrimary(TransportShardBulkAction.java:131)
   org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryPhase.performOnPrimary(TransportReplicationAction.java:579)
   org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryPhase$1.doRun(TransportReplicationAction.java:452)
   org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
   java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
   java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
   java.lang.Thread.run(Thread.java:745)

The third one.

  69.2% (346.1ms out of 500ms) cpu usage by thread 'elasticsearch[nginx-2][[logstash-kafka-12.01][10]: Lucene Merge Thread #35]'
 2/10 snapshots sharing following 12 elements
   org.apache.lucene.codecs.lucene50.Lucene50DocValuesConsumer.addSortedField(Lucene50DocValuesConsumer.java:459)
   org.apache.lucene.codecs.lucene50.Lucene50DocValuesConsumer.addSortedSetField(Lucene50DocValuesConsumer.java:499)
   org.apache.lucene.codecs.perfield.PerFieldDocValuesFormat$FieldsWriter.addSortedSetField(PerFieldDocValuesFormat.java:131)
   org.apache.lucene.codecs.DocValuesConsumer.mergeSortedSetField(DocValuesConsumer.java:736)
   org.apache.lucene.codecs.DocValuesConsumer.merge(DocValuesConsumer.java:219)
   org.apache.lucene.index.SegmentMerger.mergeDocValues(SegmentMerger.java:150)
   org.apache.lucene.index.SegmentMerger.merge(SegmentMerger.java:105)
   org.apache.lucene.index.IndexWriter.mergeMiddle(IndexWriter.java:4089)
   org.apache.lucene.index.IndexWriter.merge(IndexWriter.java:3664)
   org.apache.lucene.index.ConcurrentMergeScheduler.doMerge(ConcurrentMergeScheduler.java:588)
   org.elasticsearch.index.engine.ElasticsearchConcurrentMergeScheduler.doMerge(ElasticsearchConcurrentMergeScheduler.java:94)
   org.apache.lucene.index.ConcurrentMergeScheduler$MergeThread.run(ConcurrentMergeScheduler.java:626)
 2/10 snapshots sharing following 28 elements
   sun.nio.ch.NativeThread.current(Native Method)
   sun.nio.ch.NativeThreadSet.add(NativeThreadSet.java:46)
   sun.nio.ch.FileChannelImpl.readInternal(FileChannelImpl.java:704)
   sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:694)
   org.apache.lucene.store.NIOFSDirectory$NIOFSIndexInput.readInternal(NIOFSDirectory.java:180)
   org.apache.lucene.store.BufferedIndexInput.refill(BufferedIndexInput.java:342)
   org.apache.lucene.store.BufferedIndexInput.readShort(BufferedIndexInput.java:283)
   org.apache.lucene.util.packed.DirectReader$DirectPackedReader12.get(DirectReader.java:148)
   org.apache.lucene.codecs.lucene50.Lucene50DocValuesProducer$2.get(Lucene50DocValuesProducer.java:464)
   org.apache.lucene.util.LongValues.get(LongValues.java:45)
   org.apache.lucene.codecs.lucene50.Lucene50DocValuesProducer$7.getOrd(Lucene50DocValuesProducer.java:624)
   org.apache.lucene.index.SingletonSortedSetDocValues.setDocument(SingletonSortedSetDocValues.java:53)
   org.apache.lucene.codecs.DocValuesConsumer$8$1.setNext(DocValuesConsumer.java:820)
   org.apache.lucene.codecs.DocValuesConsumer$8$1.hasNext(DocValuesConsumer.java:782)
   org.apache.lucene.codecs.DocValuesConsumer$10$1.hasNext(DocValuesConsumer.java:965)
   org.apache.lucene.codecs.lucene50.Lucene50DocValuesConsumer.addNumericField(Lucene50DocValuesConsumer.java:133)
   org.apache.lucene.codecs.lucene50.Lucene50DocValuesConsumer.addSortedField(Lucene50DocValuesConsumer.java:460)
   org.apache.lucene.codecs.lucene50.Lucene50DocValuesConsumer.addSortedSetField(Lucene50DocValuesConsumer.java:499)
   org.apache.lucene.codecs.perfield.PerFieldDocValuesFormat$FieldsWriter.addSortedSetField(PerFieldDocValuesFormat.java:131)
   org.apache.lucene.codecs.DocValuesConsumer.mergeSortedSetField(DocValuesConsumer.java:736)
   org.apache.lucene.codecs.DocValuesConsumer.merge(DocValuesConsumer.java:219)
   org.apache.lucene.index.SegmentMerger.mergeDocValues(SegmentMerger.java:150)
   org.apache.lucene.index.SegmentMerger.merge(SegmentMerger.java:105)
   org.apache.lucene.index.IndexWriter.mergeMiddle(IndexWriter.java:4089)
   org.apache.lucene.index.IndexWriter.merge(IndexWriter.java:3664)
   org.apache.lucene.index.ConcurrentMergeScheduler.doMerge(ConcurrentMergeScheduler.java:588)
   org.elasticsearch.index.engine.ElasticsearchConcurrentMergeScheduler.doMerge(ElasticsearchConcurrentMergeScheduler.java:94)
   org.apache.lucene.index.ConcurrentMergeScheduler$MergeThread.run(ConcurrentMergeScheduler.java:626)

I encountered this situation again. When a node left the cluster and re-joined, all the load shifted to that node; the other nodes had very little load. And this situation seems to last forever.

When I relocated some of the shards to other nodes, the load magically balanced again.
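The relocation was a manual cluster reroute, roughly like this (the index, shard and node names are just examples from my cluster):

  curl -XPOST 'http://localhost:9200/_cluster/reroute' -d '{
    "commands": [
      { "move": {
          "index": "logstash-kafka-12.03",
          "shard": 2,
          "from_node": "nginx-2",
          "to_node": "nginx-c3"
      } }
    ]
  }'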

The following is the log from when the node rejoined the cluster.

[2015-12-03 20:25:41,566][DEBUG][action.admin.indices.stats] [nginx-c3] [indices:monitor/stats] failed to execute operation for shard [[logstash-kafka-12.03][2], node[_SdsxW3FSnmqhe2kiPtqHA], [P], v[3], s[INITIALIZING], a[id=3xbnSF4vQE-ZHFRlqy5N-Q], unassigned_info[[reason=NODE_LEFT], at[2015-12-03T12:20:01.049Z], details[node_left[_SdsxW3FSnmqhe2kiPtqHA]]]]
[logstash-kafka-12.03][[logstash-kafka-12.03][2]] BroadcastShardOperationFailedException[operation indices:monitor/stats failed]; nested: IllegalIndexShardStateException[CurrentState[RECOVERING] operations only allowed when shard state is one of [POST_RECOVERY, STARTED, RELOCATED]];
        at org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction$BroadcastByNodeTransportRequestHandler.onShardOperation(TransportBroadcastByNodeAction.java:405)
        at org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction$BroadcastByNodeTransportRequestHandler.messageReceived(TransportBroadcastByNodeAction.java:382)
        at org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction$BroadcastByNodeTransportRequestHandler.messageReceived(TransportBroadcastByNodeAction.java:371)
        at org.elasticsearch.transport.netty.MessageChannelHandler$RequestHandler.doRun(MessageChannelHandler.java:299)
        at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: [logstash-kafka-12.03][[logstash-kafka-12.03][2]] IllegalIndexShardStateException[CurrentState[RECOVERING] operations only allowed when shard state is one of [POST_RECOVERY, STARTED, RELOCATED]]
        at org.elasticsearch.index.shard.IndexShard.readAllowed(IndexShard.java:974)
        at org.elasticsearch.index.shard.IndexShard.acquireSearcher(IndexShard.java:808)
        at org.elasticsearch.index.shard.IndexShard.docStats(IndexShard.java:628)
        at org.elasticsearch.action.admin.indices.stats.CommonStats.<init>(CommonStats.java:131)
        at org.elasticsearch.action.admin.indices.stats.TransportIndicesStatsAction.shardOperation(TransportIndicesStatsAction.java:165)
        at org.elasticsearch.action.admin.indices.stats.TransportIndicesStatsAction.shardOperation(TransportIndicesStatsAction.java:47)
        at org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction$BroadcastByNodeTransportRequestHandler.onShardOperation(TransportBroadcastByNodeAction.java:401)
        ... 7 more