Configure Kafka Output to send ByteArray

I'm working with Logstash 2.3.4 and trying to write a byte array to Kafka.
To do this, I built a filter that encodes a message in Avro. All the encoding/decoding unit tests on the filter pass.
I need to send the byte array to Kafka. Here is the logstash.conf file:

input {
    file {
        add_field => ["timestamp", ""]
        path => "/var/log/httpd/access_log."
        start_position => "end"
        type => "myAvroEvent"
        sincedb_path => "/root/.sincedb"
        sincedb_write_interval => 15
    }
}


filter {
    if [type] == "myAvroEvent" {
        message => message
    }
}

output {
    if [type] == "myAvroEvent" {
        kafka {
            bootstrap_servers => "localhost:9092"
            topic_id => "my_topic"
            metadata_max_age_ms => "1000"
            value_serializer => "org.apache.kafka.common.serialization.ByteArraySerializer"
        }
    }
}


When serializing to Kafka, logstash.log shows this:

{:timestamp=>"2016-09-16T21:08:41.741000+0000", :message=>"kafka producer threw exception, restarting", :exception=>org.apache.kafka.common.errors.SerializationException: Can't convert value of class java.lang.String to class org.apache.kafka.common.serialization.ByteArraySerializer specified in value.serializer, :level=>:warn}

I also tried the plain codec and got the same error.

Any ideas about what might be wrong? Is the logstash.conf file configured properly?

Thanks in advance.

OK, I finally found a solution to this issue.
The Kafka output plugin uses the "plain" codec by default. This codec converts the message into a string before handing it to Kafka. So the filter was producing a byte array, but the codec converted it to a string, and the plugin then tried to serialize that string with ByteArraySerializer, which fails.
The fix is to set a codec that passes the message through as is, without converting it. Since I couldn't find a codec that does that, I created a new one:

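# encoding: utf-8
require "logstash/codecs/base"

# The "nocodec" codec sends the message as is, without transforming it.

class LogStash::Codecs::NoCodec < LogStash::Codecs::Base
  config_name "nocodec"

  # Nothing to set up.
  def register
  end # def register

  # Wrap incoming data in an event without altering it.
  def decode(data)
    yield LogStash::Event.new("message" => data)
  end # def decode

  # Hand the raw "message" field to the output untouched.
  def encode(event)
    @on_event.call(event, event["message"])
  end # def encode

end # class LogStash::Codecs::NoCodec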

When configuring the kafka output in logstash.conf, just add this line inside the kafka block:
codec => nocodec
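
To make the difference between the two codecs concrete, here is a small self-contained simulation in plain Ruby. It is only a sketch: FakeByteArraySerializer, FakePlainCodec, FakeNoCodec and send_through are hypothetical stand-ins written for this illustration, not Logstash or Kafka classes, and the sample bytes are made up. It assumes the serializer rejects anything that is not a byte array, which is the behaviour the error message above describes.

```ruby
# Simulation only: none of these classes are real Logstash or Kafka APIs.

# Stand-in for a byte-array serializer: accepts only an array of byte values.
class FakeByteArraySerializer
  def serialize(value)
    unless value.is_a?(Array) && value.all? { |b| b.is_a?(Integer) && b.between?(0, 255) }
      raise TypeError, "Can't convert value of class #{value.class} to a byte array"
    end
    value.pack("C*")
  end
end

# Mimics the "plain" codec: the payload is turned into a string first.
class FakePlainCodec
  def on_event(&block)
    @on_event = block
  end

  def encode(event)
    @on_event.call(event, event["message"].to_s)
  end
end

# Mimics the pass-through codec: the payload is handed on untouched.
class FakeNoCodec
  def on_event(&block)
    @on_event = block
  end

  def encode(event)
    @on_event.call(event, event["message"])
  end
end

# Runs one event through a codec and the serializer.
# Returns the serialized bytes, or the error the serializer raised.
def send_through(codec, event)
  serializer = FakeByteArraySerializer.new
  result = nil
  codec.on_event do |_event, payload|
    result = begin
      serializer.serialize(payload)
    rescue TypeError => e
      e
    end
  end
  codec.encode(event)
  result
end

event = { "message" => [0x4f, 0x62, 0x6a, 0x01] } # made-up encoded bytes

plain_result = send_through(FakePlainCodec.new, event)
raw_result = send_through(FakeNoCodec.new, event)

puts plain_result.class       # prints TypeError
puts raw_result.bytes.inspect # prints [79, 98, 106, 1]
```

With the string-converting codec the serializer receives a String and fails, as in the log above. With the pass-through codec it receives the original bytes.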