Hi,
I'm trying to read Avro messages from a Confluent Kafka topic. Here is the configuration I'm using:
input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["AOC_CE.global"]
    consumer_threads => 9
    auto_offset_reset => "earliest"
    group_id => "testLogstash"
    key_deserializer_class => "io.confluent.kafka.serializers.KafkaAvroDeserializer"
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
But here is the error I'm getting:
Plugin: <LogStash::Inputs::Kafka bootstrap_servers=>"kafka.us-east-1.prod.kafka.away.black:9092", topics=>["AOC_CE.global"], consumer_threads=>9, auto_offset_reset=>"earliest", group_id=>"testLogstash", key_deserializer_class=>"io.confluent.kafka.serializers.KafkaAvroDeserializer", id=>"c162c03aa04f324d8c0d7a48f6d7744cc4481101-1", enable_metric=>true, codec=><LogStash::Codecs::Plain id=>"plain_c124cdab-8134-4ed7-8859-809c40832170", enable_metric=>true, charset=>"UTF-8">, auto_commit_interval_ms=>"5000", client_id=>"logstash", enable_auto_commit=>"true", value_deserializer_class=>"org.apache.kafka.common.serialization.StringDeserializer", poll_timeout_ms=>100, ssl=>false, security_protocol=>"PLAINTEXT", sasl_mechanism=>"GSSAPI", decorate_events=>false>
Error: uncaught throw cannot link Java class org.apache.kafka.clients.consumer.ConsumerConfig (java.lang.NoClassDefFoundError: Could not initialize class org.apache.kafka.clients.consumer.ConsumerConfig) in thread 0x338f0
^C[2017-05-10T12:38:35,216][WARN ][logstash.runner ] SIGINT received. Shutting down the agent.
[2017-05-10T12:38:35,223][WARN ][logstash.agent ] stopping pipeline {:id=>"main"}
[2017-05-10T12:38:35,226][FATAL][logstash.runner ] An unexpected error occurred! {:error=>#<NoMethodError: undefined method `each' for nil:NilClass>, :backtrace=>["/usr/local/Cellar/logstash/5.4.0/libexec/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-5.1.6/lib/logstash/inputs/kafka.rb:219:in `stop'", "/usr/local/Cellar/logstash/5.4.0/libexec/logstash-core/lib/logstash/inputs/base.rb:89:in `do_stop'", "org/jruby/RubyArray.java:1613:in `each'", "/usr/local/Cellar/logstash/5.4.0/libexec/logstash-core/lib/logstash/pipeline.rb:486:in `shutdown'", "/usr/local/Cellar/logstash/5.4.0/libexec/logstash-core/lib/logstash/agent.rb:426:in `stop_pipeline'", "/usr/local/Cellar/logstash/5.4.0/libexec/logstash-core/lib/logstash/agent.rb:442:in `shutdown_pipelines'", "org/jruby/RubyHash.java:1342:in `each'", "/usr/local/Cellar/logstash/5.4.0/libexec/logstash-core/lib/logstash/agent.rb:442:in `shutdown_pipelines'", "/usr/local/Cellar/logstash/5.4.0/libexec/logstash-core/lib/logstash/agent.rb:139:in `shutdown'", "/usr/local/Cellar/logstash/5.4.0/libexec/logstash-core/lib/logstash/runner.rb:289:in `execute'", "/usr/local/Cellar/logstash/5.4.0/libexec/vendor/bundle/jruby/1.9/gems/clamp-0.6.5/lib/clamp/command.rb:67:in `run'", "/usr/local/Cellar/logstash/5.4.0/libexec/logstash-core/lib/logstash/runner.rb:185:in `run'", "/usr/local/Cellar/logstash/5.4.0/libexec/vendor/bundle/jruby/1.9/gems/clamp-0.6.5/lib/clamp/command.rb:132:in `run'", "/usr/local/Cellar/logstash/5.4.0/libexec/lib/bootstrap/environment.rb:71:in `(root)'"]}
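My guess is that logstash-input-kafka only ships the Apache Kafka client jars, so io.confluent.kafka.serializers.KafkaAvroDeserializer cannot be loaded, and that failed load is what breaks the static initialization of ConsumerConfig. As a sanity check, a pipeline using only the plugin's default deserializers should start without any extra jars (this is just a sketch to isolate the classpath problem, not a fix for the Avro decoding):

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["AOC_CE.global"]
    group_id => "testLogstash"
    auto_offset_reset => "earliest"
    # No key_deserializer_class here: the default
    # org.apache.kafka.common.serialization.StringDeserializer ships with
    # the plugin, so nothing Confluent-specific is needed on the classpath.
  }
}
output {
  stdout { codec => rubydebug }
}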
Now, if I change the above config to add a codec pointing at the schema registry:
input {
  kafka {
    bootstrap_servers => "kafka.us-east-1.prod.kafka.away.black:9092"
    topics => ["AOC_CE.global"]
    consumer_threads => 9
    auto_offset_reset => "earliest"
    group_id => "testLogstash"
    key_deserializer_class => "io.confluent.kafka.serializers.KafkaAvroDeserializer"
    codec => {
      schema_registry => "http://localhost:8081"
    }
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
I get the following error:
Sending Logstash's logs to /usr/local/Cellar/logstash/5.4.0/libexec/logs which is now configured via log4j2.properties
[2017-05-10T13:50:05,957][ERROR][logstash.agent ] Cannot create pipeline {:reason=>"Expected one of #, { at line 11, column 9 (byte 312) after input {\n kafka {\n bootstrap_servers => "kafka.us-east-1.prod.kafka.away.black:9092"\n topics => ["AOC_CE.global"]\n consumer_threads => 9\n auto_offset_reset => "earliest"\n group_id => "testLogstash"\n key_deserializer_class => "io.confluent.kafka.serializers.KafkaAvroDeserializer"\n }\n\n codec "}
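Looking at the parser error, I believe the problem is the codec syntax itself: in a Logstash config, codec => must be followed by the name of a codec plugin, optionally followed by a { ... } settings block, and a bare { right after codec => is not valid, which is what produces "Expected one of #, {". The general shape (shown here with the plain codec purely as an illustration) is:

codec => plain { charset => "UTF-8" }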
So if I change the config again:
input {
  kafka {
    bootstrap_servers => "kafka.us-east-1.prod.kafka.away.black:9092"
    topics => ["AOC_CE.global"]
    consumer_threads => 9
    auto_offset_reset => "earliest"
    group_id => "testLogstash"
    key_deserializer_class => "io.confluent.kafka.serializers.KafkaAvroDeserializer"
    codec => {
      avro => {
        schema_registry => "http://localhost:8081"
      }
    }
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
the error I now get is:
Sending Logstash's logs to /usr/local/Cellar/logstash/5.4.0/libexec/logs which is now configured via log4j2.properties
[2017-05-10T13:53:21,324][ERROR][logstash.agent ] Cannot create pipeline {:reason=>"Expected one of #, { at line 11, column 9 (byte 312) after input {\n kafka {\n bootstrap_servers => \"kafka.us-east-1.prod.kafka.away.black:9092\"\n topics => [\"AOC_CE.global\"]\n consumer_threads => 9\n auto_offset_reset => \"earliest\"\n group_id => \"testLogstash\"\n key_deserializer_class => \"io.confluent.kafka.serializers.KafkaAvroDeserializer\"\n }\n\n codec "}
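So nesting avro inside braces is still not the plugin-name form; as far as I can tell it would have to be codec => avro { ... }. I also don't find a schema_registry option on the stock avro codec; its documented setting is schema_uri (a path or URL to the .avsc schema), and reading schemas from a Confluent Schema Registry seems to require a separate community codec (logstash-codec-avro_schema_registry). My best guess at a syntactically valid config is the sketch below (the schema path is a placeholder):

input {
  kafka {
    bootstrap_servers => "kafka.us-east-1.prod.kafka.away.black:9092"
    topics => ["AOC_CE.global"]
    consumer_threads => 9
    auto_offset_reset => "earliest"
    group_id => "testLogstash"
    # Assumption: let the codec decode the Avro instead of a Confluent
    # deserializer class that is not on the plugin's classpath.
    codec => avro {
      schema_uri => "/path/to/AOC_CE.global.avsc"  # placeholder
    }
  }
}
output {
  stdout { codec => rubydebug }
}

Is that the right direction, or is there a supported way to point the codec at the schema registry directly?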
I'm using Logstash 5.4.0 and Java 8.
Any help configuring this would be appreciated.
Thanks