I'm working with Logstash 2.3.4 and trying to write an array of bytes to Kafka.
To do this, I built a custom filter that encodes the message in Avro. All the encoding/decoding unit tests on the filter pass.
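For reference, the filter is along these lines (a minimal sketch, not the actual plugin code; the single-string-field schema and the exact option handling are assumptions):

require "logstash/filters/base"
require "logstash/namespace"
require "avro"
require "stringio"

class LogStash::Filters::MyAvroEvent < LogStash::Filters::Base
  config_name "myAvroEvent"

  # Name of the event field holding the raw message to encode.
  config :message, :validate => :string, :default => "message"

  def register
    # Assumed schema for this sketch; the real plugin loads its own.
    @schema = Avro::Schema.parse('{"type":"record","name":"Event","fields":[{"name":"message","type":"string"}]}')
  end

  def filter(event)
    buffer  = StringIO.new
    writer  = Avro::IO::DatumWriter.new(@schema)
    encoder = Avro::IO::BinaryEncoder.new(buffer)
    writer.write({ "message" => event[@message].to_s }, encoder)
    # Write the Avro-encoded bytes back onto the event as a binary Ruby string.
    event["message"] = buffer.string
    filter_matched(event)
  end
end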
I need to send that byte array to Kafka; here is the logstash.conf file:
input {
  file {
    add_field => { "timestamp" => "" }
    path => "/var/log/httpd/access_log."
    start_position => "end"
    type => "myAvroEvent"
    sincedb_path => "/root/.sincedb"
    sincedb_write_interval => 15
  }
}

filter {
  if [type] == "myAvroEvent" {
    myAvroEvent {
      message => "message"
    }
  }
}

output {
  if [type] == "myAvroEvent" {
    kafka {
      bootstrap_servers => "localhost:9092"
      topic_id => "my_topic"
      metadata_max_age_ms => "1000"
      value_serializer => "org.apache.kafka.common.serialization.ByteArraySerializer"
    }
  }
}
When serializing to Kafka, logstash.log shows this:
{:timestamp=>"2016-09-16T21:08:41.741000+0000", :message=>"kafka producer threw exception, restarting", :exception=>org.apache.kafka.common.errors.SerializationException: Can't convert value of class java.lang.String to class org.apache.kafka.common.serialization.ByteArraySerializer specified in value.serializer, :level=>:warn}
So it looks like the kafka output is handing the producer a java.lang.String, while the configured ByteArraySerializer expects byte[]. I tried the plain codec as well and got the same error.
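Concretely, the plain-codec attempt looked roughly like this (a sketch; the format string I used is a guess at reproducing what I had):

output {
  kafka {
    bootstrap_servers => "localhost:9092"
    topic_id => "my_topic"
    codec => plain {
      format => "%{message}"
    }
    value_serializer => "org.apache.kafka.common.serialization.ByteArraySerializer"
  }
}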
Any ideas about what might be wrong? Is the logstash.conf file configured properly?
Thanks in advance.
Fernando.