Can't figure out how to translate from JSON to Avro using Logstash

Hi,

I've been trying to figure out how to take JSON-encoded messages from a Kafka input and write them out as Avro. So far, no luck.

This is an example of my current configuration, without any filtering or manipulation:

input{
  kafka {
    bootstrap_servers => "localhost:9092" # point to the kafka instance    
    topics => "ipfix_json"
    codec => "json"
  }
}

filter {
}

output {
  stdout { codec => rubydebug }

}

When I run Logstash, this is a snippet of the output with the rubydebug codec:

{
        "stamp_updated" => "2018-03-23 05:01:41",
             "port_dst" => 58499,
                  "tos" => 0,
      "timestamp_start" => "2018-03-23 05:00:33.920000",
              "net_src" => "10.1.1.0",
             "mask_src" => 24,
             "ip_proto" => "udp",
               "as_src" => 65600,
               "as_dst" => 65800,
                "bytes" => 134,
           "@timestamp" => 2018-03-25T05:58:21.013Z,
            "tcp_flags" => "0",
       "stamp_inserted" => "2018-03-23 05:00:00",
           "event_type" => "purge",
             "iface_in" => 654,
              "packets" => 1,
            "writer_id" => "default_kafka/2273",
             "@version" => "1",
             "port_src" => 5263,
    "timestamp_arrival" => "2018-03-23 05:01:34.555094",
        "timestamp_end" => "2018-03-23 05:00:33.920000",
            "iface_out" => 670,
        "sampling_rate" => 1000,
          "peer_ip_src" => "10.20.30.1",
               "ip_src" => "10.1.1.100",
             "mask_dst" => 23,
               "ip_dst" => "10.2.2.200",
          "peer_ip_dst" => "",
              "net_dst" => "10.2.2.0"
}

Now, the avro schema that we are using is (pnda.avsc):

    {
      "namespace": "pnda.entity",
      "type": "record",
      "name": "event",
      "fields": [
         {"name": "timestamp", "type": "long"},
         {"name": "src",       "type": "string"},
         {"name": "host_ip",   "type": "string"},
         {"name": "rawdata",   "type": "bytes"}
      ]
    }

And the Logstash output configuration with Kafka and Avro looks like this:

output {
    kafka {
      bootstrap_servers => "10.180.221.130:9092" # change the broker list IPs
      topic_id => "netflow_json"
      compression_type => "none" # "none", "gzip", "snappy", "lz4"
      value_serializer => 'org.apache.kafka.common.serialization.ByteArraySerializer'
      codec => pnda-avro { schema_uri => "/home/ubuntu/logstash-6.2.3/pnda.avsc" }
    }
}

When I run this, the input doesn't seem to match the Avro schema:

[2018-03-25T06:31:30,043][FATAL][logstash.runner] An unexpected error occurred! 
{:error=>#<Avro::IO::AvroTypeError: The datum {"ip_proto"=>"tcp", 
"net_dst"=>"10.10.0.0", "iface_in"=>654, "tos"=>0, "@timestamp"=>2018-03-25T06:31:29.662Z,
"timestamp_end"=>"2018-03-23 05:00:18.304000", "peer_ip_src"=>"10.1.1.1", 
"timestamp"=>1521959489662, "timestamp_start"=>"2018-03-23 05:00:18.304000", "@version"=>"1", 
"sampling_rate"=>1000, "tcp_flags"=>"24", "stamp_inserted"=>"2018-03-23 05:00:00", 
"stamp_updated"=>"2018-03-23 05:01:21", "ip_src"=>"10.20.30.100", "writer_id"=>"default_kafka/2263", 
"timestamp_arrival"=>"2018-03-23 05:01:20.12291", "bytes"=>1500, "as_src"=>65600, 
"net_src"=>"10.10.1.50", "event_type"=>"purge", "port_src"=>2099, "mask_src"=>32, 
"port_dst"=>53534, "as_dst"=>65500, "packets"=>1, "ip_dst"=>"10.10.50.100", "peer_ip_dst"=>"", 
"mask_dst"=>20, "iface_out"=>704} is not an example of schema {"type":"record","name":"event",
"namespace":"pnda.entity","fields":[{"name":"timestamp","type":"long"},{"name":"src","type":"string"},
{"name":"host_ip","type":"string"},{"name":"rawdata","type":"bytes"}]}>, 
:backtrace=>["/home/ubuntu/logstash-6.2.3/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:547:in 
`write_data'", "/home/ubuntu/logstash-6.2.3/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:542:in `
write'", "/home/ubuntu/logstash-6.2.3/vendor/local_gems/212eb720/logstash-codec-pnda-avro-3.1.1-java/lib/logstash/codecs/pnda-avro.rb:91:in `encode'", 
"/home/ubuntu/logstash-6.2.3/vendor/bundle/jruby/2.3.0/gems/logstash-output-kafka-7.0.8/lib/logstash/outputs/kafka.rb:221:in `block in multi_receive'", 
"org/jruby/RubyArray.java:1734:in `each'", "/home/ubuntu/logstash-6.2.3/vendor/bundle/jruby/2.3.0/gems/logstash-output-kafka-7.0.8/lib/logstash/outputs/kafka.rb:219:in 
`multi_receive'", "/home/ubuntu/logstash-6.2.3/logstash-core/lib/logstash/output_delegator_strategies/shared.rb:13:in `multi_receive'", 
"/home/ubuntu/logstash-6.2.3/logstash-core/lib/logstash/output_delegator.rb:49:in `multi_receive'", 
"/home/ubuntu/logstash-6.2.3/logstash-core/lib/logstash/pipeline.rb:479:in `block in output_batch'", "org/jruby/RubyHash.java:1343:in `each'",
 "/home/ubuntu/logstash-6.2.3/logstash-core/lib/logstash/pipeline.rb:478:in `output_batch'", "/home/ubuntu/logstash-6.2.3/logstash-core/lib/logstash/pipeline.rb:430:in `worker_loop'", "/home/ubuntu/logstash-6.2.3/logstash-core/lib/logstash/pipeline.rb:385:in `block in start_workers'"]}
[2018-03-25T06:31:30,093][ERROR][org.logstash.Logstash    ] java.lang.IllegalStateException: org.jruby.exceptions.RaiseException: 
(SystemExit) exit
ubuntu@ip-10-180-221-190:~/logstash-6.2.3$ 

After two days of reading about Logstash filter plugins and Ruby, it seems that I need to transform the input into a structure that matches the Avro schema. So far I haven't been able to figure out how.

How can I translate the input into something that matches the Avro schema? I first tried the mutate filter, but I couldn't figure out how to mutate into the fields array object. Now I think the only way to achieve this is the ruby filter plugin, but I haven't found a good guide or tutorial on how to use it, or on how to access the original JSON input and reshape it into a new output.

Basically, I need the whole content of the JSON input stored in the rawdata field of the Avro schema, and I need to know how to construct the remaining fields so that they match the rest of the schema.
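
To make the target shape concrete, here is a minimal sketch in plain Ruby, not a tested Logstash filter. It serializes the whole decoded record back into one JSON string and puts it in rawdata next to the other three schema fields. The helper name `wrap_for_pnda` and its parameters are made up for illustration; inside a ruby filter the same idea would go through `event.get` and `event.set`.

```ruby
require 'json'

# Hypothetical helper (not part of Logstash or PNDA): wraps a decoded flow
# record in the four fields that pnda.avsc declares.
def wrap_for_pnda(flow, timestamp_ms:, src:, host_ip:)
  {
    'timestamp' => timestamp_ms,         # Avro "long": integer milliseconds
    'src'       => src,                  # Avro "string"
    'host_ip'   => host_ip,              # Avro "string"
    'rawdata'   => JSON.generate(flow)   # whole input as one serialized string
  }
end

# Sample values borrowed from the rubydebug output in this thread.
flow = { 'ip_src' => '10.1.1.100', 'ip_dst' => '10.2.2.200', 'bytes' => 134 }
record = wrap_for_pnda(flow, timestamp_ms: 1522004214138,
                       src: 'netflow', host_ip: '10.180.221.190')
puts record['rawdata']
```

The point of the sketch is that rawdata ends up as a single string rather than a nested object, while the other three fields sit at the top level of the record.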

Thanks in advance for any direction or help.

OK, I tried a few things.

This is my config file:

input {
  kafka {
    bootstrap_servers => "localhost:9092" # point to the kafka instance
    topics => "ipfix_json"
    codec => "json"
    add_field => [ "src", "netflow" ]
  }
}

filter {
   ruby {
      code => "event.set('timestamp', (event.get('@timestamp').to_f * 1000).to_i)"
    }
   mutate {
      rename => { "message" => "rawdata" } # Put the content of the message to the PNDA Avro 'rawdata' field
      }
}

output {
  stdout { codec => rubydebug }
 }

With this configuration, the rubydebug output shows this:

{
                  "tos" => 0,
        "timestamp_end" => "2018-03-23 05:57:50.720000",
               "as_src" => 65500,
              "net_src" => "10.20.0.0",
               "ip_src" => "10.20.15.100",
            "writer_id" => "default_kafka/4108",
             "@version" => "1",
                  "src" => "netflow",
          "peer_ip_dst" => "",
                "bytes" => 134,
            "iface_out" => 656,
             "port_src" => 57624,
              "net_dst" => "10.10.0.0",
             "ip_proto" => "udp",
              "packets" => 2,
        "sampling_rate" => 1000,
       "stamp_inserted" => "2018-03-23 05:57:00",
               "as_dst" => 65600,
             "port_dst" => 5179,
            "timestamp" => 1522004214138,
          "peer_ip_src" => "10.1.1.1",
        "stamp_updated" => "2018-03-23 05:58:51",
             "iface_in" => 658,
      "timestamp_start" => "2018-03-23 05:57:44.64000",
             "mask_src" => 16,
             "mask_dst" => 24,
    "timestamp_arrival" => "2018-03-23 05:58:45.274801",
           "@timestamp" => 2018-03-25T18:56:54.138Z,
               "ip_dst" => "10.10.30.50",
           "event_type" => "purge",
            "tcp_flags" => "0"
}

When I change the output to use the avro codec:

output {
     stdout {
        codec => avro {
            schema_uri => "/home/ubuntu/logstash-6.2.3/pmacct.avsc"
           # base64_encoding => false
        }
    }
}

I still get the "is not an example of schema" error.

I'm going to remove items from the schema one at a time to find out which one is causing it. Or is there a way to debug this and see where the input differs from the schema?
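
One way to narrow this down outside Logstash is to compare an event against the schema field by field. The sketch below is a hand-rolled check in plain Ruby that only covers the three primitive types used in this thread (long, string, bytes). It is an approximation written for illustration, not the avro gem's own validation, and `schema_mismatches` is a made-up name.

```ruby
require 'json'

# Hand-rolled checks for the primitive Avro types used in this thread.
# Assumption: long maps to a Ruby Integer, string and bytes map to a String.
AVRO_TYPE_CHECKS = {
  'long'   => ->(v) { v.is_a?(Integer) },
  'string' => ->(v) { v.is_a?(String) },
  'bytes'  => ->(v) { v.is_a?(String) }
}.freeze

PNDA_SCHEMA = JSON.parse(<<~AVSC)
  {
    "namespace": "pnda.entity",
    "type": "record",
    "name": "event",
    "fields": [
      {"name": "timestamp", "type": "long"},
      {"name": "src",       "type": "string"},
      {"name": "host_ip",   "type": "string"},
      {"name": "rawdata",   "type": "bytes"}
    ]
  }
AVSC

# Returns one message per schema field that is missing or has the wrong type.
def schema_mismatches(schema, datum)
  schema['fields'].each_with_object([]) do |field, problems|
    name = field['name']
    type = field['type']
    if !datum.key?(name)
      problems << "#{name}: missing"
    elsif !AVRO_TYPE_CHECKS.fetch(type).call(datum[name])
      problems << "#{name}: expected #{type}, got #{datum[name].class}"
    end
  end
end

# An event shaped like the rubydebug output above: no host_ip, no rawdata.
event = { 'timestamp' => 1522004214138, 'src' => 'netflow', 'bytes' => 134 }
puts schema_mismatches(PNDA_SCHEMA, event)
```

Feeding it a hash copied from the rubydebug output lists the schema fields that are absent or of the wrong type, which is quicker than deleting schema fields one at a time.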

thanks

I ran another test with the following configuration:

input {
  kafka {
    bootstrap_servers => "localhost:9092" # point to the kafka instance
    topics => "ipfix_json"
    codec => "json"
  }
}

filter {
   ruby {
      code => "
          event.set('timestamp', (event.get('@timestamp').to_f * 1000).to_i)
          event.set('src', 'netflow')
          event.set('host_ip', '10.180.221.190')
      "
    }
   mutate {
      # Nest every flow field under rawdata
      rename => {
         "ip_dst" => "[rawdata][ip_dst]"
         "ip_src" => "[rawdata][ip_src]"
         "mask_dst" => "[rawdata][mask_dst]"
         "mask_src" => "[rawdata][mask_src]"
         "timestamp_end" => "[rawdata][timestamp_end]"
         "iface_out" => "[rawdata][iface_out]"
         "timestamp_arrival" => "[rawdata][timestamp_arrival]"
         "port_src" => "[rawdata][port_src]"
         "bytes" => "[rawdata][bytes]"
         "sampling_rate" => "[rawdata][sampling_rate]"
         "port_dst" => "[rawdata][port_dst]"
         "stamp_updated" => "[rawdata][stamp_updated]"
         "as_dst" => "[rawdata][as_dst]"
         "tcp_flags" => "[rawdata][tcp_flags]"
         "ip_proto" => "[rawdata][ip_proto]"
         "as_src" => "[rawdata][as_src]"
         "writer_id" => "[rawdata][writer_id]"
         "event_type" => "[rawdata][event_type]"
         "iface_in" => "[rawdata][iface_in]"
         "timestamp_start" => "[rawdata][timestamp_start]"
         "stamp_inserted" => "[rawdata][stamp_inserted]"
         "peer_ip_dst" => "[rawdata][peer_ip_dst]"
         "net_src" => "[rawdata][net_src]"
         "tos" => "[rawdata][tos]"
         "packets" => "[rawdata][packets]"
         "peer_ip_src" => "[rawdata][peer_ip_src]"
         "net_dst" => "[rawdata][net_dst]"
      }
      remove_field => [ "@timestamp", "@version" ]
   }
}

output {
     stdout {
        codec => pnda-avro {
            schema_uri => "/home/ubuntu/logstash-6.2.3/test.avsc"
          #  base64_encoding => false
        }
        }
}

In test.avsc I removed the rawdata field:

ubuntu@ip-10-180-221-190:~/logstash-6.2.3$ more test.avsc
{
  "namespace": "pnda.entity",
  "type": "record",
  "name": "event",
  "fields": [
     {"name": "timestamp", "type": "long"},
     {"name": "src",       "type": "string"},
     {"name": "host_ip",   "type": "string"}
  ]
}

It seems rawdata is the field triggering the error.

Now I can see Avro output. Does anyone have any idea why the rawdata object triggers the "is not an example of schema" error?

mMSB6stYDm5ldGZsb3ccMTAuMTgwLjIyMS4xOTA=zMSB6stYDm5ldGZsb3ccMTAuMTgwLjIyMS4xOTA=0MSB6stYDm5ldGZsb3ccMTAuMTgwLjIyMS4xOTA=1MSB6stYDm5ldGZsb3ccMTAuMTgwLjIyMS4xOTA=1sSB6stYDm5ldGZsb3ccMTAuMTgwLjIyMS4xOTA=2sSB6stYDm5ldGZsb3ccMTAuMTgwLjIyMS4xOTA=3MSB6stYDm5ldGZsb3ccMTAuMTgwLjIyMS4xOTA=3sSB6stYDm5ldGZsb3ccMTAuMTgwLjIyMS4xOTA=4MSB6stYDm5ldGZsb3ccMTAuMTgwLjIyMS4xOTA=4sSB6stYDm5ldGZsb3ccMTAuMTgwLjIyMS4xOTA=5MSB6stYDm5ldGZsb3ccMTAuMTgwLjIyMS4xOTA=5sSB6stYDm5ldGZsb3ccMTAuMTgwLjIyMS4xOTA=6MSB6stYDm5ldGZsb3ccMTAuMTgwLjIyMS4xOTA=6sSB6stYDm5ldGZsb3ccMTAuMTgwLjIyMS4xOTA=7MSB6stYDm5ldGZsb3ccMTAuMTgwLjIyMS4xOTA=7sSB6stYDm5ldGZsb3ccMTAuMTgwLjIyMS4xOTA=8MSB6stYDm5ldGZsb3ccMTAuMTgwLjIyMS4xOTA=8sSB6stYDm5ldGZsb3ccMTAuMTgwLjIyMS4xOTA=9MSB6stYDm5ldGZsb3ccMTAuMTgwLjIyMS4xOTA=+MSB6stYDm5ldGZsb3ccMTAuMTgwLjIyMS4xOTA=+sSB6stYDm5ldGZsb3ccMTAuMTgwLjIyMS4xOTA=/MSB6stYDm5ldGZsb3ccMTAuMTgwLjIyMS4xOTA=/sSB6stYDm5ldGZsb3ccMTAuMTgwLjIyMS4xOTA=gMWB6stYDm5ldGZsb3ccMTAuMTgwLjIyMS4xOTA=gMWB6stYDm5ldGZsb3ccMTAuMTgwLjIyMS4xOTA=gsWB6stYDm5ldGZsb3ccMTAuMTgwLjIyMS4xOTA=hMWB6stYDm5ldGZsb3ccMTAuMTgwLjIyMS4xOTA=hsWB6stYDm5ldGZsb3ccMTAuMTgwLjIyMS4xOTA=hsWB6stYDm5ldGZsb3ccMTAuMTgwLjIyMS4xOTA=iMWB6stYDm5ldGZsb3ccMTAuMTgwLjIyMS4xOTA=isWB6stYDm5ldGZsb3ccMTAuMTgwLjIyMS4xOTA=jMWB6stYDm5ldGZsb3ccMTAuMTgwLjIyMS4xOTA=jMWB6stYDm5ldGZsb3cc
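
To confirm that this output really is the three-field record, one record can be decoded by hand. This is a rough sketch that assumes each 40-character chunk above is one base64-encoded Avro record written with test.avsc (a long followed by two strings). The helper names are made up, and it implements just enough of Avro's binary encoding for this schema rather than using the avro gem.

```ruby
require 'base64'
require 'stringio'

# Reads one Avro long: a little-endian base-128 varint, then zigzag-decoded.
def read_avro_long(io)
  shift = 0
  raw = 0
  loop do
    byte = io.readbyte
    raw |= (byte & 0x7f) << shift
    break if (byte & 0x80).zero?   # high bit clear marks the last byte
    shift += 7
  end
  (raw >> 1) ^ -(raw & 1)
end

# Reads one Avro string: a long byte count followed by that many UTF-8 bytes.
def read_avro_string(io)
  io.read(read_avro_long(io)).force_encoding('UTF-8')
end

# Decodes one base64 chunk according to the field order in test.avsc.
def decode_test_record(base64_text)
  io = StringIO.new(Base64.decode64(base64_text))
  {
    'timestamp' => read_avro_long(io),
    'src'       => read_avro_string(io),
    'host_ip'   => read_avro_string(io)
  }
end

# First chunk of the stdout output above.
FIRST_RECORD = 'mMSB6stYDm5ldGZsb3ccMTAuMTgwLjIyMS4xOTA='
p decode_test_record(FIRST_RECORD)
```

For the first chunk this gives timestamp 1522005979404, src "netflow" and host_ip "10.180.221.190", which are the values set in the ruby filter.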

OK, I got this working now, but I'm too tired to write it up. I will post the details tomorrow.

OK, in the end I had three issues.

The 1st problem was that the original pnda.io Avro schema did not match my events:

{  
  "namespace": "pnda.entity",
  "type": "record",
  "name": "event",
  "fields": [
     {"name": "timestamp", "type": "long"},
     {"name": "src",       "type": "string"},
     {"name": "host_ip",   "type": "string"},
     {"name": "rawdata",   "type": "bytes"}
  ]
}

It defines host_ip, but the events processed by Logstash carry that information as "host". That triggers the "is not an example of schema" error.

The 2nd problem was also related to the schema: I was not converting the timestamp field to a long integer correctly.

The 3rd problem was the json codec on the input. It seems to tell Logstash to decode the input into fields, so the original content of the message was no longer available.
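
The 2nd problem is easy to reproduce in plain Ruby. The sketch below only shows the type difference between the two conversions; the method names are made up, and the sample time is an arbitrary value chosen so the arithmetic is exact.

```ruby
# Float milliseconds: what the conversion produces without the final .to_i.
def epoch_millis_float(time)
  time.to_f * 1000
end

# Integer milliseconds: the form used in the ruby filter in this thread,
# which is what an Avro "long" field needs.
def epoch_millis_long(time)
  (time.to_f * 1000).to_i
end

t = Time.at(1522004214.5)
puts epoch_millis_float(t).class  # Float
puts epoch_millis_long(t).class   # Integer (Fixnum/Bignum before Ruby 2.4)
```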

The final configuration looks like this:

# Working configuration with avro output and kafka input
input {
  kafka {
    bootstrap_servers => "localhost:9092" # point to the kafka instance
    topics => "ipfix_json"
    # Used by PNDA as one way to identify the data
    add_field => [ "src", "netflow" ]
    # This field is normally inserted by Logstash. With kafka as the input plugin
    # it was not present, so we insert it manually to comply with the Avro schema
    add_field => [ "host_ip", "ip_10.180.221.190" ]
  }
}
filter {
   mutate {
      # Put the content of the message to the PNDA Avro 'rawdata' field
      rename => { "message" => "rawdata" } 
      }
   ruby {
      # Copy @timestamp (the Logstash reference time) into the timestamp field.
      # The pnda.io documentation is wrong here: its example generates a
      # floating-point number instead of a long integer, which makes Logstash
      # complain and exit with a schema mismatch error.
      code => "event.set('timestamp', (event.get('@timestamp').to_f * 1000).to_i)"
    }
}
output {
    kafka {
      bootstrap_servers => "10.180.221.130:9092" # change the broker list IPs
      topic_id => "netflow_json"
      compression_type => "none" # "none", "gzip", "snappy", "lz4"
      value_serializer => 'org.apache.kafka.common.serialization.ByteArraySerializer'
      codec => pnda-avro { schema_uri => "/home/ubuntu/logstash-5.3.3/pmacct.avsc" }
    }
}

Final Avro schema:

{  
  "namespace": "pnda.entity",
  "type": "record",
  "name": "event",
  "fields": [
     {"name": "timestamp", "type": "long"},
     {"name": "src",       "type": "string"},
     {"name": "host_ip",   "type": "string"},
     {"name": "rawdata",   "type": "bytes"}
  ]
}