Hi,
I've been trying to figure out how to transform JSON events read from a Kafka input into Avro on the output side. So far, no luck.
This is an example of my current configuration, without any filtering or manipulation:
input {
  kafka {
    bootstrap_servers => "localhost:9092" # point to the kafka instance
    topics => "ipfix_json"
    codec => "json"
  }
}
filter {
}
output {
  stdout { codec => rubydebug }
}
When running Logstash, this is a snippet of the output from the rubydebug codec:
{
    "stamp_updated" => "2018-03-23 05:01:41",
    "port_dst" => 58499,
    "tos" => 0,
    "timestamp_start" => "2018-03-23 05:00:33.920000",
    "net_src" => "10,1,1,0",
    "mask_src" => 24,
    "ip_proto" => "udp",
    "as_src" => 65600,
    "as_dst" => 65800,
    "bytes" => 134,
    "@timestamp" => 2018-03-25T05:58:21.013Z,
    "tcp_flags" => "0",
    "stamp_inserted" => "2018-03-23 05:00:00",
    "event_type" => "purge",
    "iface_in" => 654,
    "packets" => 1,
    "writer_id" => "default_kafka/2273",
    "@version" => "1",
    "port_src" => 5263,
    "timestamp_arrival" => "2018-03-23 05:01:34.555094",
    "timestamp_end" => "2018-03-23 05:00:33.920000",
    "iface_out" => 670,
    "sampling_rate" => 1000,
    "peer_ip_src" => "10.20.30.1",
    "ip_src" => "10.1.1.100",
    "mask_dst" => 23,
    "ip_dst" => "10.2.2.200",
    "peer_ip_dst" => "",
    "net_dst" => "10.2.2.0"
}
Now, the Avro schema that we are using is (pnda.avsc):
{
  "namespace": "pnda.entity",
  "type": "record",
  "name": "event",
  "fields": [
    {"name": "timestamp", "type": "long"},
    {"name": "src", "type": "string"},
    {"name": "host_ip", "type": "string"},
    {"name": "rawdata", "type": "bytes"}
  ]
}
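If I'm reading the schema right, each event would have to end up looking roughly like this before the Avro encoding (the values here are made up, and I'm just guessing that the whole original document goes into rawdata):

{
  "timestamp": 1521959489662,
  "src": "pmacct",
  "host_ip": "10.20.30.1",
  "rawdata": "<the original JSON document, as bytes>"
}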
And the Logstash output configuration with Kafka/Avro looks like this:
output {
  kafka {
    bootstrap_servers => "10.180.221.130:9092" # change the broker list IPs
    topic_id => "netflow_json"
    compression_type => "none" # "none", "gzip", "snappy", "lz4"
    value_serializer => 'org.apache.kafka.common.serialization.ByteArraySerializer'
    codec => pnda-avro { schema_uri => "/home/ubuntu/logstash-6.2.3/pnda.avsc" }
  }
}
When running, the event doesn't match the Avro schema and Logstash crashes:
[2018-03-25T06:31:30,043][FATAL][logstash.runner] An unexpected error occurred!
{:error=>#<Avro::IO::AvroTypeError: The datum {"ip_proto"=>"tcp",
"net_dst"=>"10.10.0.0", "iface_in"=>654, "tos"=>0, "@timestamp"=>2018-03-25T06:31:29.662Z,
"timestamp_end"=>"2018-03-23 05:00:18.304000", "peer_ip_src"=>"10.1.1.1",
"timestamp"=>1521959489662, "timestamp_start"=>"2018-03-23 05:00:18.304000", "@version"=>"1",
"sampling_rate"=>1000, "tcp_flags"=>"24", "stamp_inserted"=>"2018-03-23 05:00:00",
"stamp_updated"=>"2018-03-23 05:01:21", "ip_src"=>"10.20.30.100", "writer_id"=>"default_kafka/2263",
"timestamp_arrival"=>"2018-03-23 05:01:20.12291", "bytes"=>1500, "as_src"=>65600,
"net_src"=>"10.10.1.50", "event_type"=>"purge", "port_src"=>2099, "mask_src"=>32,
"port_dst"=>53534, "as_dst"=>65500, "packets"=>1, "ip_dst"=>"10.10.50.100", "peer_ip_dst"=>"",
"mask_dst"=>20, "iface_out"=>704} is not an example of schema {"type":"record","name":"event",
"namespace":"pnda.entity","fields":[{"name":"timestamp","type":"long"},{"name":"src","type":"string"},
{"name":"host_ip","type":"string"},{"name":"rawdata","type":"bytes"}]}>,
:backtrace=>["/home/ubuntu/logstash-6.2.3/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:547:in
`write_data'", "/home/ubuntu/logstash-6.2.3/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:542:in `
write'", "/home/ubuntu/logstash-6.2.3/vendor/local_gems/212eb720/logstash-codec-pnda-avro-3.1.1-java/lib/logstash/codecs/pnda-avro.rb:91:in `encode'",
"/home/ubuntu/logstash-6.2.3/vendor/bundle/jruby/2.3.0/gems/logstash-output-kafka-7.0.8/lib/logstash/outputs/kafka.rb:221:in `block in multi_receive'",
"org/jruby/RubyArray.java:1734:in `each'", "/home/ubuntu/logstash-6.2.3/vendor/bundle/jruby/2.3.0/gems/logstash-output-kafka-7.0.8/lib/logstash/outputs/kafka.rb:219:in
`multi_receive'", "/home/ubuntu/logstash-6.2.3/logstash-core/lib/logstash/output_delegator_strategies/shared.rb:13:in `multi_receive'",
"/home/ubuntu/logstash-6.2.3/logstash-core/lib/logstash/output_delegator.rb:49:in `multi_receive'",
"/home/ubuntu/logstash-6.2.3/logstash-core/lib/logstash/pipeline.rb:479:in `block in output_batch'", "org/jruby/RubyHash.java:1343:in `each'",
"/home/ubuntu/logstash-6.2.3/logstash-core/lib/logstash/pipeline.rb:478:in `output_batch'", "/home/ubuntu/logstash-6.2.3/logstash-core/lib/logstash/pipeline.rb:430:in `worker_loop'", "/home/ubuntu/logstash-6.2.3/logstash-core/lib/logstash/pipeline.rb:385:in `block in start_workers'"]}
[2018-03-25T06:31:30,093][ERROR][org.logstash.Logstash ] java.lang.IllegalStateException: org.jruby.exceptions.RaiseException:
(SystemExit) exit
ubuntu@ip-10-180-221-190:~/logstash-6.2.3$
After spending the last two days reading about Logstash filter plugins and Ruby, it seems I need to transform the input event into a structure that matches the Avro schema. However, so far I haven't been able to figure it out.
How can I translate the input into something that matches the Avro schema? I first tried the mutate filter, but I couldn't figure out how to map the event into the schema's fields. Now I'm thinking the only way to achieve this is with the ruby filter plugin, but I haven't found a good guide or tutorial on how to use it, i.e. how to access the original JSON event and reshape it into a new structure.
Basically, I need the whole content of the JSON input stored in the rawdata field of the Avro schema, and then to construct the remaining fields (timestamp, src, host_ip) so the event matches the rest of the schema.
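For reference, this is the kind of ruby filter I have been sketching, but I haven't been able to verify that it is correct. The "pmacct" value for src and the use of peer_ip_src for host_ip are just guesses on my part, and I'm not sure whether the pnda-avro codec already fills in timestamp on its own (it does show up in the error datum above):

filter {
  ruby {
    code => '
      # keep a JSON copy of the whole original event for the rawdata field
      raw = event.to_json

      # remember the original field names so they can be dropped afterwards
      original_keys = event.to_hash.keys.reject { |k| k.start_with?("@") }

      # fields required by pnda.avsc
      event.set("timestamp", (event.get("@timestamp").to_f * 1000).to_i) # epoch millis
      event.set("src", "pmacct")                          # placeholder source label
      event.set("host_ip", event.get("peer_ip_src").to_s)
      event.set("rawdata", raw)

      # remove everything that is not part of the schema
      original_keys.each { |k| event.remove(k) }
    '
  }
}

Is this roughly the right approach, or is there a cleaner way to reshape the event?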
Thanks in advance for any direction or help.