Logstash keeps crashing

Hello,

Every day, Logstash crashes and generates dozens of Kafka-related errors per second in the logs.
I run ELK 5.6.4, Java 1.8.0_151, and Kafka 0.10.0.1.
I have two Logstash servers for indexing, connected through Kafka (Server A @ IP 1.2.3.4 & Server B @ IP 1.2.3.5).

I paste the startup log below, but the most important lines are these:
Jan  4 09:45:41 MY-SERVER logstash: [2018-01-04T09:45:41,934][ERROR][logstash.filters.ruby    ] Ruby exception occurred: undefined method `get' for #<Hash:0x15ad3840>
Jan  4 09:45:41 MY-SERVER logstash: [2018-01-04T09:45:41,935][ERROR][logstash.filters.ruby    ] Ruby exception occurred: undefined method `get' for #<Hash:0x387222fc>
Jan  4 09:45:56 MY-SERVER logstash: [2018-01-04T09:45:56,837][INFO ][logstash.outputs.kafka   ] Sending batch to Kafka failed. Will retry after a delay. {:batch_size=>1, :failures=>1, :sleep=>0.01}

By the end of the day, Logstash has crashed and the service is down.

The output section of the conf file is the following:

output {
  kafka {
    bootstrap_servers => "1.2.3.4:9092,1.2.3.5:9092" # (IPs manually changed, but correct)
    topic_id => "salsa"
    codec => json {}
  }
}

Do you have any idea what could cause this issue?

Many thanks for your help.

[...]
Jan  4 09:45:10 MY-SERVER logstash: SLF4J: Found binding in [jar:file:/opt/logstash-receiver/vendor/bundle/jruby/1.9/gems/logstash-output-kafka-5.1.11/vendor/jar-dependencies/runtime-jars/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
Jan  4 09:45:10 MY-SERVER logstash: SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
Jan  4 09:45:10 MY-SERVER logstash: SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Jan  4 09:45:10 MY-SERVER logstash: [2018-01-04T09:45:10,745][INFO ][org.apache.kafka.clients.producer.ProducerConfig] ProducerConfig values:
Jan  4 09:45:10 MY-SERVER logstash: bootstrap.servers = [1.2.3.4:9092, 1.2.3.5:9092]  ##IP CHANGED MANUALLY IN MY LOG
Jan  4 09:45:10 MY-SERVER logstash: [2018-01-04T09:45:10,824][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka version : 0.10.0.1
Jan  4 09:45:10 MY-SERVER logstash: [2018-01-04T09:45:10,824][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka commitId : a7a17cdec9eaa6c5
Jan  4 09:45:10 MY-SERVER logstash: [2018-01-04T09:45:10,841][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>250}
Jan  4 09:45:11 MY-SERVER logstash: [2018-01-04T09:45:11,225][INFO ][logstash.inputs.beats    ] Beats inputs: Starting input listener {:address=>"0.0.0.0:5044"}
Jan  4 09:45:11 MY-SERVER logstash: [2018-01-04T09:45:11,365][INFO ][logstash.inputs.lumberjack] Starting lumberjack input listener {:address=>"0.0.0.0:5555"}
Jan  4 09:45:11 MY-SERVER logstash: [2018-01-04T09:45:11,804][INFO ][org.logstash.beats.Server] Starting server on port: 5044
Jan  4 09:45:11 MY-SERVER logstash: [2018-01-04T09:45:11,844][INFO ][logstash.pipeline        ] Pipeline main started
Jan  4 09:45:11 MY-SERVER logstash: [2018-01-04T09:45:11,879][INFO ][logstash.inputs.syslog   ] Starting syslog udp listener {:address=>"0.0.0.0:10514"}
Jan  4 09:45:11 MY-SERVER logstash: [2018-01-04T09:45:11,899][INFO ][logstash.inputs.syslog   ] Starting syslog tcp listener {:address=>"0.0.0.0:10514"}
Jan  4 09:45:11 MY-SERVER logstash: [2018-01-04T09:45:11,909][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9601}
Jan  4 09:45:41 MY-SERVER logstash: [2018-01-04T09:45:41,934][ERROR][logstash.filters.ruby    ] Ruby exception occurred: undefined method `get' for #<Hash:0x15ad3840>
Jan  4 09:45:41 MY-SERVER logstash: [2018-01-04T09:45:41,935][ERROR][logstash.filters.ruby    ] Ruby exception occurred: undefined method `get' for #<Hash:0x387222fc>
Jan  4 09:45:56 MY-SERVER logstash: [2018-01-04T09:45:56,837][INFO ][logstash.outputs.kafka   ] Sending batch to Kafka failed. Will retry after a delay. {:batch_size=>1, :failures=>1, :sleep=>0.01}
Jan  4 09:45:56 MY-SERVER logstash: [2018-01-04T09:45:56,868][INFO ][logstash.outputs.kafka   ] Sending batch to Kafka failed. Will retry after a delay. {:batch_size=>1, :failures=>1, :sleep=>0.01}
Jan  4 09:45:56 MY-SERVER logstash: [2018-01-04T09:45:56,881][INFO ][logstash.outputs.kafka   ] Sending batch to Kafka failed. Will retry after a delay. {:batch_size=>1, :failures=>1, :sleep=>0.01}
[....]

Jan  4 18:09:08 MY-SERVER logstash: [2018-01-08T18:09:08,115][WARN ][io.netty.channel.DefaultChannelPipeline] An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
Jan  4 18:09:08 MY-SERVER logstash: java.io.IOException: Too many open files
Jan  4 18:09:08 MY-SERVER logstash: at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method) ~[?:1.8.0_151]
Jan  4 18:09:08 MY-SERVER logstash: at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422) ~[?:1.8.0_151]
Jan  4 18:09:08 MY-SERVER logstash: at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250) ~[?:1.8.0_151]
Jan  4 18:09:08 MY-SERVER logstash: at io.netty.channel.socket.nio.NioServerSocketChannel.doReadMessages(NioServerSocketChannel.java:135) ~[netty-all-4.1.3.Final.jar:4.1.3.Final]
Jan  4 18:09:08 MY-SERVER logstash: at io.netty.channel.nio.AbstractNioMessageChannel$NioMessageUnsafe.read(AbstractNioMessageChannel.java:75) [netty-all-4.1.3.Final.jar:4.1.3.Final]
Jan  4 18:09:08 MY-SERVER logstash: at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:571) [netty-all-4.1.3.Final.jar:4.1.3.Final]
Jan  4 18:09:08 MY-SERVER logstash: at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:512) [netty-all-4.1.3.Final.jar:4.1.3.Final]
Jan  4 18:09:08 MY-SERVER logstash: at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:426) [netty-all-4.1.3.Final.jar:4.1.3.Final]
Jan  4 18:09:08 MY-SERVER logstash: at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:398) [netty-all-4.1.3.Final.jar:4.1.3.Final]
Jan  4 18:09:08 MY-SERVER logstash: at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:877) [netty-all-4.1.3.Final.jar:4.1.3.Final]
Jan  4 18:09:08 MY-SERVER logstash: at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144) [netty-all-4.1.3.Final.jar:4.1.3.Final]
Jan  4 18:09:08 MY-SERVER logstash: at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]

You have an error in your ruby filter code. Also, the kafka output can't reach the Kafka system.

Post the config and we can help some more.

Hello guyboertje,

I have uploaded the filter configuration files here:
https://www.cjoint.com/doc/18_01/HAkhSKH2mYl_Filter-Rules.zip

Thank you for your help.

First off, the ruby code that fails.
In 1-110-filter-log-emon_peer_service.conf
This type of code will not work: you can't chain a second get() or set() call like this.

event.get('emon_service').set('executionDuration', event.get('emon_service').get('endTime') - event.get('emon_service').get('startTime'));
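
That is exactly what the exception in your logs is complaining about: event.get returns a plain Ruby Hash copy of the field, and a Hash has no get or set method. A minimal illustration (assuming the Logstash 5.x event API):

# inside a ruby filter
h = event.get('emon_service')   # => a plain Ruby Hash, not an Event
h.class                         # => Hash
h.get('endTime')                # => NoMethodError: undefined method `get' for #<Hash:0x...>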

This is better.

event.set('[emon_service][executionDuration]', event.get('[emon_service][endTime]') - event.get('[emon_service][startTime]'));

This is best:

event.set('[emon_service][executionDuration]', (event.get('[emon_service][endTime]') - event.get('[emon_service][startTime]') rescue 0));
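
For reference, that line would sit inside a ruby filter block in the conf file, something like this (a sketch; adapt it to whatever conditionals surround it in your file):

filter {
  ruby {
    code => "event.set('[emon_service][executionDuration]',
                       (event.get('[emon_service][endTime]') - event.get('[emon_service][startTime]') rescue 0))"
  }
}

The rescue modifier means a missing or non-numeric endTime/startTime yields 0 instead of killing the event with a Ruby exception.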

In 1-999-filter-lowercase-loglevel.conf the ruby filter code is not performant at all.
This is better:

# normalize the log level to a canonical lowercase value
loglevel = event.get('loglevel').downcase
value = 'unknown'
case loglevel
when 'alert', 'critical', 'debug', 'emergency', 'error', 'fatal',
     'info', 'notice', 'severe', 'trace', 'warn', 'warning'
  value = loglevel
when 'crit'
  value = 'critical'
when 'emerg'
  value = 'emergency'
when 'err'
  value = 'error'
end
event.set('loglevel', value)
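
One caveat, assuming some of your events may lack a loglevel field (I can't tell from here): event.get would then return nil and downcase would raise. A to_s guard avoids that:

# nil.to_s is '', which matches no when-branch and falls through to 'unknown'
loglevel = event.get('loglevel').to_s.downcase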

About the Kafka output errors:
These are the only errors that are trapped and retried, but not logged as having happened:
org.apache.kafka.common.errors.TimeoutException
org.apache.kafka.common.errors.InterruptException
org.apache.kafka.common.errors.SerializationException

Some internet searching on these:
TimeoutException: not much; mostly client version vs broker version mismatch (but that would be a hard fail, not intermittent).
InterruptException: nothing.
SerializationException: again, not much.

The only way to investigate further is for you to edit the kafka output plugin code and add logging statements.

All other errors are trapped and logged in one form or another.
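
If it helps, here is a purely hypothetical sketch of the kind of logging to add; the exact rescue site depends on your plugin version, so treat it as an illustration rather than the plugin's actual code:

begin
  # ... the plugin's existing send of the batch to the Kafka producer ...
rescue org.apache.kafka.common.errors.TimeoutException,
       org.apache.kafka.common.errors.InterruptException,
       org.apache.kafka.common.errors.SerializationException => e
  # new: record which of the three silently retried exceptions actually fired
  # (JRuby can rescue Java exception classes directly)
  @logger.warn('Kafka send raised before retry',
               :exception => e.class.name, :message => e.message)
  # ... the plugin's existing retry/sleep logic ...
end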

I set the logs to debug mode and found this error message:
[2018-01-11T10:45:43,018][DEBUG][org.apache.kafka.clients.producer.KafkaProducer] Exception occurred during message send:
org.apache.kafka.common.errors.RecordTooLargeException: The message is 13105524 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.

That message is roughly 12.5 MB, so I edited the following files to raise the size limits:

  • /opt/kafka/config/consumer.properties
    fetch.message.max.bytes=209715200 # added this parameter

  • /opt/kafka/config/server.properties
    socket.request.max.bytes=209715200 # instead of 10 MB
    replica.fetch.max.bytes=209715200 # added this parameter
    message.max.bytes=209715200 # added this parameter
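
The error above mentions the producer's max.request.size, which lives on the Logstash side rather than in the broker files, so I will probably also raise it in the output. My assumption (still to be verified for plugin 5.1.11) is that the kafka output exposes it as max_request_size:

output {
  kafka {
    bootstrap_servers => "1.2.3.4:9092,1.2.3.5:9092"
    topic_id => "salsa"
    codec => json {}
    # assumed option name mapping to the Kafka producer's max.request.size
    max_request_size => 209715200
  }
}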

I still need to check the ruby exception regarding the get method: your previous message should help a lot.
I'll let you know.

Thank you for your support.


Great.

So it does look like a SerializationException occurred. It's a shame that the logging needs to be at DEBUG level to spot this. We use the Kafka client jars, which hook into our Log4j logging; it seems this is a client logging statement emitted at DEBUG.

Unfortunately, today I am getting the same issue again.
So the max-size change turned out to be the wrong fix.

It looks like the same issue as in those other topics, but my config in /opt/kafka/config/server.properties is correct!
listeners=PLAINTEXT://1.2.3.4:9092 #(IP changed)

(Indeed, I see these two parameters in the startup sequence:)
security.protocol = PLAINTEXT
bootstrap.servers = [1.2.3.4:9092, 1.2.3.5:9092]

Data is simply not flowing through Kafka.

In debug mode I find:
[2018-01-12T11:49:10,544][DEBUG][org.apache.kafka.clients.NetworkClient] Initiating connection to node -1 at 1.2.3.4:9092.
[2018-01-12T11:49:10,546][DEBUG][org.apache.kafka.common.metrics.Metrics] Added sensor with name node--1.bytes-sent
[2018-01-12T11:49:10,547][DEBUG][org.apache.kafka.common.metrics.Metrics] Added sensor with name node--1.bytes-received
[2018-01-12T11:49:10,547][DEBUG][org.apache.kafka.common.metrics.Metrics] Added sensor with name node--1.latency
[2018-01-12T11:49:10,547][DEBUG][org.apache.kafka.common.network.Selector] Connection with /1.2.3.4 disconnected
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[?:1.8.0_151]
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[?:1.8.0_151]
at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:51) ~[kafka-clients-0.10.0.1.jar:?]
at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:73) ~[kafka-clients-0.10.0.1.jar:?]
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:309) [kafka-clients-0.10.0.1.jar:?]
at org.apache.kafka.common.network.Selector.poll(Selector.java:283) [kafka-clients-0.10.0.1.jar:?]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260) [kafka-clients-0.10.0.1.jar:?]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:229) [kafka-clients-0.10.0.1.jar:?]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:134) [kafka-clients-0.10.0.1.jar:?]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]

I tried shutting down one of the servers: then I only get the "Sending batch to Kafka failed. Will retry after a delay." error from time to time.

Once it is started again, I get this error many times!

Do you have any idea?
Thank you!

I have no further ideas. It seems as if the Kafka cluster is unstable, or the network between LS and Kafka is.

I can't explain it, as these servers are in the same VLAN, with no iptables rules or firewall between them.
I'm really stuck with this.

I can see established connections on port 9092 (IPs replaced by 1.2.3.4 & 1.2.3.5):

tcp        0      0 1.2.3.5:59195      1.2.3.4:3888       ESTABLISHED 1000       26335      10633/java
tcp        0      0 1.2.3.5:9092       1.2.3.4:44920      ESTABLISHED 1001       40394      11884/java
tcp        0      0 1.2.3.5:2888       1.2.3.4:48359      ESTABLISHED 1000       135340437  10633/java
tcp        0      0 1.2.3.5:42849      1.2.3.4:2181       ESTABLISHED 1001       135340439  11884/java
tcp        0      0 1.2.3.5:46989      1.2.3.4:9092       ESTABLISHED 1002       37970      10601/java
tcp        0      0 1.2.3.5:47023      1.2.3.4:9092       ESTABLISHED 1001       43655      11884/java
tcp        0      0 1.2.3.5:46987      1.2.3.4:9092       ESTABLISHED 1002       37968      10601/java
tcp        0      0 1.2.3.5:47060      1.2.3.4:9092       ESTABLISHED 1002       135446083  18598/java

And port 9092 is also listening:
tcp        0      0 1.2.3.5:9092       0.0.0.0:*          LISTEN      1001       37832      11884/java

I'll try downgrading Kafka to the previous version.
If I also downgrade Logstash to 5.0, will it work with Elasticsearch 5.6.4?
Other ideas are of course still welcome!

Many thanks for your help.
