I created a partitioner_class and replaced the partitioner_class appropriately in the conf file. The jar containing the partitioner_class was added to the CLASSPATH.
However, when I started logstash, logstash errored out with a trace I have appended at the end of this comment.
The only way I can avoid the error is if I create a fat jar (jar-with-dependencies) for the partitioner_class, even though all the dependencies are included in the plugin.
Is this expected? I was under the impression that all that was required was to create a jar containing the relevant file and include that in the CLASSPATH.
ava/lang/ClassLoader.java:-2:in `defineClass1': java.lang.NoClassDefFoundError: kafka/producer/Partitioner
from java/lang/ClassLoader.java:800:in `defineClass'
from java/security/SecureClassLoader.java:142:in `defineClass'
from java/net/URLClassLoader.java:449:in `defineClass'
...
from <SOME_DIR>/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/stud-0.0.21/lib/stud/task.rb:12:in `initialize'
from java/lang/Thread.java:745:in `run'
Caused by:
URLClassLoader.java:366:in `run': java.lang.ClassNotFoundException: kafka.producer.Partitioner
from URLClassLoader.java:355:in `run'
from AccessController.java:-2:in `doPrivileged'
from URLClassLoader.java:354:in `findClass'
...
Logstash config was
output {
stdout{}
kafka {
broker_list => "localhost:9092"
topic_id => "test"
compression_codec => "none"
compressed_topics => ""
request_required_acks => 0
serializer_class => "kafka.serializer.StringEncoder"
partitioner_class => "org.x.UIDPartitioner"
request_timeout_ms => 10000
producer_type => "sync"
key_serializer_class => "kafka.serializer.StringEncoder"
message_send_max_retries => 3
retry_backoff_ms => 100
topic_metadata_refresh_interval_ms => 600000
queue_buffering_max_ms => 5000
queue_buffering_max_messages => 10000
queue_enqueue_timeout_ms => -1
batch_num_messages => 200
send_buffer_bytes => 102400
client_id => "kafka"
}
}