Logstash 5.2 not working with Kafka 9 and AVRO

I have installed Logstash 5.2.0 with logstash-input-kafka 4.1.1 and logstash-codec-avro 3.0.0, and I am trying to read data from Cloudera Kafka 9, but I am getting the following error:

[2017-02-03T03:05:35,049][INFO ][logstash.pipeline        ] Pipeline main started
[2017-02-03T03:05:35,064][DEBUG][logstash.agent           ] Starting puma
[2017-02-03T03:05:35,065][DEBUG][logstash.agent           ] Trying to start WebServer {:port=>9600}
[2017-02-03T03:05:35,068][DEBUG][logstash.api.service     ] [api-service] start
[2017-02-03T03:05:35,090][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2017-02-03T03:05:35,303][DEBUG][logstash.inputs.kafka    ] closing {:plugin=>"LogStash::Inputs::Kafka"}
[2017-02-03T03:05:35,304][DEBUG][logstash.pipeline        ] Input plugins stopped! Will shutdown filter/output workers.
[2017-02-03T03:05:35,338][DEBUG][logstash.pipeline        ] Pushing flush onto pipeline
[2017-02-03T03:05:35,339][DEBUG][logstash.pipeline        ] Pushing shutdown {:thread=>"#<Thread:0x6e057136 sleep>"}
[2017-02-03T03:05:35,340][DEBUG][logstash.pipeline        ] Pushing shutdown {:thread=>"#<Thread:0xcb2b987 sleep>"}
[2017-02-03T03:05:35,340][DEBUG][logstash.pipeline        ] Pushing shutdown {:thread=>"#<Thread:0x6aa67ce5 sleep>"}
[2017-02-03T03:05:35,340][DEBUG][logstash.pipeline        ] Pushing shutdown {:thread=>"#<Thread:0x2f544881 run>"}
[2017-02-03T03:05:35,340][DEBUG][logstash.pipeline        ] Pushing shutdown {:thread=>"#<Thread:0x20d253d0 sleep>"}
[2017-02-03T03:05:35,341][DEBUG][logstash.pipeline        ] Pushing shutdown {:thread=>"#<Thread:0x65d168b sleep>"}
[2017-02-03T03:05:35,341][DEBUG][logstash.pipeline        ] Shutdown waiting for worker thread #<Thread:0x6e057136>
[2017-02-03T03:05:35,439][FATAL][logstash.runner          ] An unexpected error occurred! {:error=>#<ArgumentError: negative length -2600952 given>, :backtrace=>["org/jruby/ext/stringio/StringIO.java:829:in `read'", "/apps/logstash-5.2.0/vendor/bundle/jruby/1.9/gems/avro-1.8.1/lib/avro/io.rb:106:in `read'", "/apps/logstash-5.2.0/vendor/bundle/jruby/1.9/gems/avro-1.8.1/lib/avro/io.rb:93:in `read_bytes'", "/apps/logstash-5.2.0/vendor/bundle/jruby/1.9/gems/avro-1.8.1/lib/avro/io.rb:304:in `read_data'", "/apps/logstash-5.2.0/vendor/bundle/jruby/1.9/gems/avro-1.8.1/lib/avro/io.rb:290:in `read_data'", "/apps/logstash-5.2.0/vendor/bundle/jruby/1.9/gems/avro-1.8.1/lib/avro/io.rb:376:in `read_union'", "/apps/logstash-5.2.0/vendor/bundle/jruby/1.9/gems/avro-1.8.1/lib/avro/io.rb:309:in `read_data'", "/apps/logstash-5.2.0/vendor/bundle/jruby/1.9/gems/avro-1.8.1/lib/avro/io.rb:384:in `read_record'", "org/jruby/RubyArray.java:1613:in `each'", "/apps/logstash-5.2.0/vendor/bundle/jruby/1.9/gems/avro-1.8.1/lib/avro/io.rb:382:in `read_record'", "/apps/logstash-5.2.0/vendor/bundle/jruby/1.9/gems/avro-1.8.1/lib/avro/io.rb:310:in `read_data'", "/apps/logstash-5.2.0/vendor/bundle/jruby/1.9/gems/avro-1.8.1/lib/avro/io.rb:275:in `read'", "/apps/logstash-5.2.0/vendor/bundle/jruby/1.9/gems/logstash-codec-avro-3.0.0-java/lib/logstash/codecs/avro.rb:73:in `decode'", "/apps/logstash-5.2.0/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-4.1.1/lib/logstash/inputs/kafka.rb:163:in `thread_runner'", "file:/apps/logstash-5.2.0/vendor/jruby/lib/jruby.jar!/jruby/java/java_ext/java.lang.rb:12:in `each'", "/apps/logstash-5.2.0/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-4.1.1/lib/logstash/inputs/kafka.rb:162:in `thread_runner'"]}
[2017-02-03T03:05:35,444][DEBUG][logstash.agent           ] Error in reactor loop escaped: Bad file descriptor - Bad file descriptor (Errno::EBADF)
[2017-02-03T03:05:35,445][DEBUG][logstash.agent           ] ["org/jruby/RubyIO.java:3705:in `select'", "/apps/logstash-5.2.0/vendor/bundle/jruby/1.9/gems/puma-2.16.0-java/lib/puma/reactor.rb:29:in `run_internal'", "/apps/logstash-5.2.0/vendor/bundle/jruby/1.9/gems/puma-2.16.0-java/lib/puma/reactor.rb:138:in `run_in_thread'"]
[2017-02-03T03:05:35,445][DEBUG][logstash.agent           ] 2017-02-03 03:05:35 -0500: Listen loop error: #<Errno::EBADF: Bad file descriptor - Bad file descriptor>
[2017-02-03T03:05:35,446][DEBUG][logstash.agent           ] org/jruby/RubyIO.java:3705:in `select'
/apps/logstash-5.2.0/vendor/bundle/jruby/1.9/gems/puma-2.16.0-java/lib/puma/server.rb:322:in `handle_servers'
/apps/logstash-5.2.0/vendor/bundle/jruby/1.9/gems/puma-2.16.0-java/lib/puma/server.rb:296:in `run'
[2017-02-03T03:05:35,446][DEBUG][logstash.agent           ] Error in reactor loop escaped: Bad file descriptor - Bad file descriptor (Errno::EBADF)

The same setup works well with Logstash 2.4 and logstash-codec-avro 2.0.4.

Here is my Logstash config file:

input {
  kafka {
    bootstrap_servers => "server1:9092,server2:9092,server3:9092,server4:9092,server5:9092"
    topics => ["mytopicname"]
    group_id => "mygroup"
    codec => avro {
      schema_uri => "/apps/schema/rocana3.schema"
    }
  }
}
filter {
}
output {
  file {
    path => "/apps/elk/test/dump.txt"
  }
}

In Logstash 5, the Kafka input plugin's default deserializer changed from the byte-array deserializer to the string deserializer, which mangles binary Avro payloads before the codec sees them.

I simply added the following to my input > kafka block and it worked:

key_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"

input {
  kafka {
    bootstrap_servers => "server1:9092,server2:9092,server3:9092,server4:9092,server5:9092"
    topics => ["mytopicname"]
    group_id => "mygroup"
    key_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
    value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
    codec => avro {
      schema_uri => "/apps/schema/rocana3.schema"
    }
  }
}
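To see why the string deserializer breaks Avro decoding, here is a minimal illustration (a sketch of the effect, not the plugin's actual code): Avro binary output is arbitrary bytes, and round-tripping arbitrary bytes through a UTF-8 string substitutes replacement characters for invalid sequences. The Avro reader then parses a corrupted length prefix, which can surface as errors like `ArgumentError: negative length`. The byte values below are made up for the demonstration.

```python
# Illustration: arbitrary binary data (like an Avro-encoded record) does not
# survive being treated as a UTF-8 string, which is effectively what the
# StringDeserializer default does to the message value.
raw = bytes([0x04, 0xC3, 0x28, 0xFF])  # hypothetical Avro bytes; not valid UTF-8

# Decoding as a string replaces invalid sequences with U+FFFD...
as_string = raw.decode("utf-8", errors="replace")

# ...so re-encoding does not reproduce the original bytes.
round_tripped = as_string.encode("utf-8")

print(raw != round_tripped)  # the payload was silently corrupted
```

Using `ByteArrayDeserializer` hands the codec the untouched bytes, so the Avro reader sees exactly what the producer wrote.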
