Logstash kafka input to elasticsearch output stops consuming

Logstash 2.2
Elasticsearch 1.7.4

I am seeing an issue where I am consuming from Kafka and sending to Elasticsearch. It consumes about 500 messages at start and then stops, with this message being output to the logs:

{:timestamp=>"2016-02-26T17:16:33.977000+0000", :message=>"Flushing buffer at interval", :instance=>"#<LogStash::Outputs::ElasticSearch::Buffer:0x3c356a00 @stopping=#<Concurrent::AtomicBoolean:0x56c244e1>, @last_flush=2016-02-26 17:16:32 +0000, @flush_thread=#<Thread:0x6c383c46 run>, @max_size=500, @operations_lock=#<Java::JavaUtilConcurrentLocks::ReentrantLock:0x5ffd4f2b>, @submit_proc=#<Proc:0x2bf99714@/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-2.5.1-java/lib/logstash/outputs/elasticsearch/common.rb:57>, @flush_interval=1, @logger=#<Cabin::Channel:0x2e4d6053 @subscriber_lock=#<Mutex:0x59ef00e4>, @data={}, @metrics=#<Cabin::Metrics:0x5a23a6dc @channel=#<Cabin::Channel:0x2e4d6053 ...>, @metrics={}, @metrics_lock=#<Mutex:0x5a9fe3bf>>, @subscribers={13010=>#<Cabin::Outputs::IO:0x708f56ae @lock=#<Mutex:0x5283f33d>, @io=#<File:/mnt/log/logstash/logstash.log>>}, @level=:debug>, @buffer=[], @operations_mutex=#<Mutex:0x2b28e3f>>", :interval=>1, :level=>:debug, :file=>"logstash/outputs/elasticsearch/buffer.rb", :line=>"90", :method=>"interval_flush"}

If I output to stdout or a file, I am not seeing this issue.
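Roughly, the stdout test is just the same Kafka input (full config further down) with the output swapped; a sketch:

input {
        kafka {
                topic_id => "logs"
                group_id => "log_consumers"
                zk_connect => "zoo01:2181,zoo02:2181,zoo03:2181,zoo04:2181,zoo05:2181/"
        }
}

output {
        # rubydebug prints each event to stdout; with this output the
        # pipeline keeps consuming well past 500 messages
        stdout { codec => rubydebug }
}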

Any errors on the Elasticsearch side?

No, all clean on the ES side.

I have a 10-node cluster with this as the only input, so there are no contention or load issues.

Are you able to try 2.1.X? I wonder if there is a threading/pipeline issue. Also configs are helpful for reference.

Not able to upgrade ES yet, but it doesn't seem like it's getting overwhelmed. The fact that it sends the first batch and then just stops with no error message makes it seem like the connection between the input and output is borked.

I have the flush_size set artificially low to slow things down:

input {
        kafka {
                topic_id => "logs"
                group_id => "log_consumers"
                zk_connect => "zoo01:2181,zoo02:2181,zoo03:2181,zoo04:2181,zoo05:2181/"
        }
}

filter {
        mutate {
                add_tag => ["kafka"]
        }
}

output {
  elasticsearch {
      hosts => [ "es01:9200" ]
      sniffing => true
      flush_size => 20
      idle_flush_time => 15
  }
}

So a bit more info: I downgraded to Logstash 1.5.5 and it seems to be tied to the flush_size. I set the flush_size to 3000 and that's exactly how many messages get processed. I can see with the log in debug mode that messages are still being consumed from Kafka, but they aren't making it to ES. I also confirmed the same behavior with Logstash 2.2, except that there it is capped at 500: if your flush_size is below 500 it will process until 500, and if it's more than 500 it stops at 500.
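For reference, the output I used to confirm the 500 cap on 2.2 was essentially the same one as above with a larger flush_size (the 1000 below is just an illustrative value, not the exact number I tested with):

output {
  elasticsearch {
      hosts => [ "es01:9200" ]
      sniffing => true
      flush_size => 1000      # anything above 500 still stalls at exactly 500 on 2.2
      idle_flush_time => 15
  }
}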

[before / after screenshots]

I was searching the forums for 'interval_flush' and it may get tripped up by network connectivity. Could you describe the network/environment topology you are in? It could be that Logstash thinks the network connection is open but it was actually closed by a firewall, which could lead to Kafka consuming messages and the output sending messages but them not actually reaching your ES cluster.
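If a half-open connection is the culprit, one thing that might make it visible is an explicit request timeout on the output, assuming I'm remembering the 2.x elasticsearch output options right (the 30 below is just an example value):

output {
  elasticsearch {
      hosts => [ "es01:9200" ]
      sniffing => true
      # with an explicit timeout, a silently dropped connection should
      # surface as a retryable error instead of the output hanging
      timeout => 30
  }
}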

Could you try restarting the Logstash server? Perhaps the ES cluster too, if needed. Also, I meant trying Logstash version 2.1.X, but 1.5.5 was sufficient for that. Sorry I wasn't more specific.

I don't think that would be it, as it's very reproducible. I have fallen back to writing to Elasticsearch using the tcp/udp inputs. I haven't had time to dig into the code, but I think it's the interaction of an input and output that are both batched: when I use what is essentially a stream input (tcp/udp/syslog) sending to Elasticsearch, or a Kafka input and a stream output (file/stdout), things work as expected.
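Roughly, the fallback I'm running now looks like this (a sketch; the tcp port and json codec are placeholders, not my exact settings):

input {
        tcp {
                port => 5140
                codec => json
        }
}

output {
  elasticsearch {
      hosts => [ "es01:9200" ]
      sniffing => true
  }
}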

Do you see any errors on the Kafka broker or ZooKeeper instances? What size are the messages? Any idea what the maximum message size is?

Sorry for all the questions, but this is puzzling. Could you share a payload of messages so that I may attempt to reproduce?

Hello, I have a similar problem which results in the Elasticsearch output no longer consuming.

Specifically, I have 2x(kafka & logstash instances) => 3x ES instances.

Kafka and Logstash are on the same node. I use Kafka 0.9.0.1, Logstash 2.2.1, and Elasticsearch 2.2.0. (I have tried putting ZooKeeper on the same node and on a different node as well.)
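For reference, each of the two Logstash instances is configured roughly like this (a sketch; the zk_connect host, group_id, and ES host names are placeholders, only the connect-test topic name is taken from the logs below):

input {
        kafka {
                # both instances share the same group_id so they split the
                # topic's partitions between them
                topic_id => "connect-test"
                group_id => "logstash_consumers"
                zk_connect => "zk01:2181"
        }
}

output {
  elasticsearch {
      hosts => [ "es01:9200", "es02:9200", "es03:9200" ]
  }
}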

Logstash 2.2.2 kept giving some errors [1] and [2], so I switched to 2.2.1.

Nevertheless, every now and then there are issues between Kafka and Logstash which produce this warning in the Logstash output [3], and it continues after some time. Also, while I was trying something with the geoip filter I got the same error with Logstash 2.2.2 [1].

I have looked it up on the internet but still I can't figure out what the problem is. Any input would be really appreciated!

Thank you in advance.

[1] ERROR: org.I0Itec.zkclient.ZkEventThread: Error handling event ZkEvent[New session event sent to kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener@8dc07d2]
kafka.common.ConsumerRebalanceFailedException: logstash_broker1 can't rebalance after 4 retries
...

[2] ERROR Error when sending message to topic connect-test with key: null, value: 128 bytes with error: Batch Expired (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)

[3] WARN: kafka.client.ClientUtils$: Fetching topic metadata with correlation id 21 for topics [Set(connect-test)] from broker $id:0,host:esuser02,port:9092] failed
java.nio.channels.ClosedByInterruptException
    at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:511)
.....
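For error [1], one knob worth looking at is the rebalance retry behaviour. Assuming the 2.x kafka input still exposes the old consumer's rebalance settings as rebalance_max_retries and rebalance_backoff_ms, a sketch would be (the values and the zk_connect/group_id here are just examples, not tuned numbers):

input {
        kafka {
                topic_id => "connect-test"
                group_id => "logstash"
                zk_connect => "zk01:2181"
                # the defaults are 4 retries / 2000 ms backoff, which matches
                # the "can't rebalance after 4 retries" in [1]; more room can
                # help if ZooKeeper sessions expire under load
                rebalance_max_retries => 10
                rebalance_backoff_ms => 5000
        }
}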