Kafka -> Logstash -> ES ..... using avro_schema_registry plugin & decorate_events & mutate = Fail. I would like to get Key info into ES using Mutate

I am not having success with the following:

Kafka Input (both key & body avro schema are in schema registry)

I would like to pull fields from they Key and combine with the body and output to ES.

decorate_events=true, with the codec=>avro_schema_registry plugin, w/o the key_deserializer class allows mutate operation to find values however key is in Bytes.

decorate_events=true with they key_deserializer class defined fails w/ exception

input {
.......
...
#decorate_events => true
codec => avro_schema_registry {
endpoint => "http://...........:8081"
}
value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
key_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
}

filter {
mutate {
add_field => {
"my_kafka_partition" => "%{[@metadata][kafka][partition]}"
"my_kafka_offset" => "%{[@metadata][kafka][offset]}"
"my_kafka_key" => "%{message}"
}
}
}

exception when using decorate_events=> true & the key_deserializer_class defined

Exception in thread "Ruby-0-Thread-39: :1" org.logstash.MissingConverterException: Missing Converter handling for full class name=[B, simple name=byte[]
        at org.logstash.Valuefier.fallbackConvert(Valuefier.java:97)
        at org.logstash.Valuefier.convert(Valuefier.java:75)
        at org.logstash.Valuefier.lambda$static$3(Valuefier.java:51)
        at org.logstash.Valuefier.convert(Valuefier.java:73)
        at org.logstash.ext.JrubyEventExtLibrary$RubyEvent.ruby_set_field(JrubyEventExtLibrary.java:99)
        at org.logstash.ext.JrubyEventExtLibrary$RubyEvent$INVOKER$i$2$0$ruby_set_field.call(JrubyEventExtLibrary$RubyEvent$INVOKER$i$2$0$ruby_set_field.gen)
        at org.jruby.internal.runtime.methods.JavaMethod$JavaMethodN.call(JavaMethod.java:741)
        at org.jruby.ir.targets.InvokeSite.invoke(InvokeSite.java:145)
        at home.rduffy.logstash.logstash_minus_6_dot_4_dot_0.vendor.bundle.jruby.$2_dot_3_dot_0.gems.logstash_minus_input_minus_kafka_minus_8_dot_1_dot_1.lib.logstash.inputs.kafka.RUBY$block$thread_runner$3(/home/rduffy/logstash/logstash-6.4.0/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.1.1/lib/logstash/inputs/kafka.rb:263)
        at org.jruby.runtime.CompiledIRBlockBody.yieldDirect(CompiledIRBlockBody.java:156)
        at org.jruby.runtime.BlockBody.yield(BlockBody.java:114)
        at org.jruby.runtime.Block.yield(Block.java:165)
        at org.jruby.ir.runtime.IRRuntimeHelpers.yield(IRRuntimeHelpers.java:415)
        at org.jruby.ir.targets.YieldSite.yield(YieldSite.java:87)
        at home.rduffy.logstash.logstash_minus_6_dot_4_dot_0.vendor.bundle.jruby.$2_dot_3_dot_0.gems.logstash_minus_codec_minus_avro_schema_registry_minus_1_dot_1_dot_0.lib.logstash.codecs.avro_schema_registry.RUBY$method$decode$0(/home/rduffy/logstash/logstash-6.4.0/vendor/bundle/jruby/2.3.0/gems/logstash-codec-avro_schema_registry-1.1.0/lib/logstash/codecs/avro_schema_registry.rb:222)
        at home.rduffy.logstash.logstash_minus_6_dot_4_dot_0.vendor.bundle.jruby.$2_dot_3_dot_0.gems.logstash_minus_codec_minus_avro_schema_registry_minus_1_dot_1_dot_0.lib.logstash.codecs.avro_schema_registry.RUBY$method$decode$0$__VARARGS__(/home/rduffy/logstash/logstash-6.4.0/vendor/bundle/jruby/2.3.0/gems/logstash-codec-avro_schema_registry-1.1.0/lib/logstash/codecs/avro_schema_registry.rb)
        at org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:77)
        at org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:93)
        at org.jruby.ir.targets.InvokeSite.invoke(InvokeSite.java:145)
        at home.rduffy.logstash.logstash_minus_6_dot_4_dot_0.vendor.bundle.jruby.$2_dot_3_dot_0.gems.logstash_minus_input_minus_kafka_minus_8_dot_1_dot_1.lib.logstash.inputs.kafka.RUBY$block$thread_runner$2(/home/rduffy/logstash/logstash-6.4.0/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.1.1/lib/logstash/inputs/kafka.rb:256)
        at org.jruby.runtime.CompiledIRBlockBody.yieldDirect(CompiledIRBlockBody.java:156)
        at org.jruby.runtime.BlockBody.yield(BlockBody.java:114)
        at org.jruby.runtime.Block.yield(Block.java:165)
        at org.jruby.javasupport.ext.JavaLang$Iterable.each(JavaLang.java:93)
        at org.jruby.javasupport.ext.JavaLang$Iterable$INVOKER$s$0$0$each.call(JavaLang$Iterable$INVOKER$s$0$0$each.gen)
        at org.jruby.internal.runtime.methods.JavaMethod$JavaMethodZeroBlock.call(JavaMethod.java:498)
        at org.jruby.ir.targets.InvokeSite.invoke(InvokeSite.java:145)
        at home.rduffy.logstash.logstash_minus_6_dot_4_dot_0.vendor.bundle.jruby.$2_dot_3_dot_0.gems.logstash_minus_input_minus_kafka_minus_8_dot_1_dot_1.lib.logstash.inputs.kafka.RUBY$block$thread_runner$1(/home/rduffy/logstash/logstash-6.4.0/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.1.1/lib/logstash/inputs/kafka.rb:255)
        at org.jruby.runtime.CompiledIRBlockBody.callDirect(CompiledIRBlockBody.java:145)
        at org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:71)
        at org.jruby.runtime.Block.call(Block.java:124)
        at org.jruby.RubyProc.call(RubyProc.java:289)
        at org.jruby.RubyProc.call(RubyProc.java:246)
1 Like

ping ping... anyone?

1 Like

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.