Hi,
I've been trying to figure out how to transform JSON-encoded events from the kafka input into Avro on the output. So far, no luck.
This is an example of my current configuration, without any filtering or manipulation:
input{
  kafka {
    bootstrap_servers => "localhost:9092" # point to the kafka instance    
    topics => "ipfix_json"
    codec => "json"
  }
}
filter {
}
output {
  stdout { codec => rubydebug }
}
When running Logstash, this is a snippet of the output using the rubydebug codec:
{
        "stamp_updated" => "2018-03-23 05:01:41",
             "port_dst" => 58499,
                  "tos" => 0,
      "timestamp_start" => "2018-03-23 05:00:33.920000",
              "net_src" => "10,1,1,0",
             "mask_src" => 24,
             "ip_proto" => "udp",
               "as_src" => 65600,
               "as_dst" => 65800,
                "bytes" => 134,
           "@timestamp" => 2018-03-25T05:58:21.013Z,
            "tcp_flags" => "0",
       "stamp_inserted" => "2018-03-23 05:00:00",
           "event_type" => "purge",
             "iface_in" => 654,
              "packets" => 1,
            "writer_id" => "default_kafka/2273",
             "@version" => "1",
             "port_src" => 5263,
    "timestamp_arrival" => "2018-03-23 05:01:34.555094",
        "timestamp_end" => "2018-03-23 05:00:33.920000",
            "iface_out" => 670,
        "sampling_rate" => 1000,
          "peer_ip_src" => "10.20.30.1",
               "ip_src" => "10.1.1.100",
             "mask_dst" => 23,
               "ip_dst" => "10.2.2.200",
          "peer_ip_dst" => "",
              "net_dst" => "10.2.2.0"
}
Now, the Avro schema that we are using is (pnda.avsc):
    {
      "namespace": "pnda.entity",
      "type": "record",
      "name": "event",
      "fields": [
         {"name": "timestamp", "type": "long"},
         {"name": "src",       "type": "string"},
         {"name": "host_ip",   "type": "string"},
         {"name": "rawdata",   "type": "bytes"}
      ]
    }
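So, if I understand the schema correctly, the event that reaches the Avro codec would need to look roughly like this (values made up from my own data above), with the entire original JSON pushed down into rawdata:
{
    "timestamp" => 1521959489662,
          "src" => "netflow",
      "host_ip" => "10.180.221.190",
      "rawdata" => "{\"event_type\":\"purge\",\"ip_src\":\"10.1.1.100\",\"ip_dst\":\"10.2.2.200\", ...}"
}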
And the Logstash output configuration with kafka/avro looks like this:
output {
    kafka {
      bootstrap_servers => "10.180.221.130:9092" # change the broker list IPs
      topic_id => "netflow_json"
      compression_type => "none" # "none", "gzip", "snappy", "lz4"
      value_serializer => 'org.apache.kafka.common.serialization.ByteArraySerializer'
      codec => pnda-avro { schema_uri => "/home/ubuntu/logstash-6.2.3/pnda.avsc" }
    }
}
When running, the input doesn't seem to match the Avro schema:
[2018-03-25T06:31:30,043][FATAL][logstash.runner] An unexpected error occurred! 
{:error=>#<Avro::IO::AvroTypeError: The datum {"ip_proto"=>"tcp", 
"net_dst"=>"10.10.0.0", "iface_in"=>654, "tos"=>0, "@timestamp"=>2018-03-25T06:31:29.662Z,
"timestamp_end"=>"2018-03-23 05:00:18.304000", "peer_ip_src"=>"10.1.1.1", 
"timestamp"=>1521959489662, "timestamp_start"=>"2018-03-23 05:00:18.304000", "@version"=>"1", 
"sampling_rate"=>1000, "tcp_flags"=>"24", "stamp_inserted"=>"2018-03-23 05:00:00", 
"stamp_updated"=>"2018-03-23 05:01:21", "ip_src"=>"10.20.30.100", "writer_id"=>"default_kafka/2263", 
"timestamp_arrival"=>"2018-03-23 05:01:20.12291", "bytes"=>1500, "as_src"=>65600, 
"net_src"=>"10.10.1.50", "event_type"=>"purge", "port_src"=>2099, "mask_src"=>32, 
"port_dst"=>53534, "as_dst"=>65500, "packets"=>1, "ip_dst"=>"10.10.50.100", "peer_ip_dst"=>"", 
"mask_dst"=>20, "iface_out"=>704} is not an example of schema {"type":"record","name":"event",
"namespace":"pnda.entity","fields":[{"name":"timestamp","type":"long"},{"name":"src","type":"string"},
{"name":"host_ip","type":"string"},{"name":"rawdata","type":"bytes"}]}>, 
:backtrace=>["/home/ubuntu/logstash-6.2.3/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:547:in 
`write_data'", "/home/ubuntu/logstash-6.2.3/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:542:in `
write'", "/home/ubuntu/logstash-6.2.3/vendor/local_gems/212eb720/logstash-codec-pnda-avro-3.1.1-java/lib/logstash/codecs/pnda-avro.rb:91:in `encode'", 
"/home/ubuntu/logstash-6.2.3/vendor/bundle/jruby/2.3.0/gems/logstash-output-kafka-7.0.8/lib/logstash/outputs/kafka.rb:221:in `block in multi_receive'", 
"org/jruby/RubyArray.java:1734:in `each'", "/home/ubuntu/logstash-6.2.3/vendor/bundle/jruby/2.3.0/gems/logstash-output-kafka-7.0.8/lib/logstash/outputs/kafka.rb:219:in 
`multi_receive'", "/home/ubuntu/logstash-6.2.3/logstash-core/lib/logstash/output_delegator_strategies/shared.rb:13:in `multi_receive'", 
"/home/ubuntu/logstash-6.2.3/logstash-core/lib/logstash/output_delegator.rb:49:in `multi_receive'", 
"/home/ubuntu/logstash-6.2.3/logstash-core/lib/logstash/pipeline.rb:479:in `block in output_batch'", "org/jruby/RubyHash.java:1343:in `each'",
 "/home/ubuntu/logstash-6.2.3/logstash-core/lib/logstash/pipeline.rb:478:in `output_batch'", "/home/ubuntu/logstash-6.2.3/logstash-core/lib/logstash/pipeline.rb:430:in `worker_loop'", "/home/ubuntu/logstash-6.2.3/logstash-core/lib/logstash/pipeline.rb:385:in `block in start_workers'"]}
[2018-03-25T06:31:30,093][ERROR][org.logstash.Logstash    ] java.lang.IllegalStateException: org.jruby.exceptions.RaiseException: 
(SystemExit) exit
ubuntu@ip-10-180-221-190:~/logstash-6.2.3$ 
After two days of reading about Logstash filter plugins and Ruby, it seems I need to transform the input into a structure that matches the Avro schema. However, so far, I haven't been able to figure out how.
How can I translate the input into something that matches the Avro schema? I first tried the mutate filter, but I couldn't figure out how to mutate my event into the record defined by the schema's fields array. Now I'm thinking the only way to achieve this is with the ruby filter plugin, but I haven't found a good guide or tutorial on how to use it, i.e. how to access the original JSON input and manipulate it into the new output.
I basically need the whole content of the JSON input stored in the rawdata field of the Avro schema, and I need to figure out how to construct the remaining fields (timestamp, src, host_ip) so the event matches the rest of the schema.
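From what I've read so far, I'm imagining something along these lines with the ruby filter (completely untested; the src and host_ip values are just placeholders I picked), but I don't know if this is the right approach:
filter {
  ruby {
    code => '
      require "json"
      # take everything except the @timestamp/@version metadata
      raw = event.to_hash.reject { |k, _| k.start_with?("@") }
      # drop the original top-level fields from the event...
      raw.keys.each { |k| event.remove(k) }
      # ...and replace them with the four fields the schema expects
      event.set("rawdata", raw.to_json)                                    # whole original JSON as a string
      event.set("timestamp", (event.get("@timestamp").to_f * 1000).to_i)  # epoch milliseconds
      event.set("src", "netflow")                                          # placeholder source name
      event.set("host_ip", "10.180.221.190")                               # placeholder collector IP
    '
  }
}
Is something like this the way to go, or is there a cleaner way to do it with the mutate/json filters?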
Thanks in advance for any direction or help. I will post more details tomorrow.