Provide custom Kafka message decoder to Logstash

In my Logstash configuration I'm defining a custom Kafka message decoder, but when I start up Logstash I get an error that Logstash cannot find my class. Is there a relatively easy way to point Logstash to the jar where the class is located, or how do I go about doing this?

config:

input {
  kafka {
    consumer_id => "logstash_elastic_publisher"
    decoder_class => "org.test.mypackage.serializer.DocumentMessageDecoder"
    key_decoder_class => "kafka.serializer.StringDecoder"
    topic_id => "documents"
    type => "documents_publish"
    zk_connect => "localhost:2181"
  }
}

output {
  if [type] == "documents_publish" {
    elasticsearch { host => localhost }
  } else {
    stdout { }
  }
}
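For context, a class referenced by decoder_class is expected to implement the kafka.serializer.Decoder interface from the 0.8-era Kafka client. Below is a minimal, hypothetical sketch of such a class (the real DocumentMessageDecoder is not shown in this thread; the UTF-8 conversion is a placeholder for whatever deserialization you actually do):

import java.nio.charset.StandardCharsets;

import kafka.serializer.Decoder;
import kafka.utils.VerifiableProperties;

// Hypothetical stand-in for org.test.mypackage.serializer.DocumentMessageDecoder.
public class DocumentMessageDecoder implements Decoder<String> {

    // The 0.8-era Kafka client instantiates decoders reflectively with a
    // VerifiableProperties argument, so the class should provide this constructor.
    public DocumentMessageDecoder(VerifiableProperties props) {
    }

    @Override
    public String fromBytes(byte[] bytes) {
        // Placeholder: replace with your real document deserialization.
        return new String(bytes, StandardCharsets.UTF_8);
    }
}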

You can put it in the root of your Logstash install. That should get it onto the classpath.

The basic trick is that you have to put it somewhere Java looks for libraries.

I'm seeing the same error as the previous poster. I have a config defined:
input {
  kafka {
    zk_connect => "mzkeeper1q.qadfw2.mycomp.com:5181,mzkeeper2q.qadfw2.mycomp.com:5181,mzkeeper3q.qadfw2.mycomp.com:5181"
    topic_id => "raw_log2kafka"
    consumer_threads => 4
    decoder_class => "com.mycomp.etl.avro.schema.kafka.RawLog2KafkaSerializer"
    key_decoder_class => "com.mycomp.etl.avro.schema.kafka.RawLog2KafkaSerializer"
  }
}

I've put the decoder class (and the jar file) in the directory where Logstash is installed, in its bin directory, and in its lib directory, and I still get:
kafka client threw exception, restarting {:exception=>#<NameError: cannot load Java class com.mycomp.etl.avro.schema.kafka.RawLog2KafkaSerializer>, :level=>:warn, :file=>"logstash/inputs/kafka.rb", :line=>"150", :method=>"run"}

From what I've read, I thought I only needed the class file, and only in the main Logstash directory; the rest was just an attempt to get it working. This was with the main Logstash 1.5.4. I then downloaded logstash-input-kafka from GitHub (github.com/joekiller/logstash-input-kafka). I'm concerned because I don't even see the decoder_class/key_decoder_class parameters in that version, and I hope those parameters are still supported, because I coded against the documentation.

Any guidance would be appreciated. Keep in mind, I'm a Java developer, so if you give Ruby directions, please don't take anything for granted.

Thanks.
Ilene

In addition to the config changes, you'd need to add it to the classpath and start LS with your custom serializer.

input {
  kafka {
    decoder_class => "com.mycomp.etl.avro.schema.kafka.RawLog2KafkaSerializer"
    ...
  }
}

Logstash environment:

export CLASSPATH=$CLASSPATH:/path/to/serializer.jar
bin/logstash -f <config_path>

I'll add this information to the docs.


That would make things a lot simpler, but it has never worked; setting CLASSPATH on the command line is completely ignored for me. When I moved the jar with the customized deserializer to logstash-1.5.4/vendor/jruby/lib and started Logstash, the error message finally changed to:
kafka client threw exception, restarting {:exception=>#<NameError: cannot link Java class com....etl.avro.schema.kafka.RawLog2KafkaSerializer, probable missing dependency: kafka/serializer/Encoder>, :level=>:warn}

which is the class it inherits from. After copying the symbolic link from my Maven repo to vendor/jruby/lib, I got errors from the plugin, like:

Couldn't find any input plugin named 'kafka'. Are you sure this is correct? Trying to load the kafka input plugin resulted in this error: cannot link Java class kafka.consumer.ConsumerIterator, probable missing dependency: scala/ScalaObject

Couldn't find any input plugin named 'kafka'. Are you sure this is correct? Trying to load the kafka input plugin resulted in this error: load error: jruby-kafka/consumer -- java.lang.NoClassDefFoundError: scala/reflect/ClassManifest

Couldn't find any input plugin named 'kafka'. Are you sure this is correct? Trying to load the kafka input plugin resulted in this error: load error: jruby-kafka/consumer -- java.lang.NoClassDefFoundError: org/apache/log4j/Logger

Ultimately, the jars I had to copy were my customized deserializer (lijit-storm-avro-20150708.jar), kafka_2.9.2-0.8.1.jar, log4j-1.2...jar, and scala-library-2.9.2.jar.

So I linked those jars in from the Maven repository.

Now, when I run, I get:

$ ./logstash -f ../logstash-avro.conf
Exception in thread "<kafka" java.lang.UnsupportedOperationException
        at java.lang.Thread.stop(Thread.java:869)
        at org.jruby.RubyThread.exceptionRaised(RubyThread.java:1221)
        at org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:112)
        at java.lang.Thread.run(Thread.java:745)
Logstash startup completed
Logstash shutdown completed

There is no logging information for me to troubleshoot from. I've been trying hard to work with someone at Elastic because we have several applications we'd like to apply ELK to at my company, but I haven't been able to get this first one off the ground, and I haven't been able to talk with anyone there since I first started having problems weeks ago. I thought I was making progress this morning when I finally got a new message, but now I see Logstash is dead in the water.

Can you please help me speak with someone at Elastic so that I can move forward in evaluating whether we can use these products? I'd really appreciate it. The alternative approach I'm going to pursue is to write directly to Elasticsearch, because it doesn't seem like a customized Avro deserializer for Kafka is supported in Logstash anymore. I'd seen success with Logstash at my previous company, so I felt more optimistic about it than I do now, but that was reading directly from files; there was no customization involved.
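Since writing to Elasticsearch directly from Java comes up here as a fallback, this is a minimal sketch of that approach, assuming the Elasticsearch 1.x Java TransportClient that matched Logstash 1.5.x (client construction changed in later major versions; the index and type names below are placeholders):

import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

public class DirectWriter {
    public static void main(String[] args) {
        // Connect to the transport port (9300 by default), not the HTTP port.
        Client client = new TransportClient()
                .addTransportAddress(new InetSocketTransportAddress("localhost", 9300));
        try {
            // Index one document; the source is a JSON string.
            client.prepareIndex("documents", "documents_publish")
                  .setSource("{\"message\":\"decoded kafka payload goes here\"}")
                  .execute().actionGet();
        } finally {
            client.close();
        }
    }
}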

If you are running into this problem as well, here is what I did to work around the issues I was having getting the Logstash Kafka input plugin working with a customized deserializer:

1. Make sure you are not running Java 8; download Java 7. Trying to connect to Elasticsearch programmatically using Java 8 also did not work. I tried that as a workaround when I wasn't able to get the Kafka plugin working and wasn't getting a response from Elastic: the idea was to read off the queue and write to Elasticsearch directly, without Logstash (sketched above). I didn't try it again after I installed Java 7, but I assume it would work as documented.

2. Copy the jar containing the customized deserializer class named as input.kafka.decoder_class in your config file to your logstash-1.5.4/vendor/jruby/lib directory. CLASSPATH, whether set in the environment or on the command line, is ignored. All jars that your jar relies on also need to be copied to this directory; until they are all there, running Logstash produces runtime errors. Helpful hint: go to the directory of the pom.xml your jar is built from and run mvn dependency:tree, which lists the exact versions of every jar your customized jar depends on (see the sanity-check sketch after this list).
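To take some of the trial and error out of the "copy every dependency" step, a tiny standalone checker like the one below (a hypothetical helper, not part of Logstash) can be run against the jars you copied into vendor/jruby/lib. The class names in the list come from the errors quoted earlier in this thread:

// Verifies that the custom decoder and its transitive dependencies resolve.
// Run with something like: java -cp "vendor/jruby/lib/*:." ClasspathCheck
public class ClasspathCheck {
    public static void main(String[] args) {
        String[] required = {
            "com.mycomp.etl.avro.schema.kafka.RawLog2KafkaSerializer", // your decoder
            "kafka.serializer.Encoder",        // from the "cannot link" error
            "scala.ScalaObject",               // from the first plugin load error
            "scala.reflect.ClassManifest",     // from the second plugin load error
            "org.apache.log4j.Logger"          // from the third plugin load error
        };
        for (String name : required) {
            try {
                Class.forName(name);
                System.out.println("OK      " + name);
            } catch (Throwable t) {
                System.out.println("MISSING " + name + " (" + t + ")");
            }
        }
    }
}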

FYI, I never did get a response via the bulletin board, email or Contact Us.

If your Logstash version is 2.1 and your build environment is Maven-based, see my post "Where to drop jar containing custome plugin or encoder like custom Kafka message decoder".

Why do I get the exception NameError: no matching java constructor?

My decoder code is as follows:

public class GZipDecoder implements Decoder<byte[]> {

    @Override
    public byte[] fromBytes(byte[] messageBytes) {

Please help!
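For anyone who hits the same NameError: with the 0.8-era Kafka client, decoder classes are typically instantiated reflectively with a kafka.utils.VerifiableProperties argument (kafka.serializer.StringDecoder, for example, accepts one), so a decoder that only has the default constructor can fail to construct from JRuby. A completed sketch of the class under that assumption:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;

import kafka.serializer.Decoder;
import kafka.utils.VerifiableProperties;

public class GZipDecoder implements Decoder<byte[]> {

    // The consumer constructs decoders reflectively, passing VerifiableProperties;
    // without this constructor, instantiation from JRuby can fail with
    // "no matching java constructor".
    public GZipDecoder(VerifiableProperties props) {
    }

    @Override
    public byte[] fromBytes(byte[] messageBytes) {
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(messageBytes));
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            byte[] buffer = new byte[4096];
            int read;
            while ((read = gzip.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
            return out.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException("Failed to gunzip Kafka message", e);
        }
    }
}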

It works now. 🙂