Kafka input with different deserializer classes

Hi,

I'm trying to consume events from Kafka with Logstash and send them on to Elasticsearch.
I'm currently using a pipeline with the following input:

  kafka {
    type => "standard_event"
    bootstrap_servers => "xxx01:9092,xxx02:9092,xxx03:9092,xxx04:9092"
    topics => ["something"]
    group_id => "somehow"
    decorate_events => true
    consumer_threads => 10
    key_deserializer_class => "org.apache.kafka.connect.storage.StringConverter"
    value_deserializer_class => "io.confluent.connect.avro.AvroConverter"
    codec => avro {
      schema_uri => "/root/schema.avsc"
      tag_on_failure => true
    }
  }

I'm trying to use non-default deserializer classes for the key and value.
I put the JARs containing those classes under /usr/share/logstash/lib/, but Logstash does not find them when I launch it.
I get the following error:

Error: uncaught throw org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.connect.storage.StringConverter for configuration key.deserializer: Class org.apache.kafka.connect.storage.StringConverter could not be found.
  Exception: UncaughtThrowError
  Stack: org/jruby/RubyKernel.java:1137:in `throw'

Does anyone know where I should put those JARs so that Logstash can use them?

This shouldn't be throwing an UncaughtThrowError, and I've opened logstash-plugins/logstash-input-kafka#259 to address this.

However, even after that fix is merged, you'll still have a problem: while this plugin lets you specify a class by name, it does nothing to add JARs to the class loader; it expects the referenced classes to already be on the class path.

It looks like there's an open ticket to add this functionality: logstash-plugins/logstash-input-kafka#238; feel free to subscribe and/or contribute :slight_smile:
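
Until that lands, one workaround people often try is to copy the extra JARs into a directory that is already on Logstash's class path rather than /usr/share/logstash/lib/. This is only a sketch, not an officially supported mechanism: the directory layout differs between Logstash versions, and the JAR name below is a placeholder for whatever archives actually contain your converter/deserializer classes, so verify both against your install.

  # Sketch only: paths vary by Logstash version and "my-extra-deserializers.jar"
  # is a placeholder; check both before relying on this.

  # Option 1: drop the JARs next to the kafka input plugin's own vendored JARs,
  # so they sit alongside the kafka-clients JAR the plugin already loads.
  cp my-extra-deserializers.jar \
     /usr/share/logstash/vendor/bundle/jruby/*/gems/logstash-input-kafka-*/vendor/jar-dependencies/runtime-jars/

  # Option 2 (newer Logstash versions): drop them into the core JAR directory
  # that the startup script puts on the JVM class path.
  cp my-extra-deserializers.jar /usr/share/logstash/logstash-core/lib/jars/

  # Then restart Logstash and re-test the pipeline.

Whichever location you use, the point is the same as above: the classes named in key_deserializer_class and value_deserializer_class have to be resolvable by the JVM before the plugin starts its consumers.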
