Logstash: unable to configure the Kafka input for Confluent Avro messages

Hi,

I'm trying to read Avro messages from a Confluent Kafka topic. Here is the configuration I am trying to use:

    input {
      kafka {
        bootstrap_servers => "localhost:9092"
        topics => ["AOC_CE.global"]
        consumer_threads => 9
        auto_offset_reset => "earliest"
        group_id => "testLogstash"
        key_deserializer_class => "io.confluent.kafka.serializers.KafkaAvroDeserializer"
   }
 }

 output {
  stdout {
    codec => rubydebug 
  }
}

But here is the error I'm getting:

  Plugin: <LogStash::Inputs::Kafka bootstrap_servers=>"kafka.us-east-1.prod.kafka.away.black:9092", topics=>["AOC_CE.global"], consumer_threads=>9, auto_offset_reset=>"earliest", group_id=>"testLogstash", key_deserializer_class=>"io.confluent.kafka.serializers.KafkaAvroDeserializer", id=>"c162c03aa04f324d8c0d7a48f6d7744cc4481101-1", enable_metric=>true, codec=><LogStash::Codecs::Plain id=>"plain_c124cdab-8134-4ed7-8859-809c40832170", enable_metric=>true, charset=>"UTF-8">, auto_commit_interval_ms=>"5000", client_id=>"logstash", enable_auto_commit=>"true", value_deserializer_class=>"org.apache.kafka.common.serialization.StringDeserializer", poll_timeout_ms=>100, ssl=>false, security_protocol=>"PLAINTEXT", sasl_mechanism=>"GSSAPI", decorate_events=>false>
  Error: uncaught throw cannot link Java class org.apache.kafka.clients.consumer.ConsumerConfig (java.lang.NoClassDefFoundError: Could not initialize class org.apache.kafka.clients.consumer.ConsumerConfig) in thread 0x338f0
^C[2017-05-10T12:38:35,216][WARN ][logstash.runner          ] SIGINT received. Shutting down the agent.
[2017-05-10T12:38:35,223][WARN ][logstash.agent           ] stopping pipeline {:id=>"main"}
[2017-05-10T12:38:35,226][FATAL][logstash.runner          ] An unexpected error occurred! {:error=>#<NoMethodError: undefined method `each' for nil:NilClass>, :backtrace=>["/usr/local/Cellar/logstash/5.4.0/libexec/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-5.1.6/lib/logstash/inputs/kafka.rb:219:in `stop'", "/usr/local/Cellar/logstash/5.4.0/libexec/logstash-core/lib/logstash/inputs/base.rb:89:in `do_stop'", "org/jruby/RubyArray.java:1613:in `each'", "/usr/local/Cellar/logstash/5.4.0/libexec/logstash-core/lib/logstash/pipeline.rb:486:in `shutdown'", "/usr/local/Cellar/logstash/5.4.0/libexec/logstash-core/lib/logstash/agent.rb:426:in `stop_pipeline'", "/usr/local/Cellar/logstash/5.4.0/libexec/logstash-core/lib/logstash/agent.rb:442:in `shutdown_pipelines'", "org/jruby/RubyHash.java:1342:in `each'", "/usr/local/Cellar/logstash/5.4.0/libexec/logstash-core/lib/logstash/agent.rb:442:in `shutdown_pipelines'", "/usr/local/Cellar/logstash/5.4.0/libexec/logstash-core/lib/logstash/agent.rb:139:in `shutdown'", "/usr/local/Cellar/logstash/5.4.0/libexec/logstash-core/lib/logstash/runner.rb:289:in `execute'", "/usr/local/Cellar/logstash/5.4.0/libexec/vendor/bundle/jruby/1.9/gems/clamp-0.6.5/lib/clamp/command.rb:67:in `run'", "/usr/local/Cellar/logstash/5.4.0/libexec/logstash-core/lib/logstash/runner.rb:185:in `run'", "/usr/local/Cellar/logstash/5.4.0/libexec/vendor/bundle/jruby/1.9/gems/clamp-0.6.5/lib/clamp/command.rb:132:in `run'", "/usr/local/Cellar/logstash/5.4.0/libexec/lib/bootstrap/environment.rb:71:in `(root)'"]}

Next, I changed the above config as follows:

input {
kafka {
bootstrap_servers => "kafka.us-east-1.prod.kafka.away.black:9092"
topics => ["AOC_CE.global"]
consumer_threads => 9
auto_offset_reset => "earliest"
group_id => "testLogstash"
key_deserializer_class => "io.confluent.kafka.serializers.KafkaAvroDeserializer"

  codec => {
      schema_registry => "http://localhost::8081"
    }
  }
}

output {
  stdout {
    codec => rubydebug 
  }
}

With this config, I'm getting the following error:

Sending Logstash's logs to /usr/local/Cellar/logstash/5.4.0/libexec/logs which is now configured via log4j2.properties
[2017-05-10T13:50:05,957][ERROR][logstash.agent ] Cannot create pipeline {:reason=>"Expected one of #, { at line 11, column 9 (byte 312) after input {\n kafka {\n bootstrap_servers => "kafka.us-east-1.prod.kafka.away.black:9092"\n topics => ["AOC_CE.global"]\n consumer_threads => 9\n auto_offset_reset => "earliest"\n group_id => "testLogstash"\n key_deserializer_class => "io.confluent.kafka.serializers.KafkaAvroDeserializer"\n }\n\n codec "}

Then I changed the config again, as follows:

input {
  kafka {
    bootstrap_servers => "kafka.us-east-1.prod.kafka.away.black:9092"
    topics => ["AOC_CE.global"]
    consumer_threads => 9
    auto_offset_reset => "earliest"
    group_id => "testLogstash"
    key_deserializer_class => "io.confluent.kafka.serializers.KafkaAvroDeserializer"
  codec =>{ 
    avro => {
        schema_registry => "http://localhost:8081"
      }
    }
}

}
output {
stdout {
codec => rubydebug
}
}

Now the error I get is:

Sending Logstash's logs to /usr/local/Cellar/logstash/5.4.0/libexec/logs which is now configured via log4j2.properties
[2017-05-10T13:53:21,324][ERROR][logstash.agent           ] Cannot create pipeline {:reason=>"Expected one of #, { at line 11, column 9 (byte 312) after input {\n  kafka {\n    bootstrap_servers => \"kafka.us-east-1.prod.kafka.away.black:9092\"\n    topics => [\"AOC_CE.global\"]\n    consumer_threads => 9\n    auto_offset_reset => \"earliest\"\n    group_id => \"testLogstash\"\n    key_deserializer_class => \"io.confluent.kafka.serializers.KafkaAvroDeserializer\"\n  }\n\n  codec "}

I'm using Logstash 5.4.0 and Java 8.

I need help configuring this.
Thanks.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.