Error when configuring "value_serializer" and "key_serializer" in the output section

I am testing data transfer between multiple Kafka clusters.
My configuration is below:

input {
    kafka {
        bootstrap_servers => "10.62.169.206:9092,10.62.220.44:9092,10.62.220.150:9092"
        topics => ["prod-skywalking-meters"]
        group_id => "prod-skywalking-meters"
        value_deserializer_class => "org.apache.kafka.common.serialization.BytesDeserializer"
        codec => plain
    }
}

output {
    kafka {
        codec => plain {
            format => "%{message}"
        }
        bootstrap_servers => ["10.62.169.141:9092,10.62.109.72:9092,10.62.174.33:9092"]
        topic_id => "pre-skywalking-meters"
        value_serializer => "org.apache.kafka.common.serialization.BytesSerializer"
        key_serializer => "org.apache.kafka.common.serialization.BytesSerializer"
    }
}

When I start Logstash, I get this error:

[ERROR] 2024-01-15 17:55:08.893 [[main]-pipeline-manager] javapipeline - Pipeline error {:pipeline_id=>"main", :exception=>#<NameError: uninitialized constant LogStash::Outputs::Kafka::ConfigurationError>, :backtrace=>["org/jruby/RubyModule.java:3766:in `const_missing'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.9.0-java/lib/logstash/outputs/kafka.rb:211:in `register'", "org/logstash/config/ir/compiler/OutputStrategyExt.java:131:in `register'", "org/logstash/config/ir/compiler/AbstractOutputDelegatorExt.java:68:in `register'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:233:in `block in register_plugins'", "org/jruby/RubyArray.java:1821:in `each'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:232:in `register_plugins'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:598:in `maybe_setup_out_plugins'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:245:in `start_workers'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:190:in `run'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:142:in `block in start'"], "pipeline.sources"=>["/etc/logstash/conf.d/skywalking-meters2.conf"], :thread=>"#<Thread:0x6897df0e run>"}
[INFO ] 2024-01-15 17:55:08.895 [[main]-pipeline-manager] javapipeline - Pipeline terminated {"pipeline.id"=>"main"}
[ERROR] 2024-01-15 17:55:08.918 [Converge PipelineAction::Create<main>] agent - Failed to execute action {:id=>:main, :action_type=>LogStash::ConvergeResult::FailedAction, :message=>"Could not execute action: PipelineAction::Create<main>, action_result: false", :backtrace=>nil}
[INFO ] 2024-01-15 17:55:08.998 [LogStash::Runner] runner - Logstash shut down

What should I do? Can somebody give me some advice?
Thank you.

I think that is a bug. In the current version of the code (10.9.0 is quite old) your configuration would throw that exception here. The value_serializer can only be org.apache.kafka.common.serialization.StringSerializer or org.apache.kafka.common.serialization.ByteArraySerializer. Nothing else is allowed, and passing an unsupported class trips the broken error path that raises the uninitialized-constant NameError instead of a proper configuration error.
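Concretely, one way past the validation is to switch to one of the two allowed serializers. This is a sketch based on your config (same brokers and topic): since your plain codec with format => "%{message}" emits the event as a string, StringSerializer is the simpler fit, and key_serializer can be dropped because you are not setting a message key.

```
output {
    kafka {
        codec => plain {
            format => "%{message}"
        }
        bootstrap_servers => ["10.62.169.141:9092,10.62.109.72:9092,10.62.174.33:9092"]
        topic_id => "pre-skywalking-meters"
        # StringSerializer and ByteArraySerializer are the only values the
        # plugin accepts; BytesSerializer is rejected.
        value_serializer => "org.apache.kafka.common.serialization.StringSerializer"
    }
}
```

If you truly need raw byte passthrough, ByteArraySerializer is the other accepted option, but then the upstream value must actually be a byte array rather than the decoded string the plain codec produces.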

There is no LogStash::Outputs::Kafka::ConfigurationError class. There is a LogStash::ConfigurationError, which is successfully raised here if you set partitioner => "foo" on the output.
