JSON array parsing in Logstash using the ruby filter


(Nachiket) #1

Hi All,

I have to parse JSON logs using Logstash. I have a file with my JSON data, and I am using the following config. It works perfectly fine for most cases.

input {
  file {
    path => "/opt/logs/*.log"
    sincedb_path => "/dev/null"
    start_position => "beginning"
    codec => json
  }
}
output {
  elasticsearch {...}
}

The problems start to occur when one of the logs has an array of JSON data, which is parsed and visible in Kibana as follows:

{
  "port": "53",
  "address": "radeing.com",
  "protocol": "udp",
  "sname": "Trojan.Pony",
},
{
  "port": "80",
  "address": "radeing.com",
  "sid": "89041083",
  "type": "VmsigMatch",
  "sname": "Downloader.Win.Generic",
  "protocol": "tcp"
}

I also get the "object not well supported" warning in Kibana.

[screenshot of the Kibana warning]

The raw message that gives this info is as follows:

"cnc-services": {
          "cnc-service": [
            {
              "address": "radeing.com",
              "port": "53",
              "sname": "Trojan.Pony",
              "protocol": "udp"
            },
            {
              "type": "VmsigMatch",
              "address": "radeing.com",
              "url": "hxxp:///server/shit.exe",
              "protocol": "tcp",
              "port": "80",
              "sname": "Downloader.Win.Generic",
            }
        ]
}

The intended output is something like the following:

cnc-services.cnc-service.address: ["radeing.com", "radeing.com"]
cnc-services.cnc-service.port: ["53", "80"]

Could someone please help?


(Charlie) #2

Do you mean that, in JSON notation, you want to see the following structure:
"cnc-services.cnc-service.address": ["radeing.com", "radeing.com"]?

Asking because a "." in Kibana means something like: "cnc-services": {"cnc-service.address": ["radeing.com", "radeing.com"]}


(Charlie) #3

If it is the first one, you have to join those fields. You can accomplish that using the mutate filter:

mutate {
  add_field => {
    "new_field" => "%{cnc-services} %{cnc-service} %{address}"
  }
  remove_field => ["cnc-services", "cnc-service", "address"]
}

(Nachiket) #4

Hi Charlie,

Thank you for the reply. As it is nested JSON, what you suggest will not work.

If you use the following code block:

mutate {
  add_field => {
    "new_field" => "%{[alert][explanation][cnc-services][cnc-service][address]}"
  }
}

You get something like this in the output:
[screenshot of the resulting output]

The column on the right indicates the content of the alert.explanation.cnc-services.cnc-service object.

Also, this approach of hard-coding the mutate block is fine if the fields are fixed across all log lines, but in our case not all cnc-service objects have the same fields.


(Nachiket) #5

This is getting parsed properly by Logstash, but it is being indexed as a nested object in Elasticsearch. As nested objects are not well supported in Kibana, I want to create a flat array structure using the ruby filter. Here is my config:

input {
  file {
    path => "/home/fire.log"
    sincedb_path => "/dev/null"
    start_position => "beginning"
    codec => json
  }
}
filter {
  ruby {
    code => "
      message_array = event.get('[alert][explanation][cnc-services][cnc-service]')
      # Only iterate if the field exists and is actually an array
      if message_array.is_a?(Array)
        message_array.each_with_index do |item, index|
          event.set('cnc-service' + index.to_s, item)
        end
      end
    "
  }
  mutate {
    remove_field => ["[alert][explanation][os-changes]", "headers", "[alert][interface]", "alert", "appliance", "path", "product"]
  }
}
output {
  stdout { codec => rubydebug }
}

This gives me distinct cnc-service fields (cnc-service0, cnc-service1, and so on), each holding a valid JSON object. I could certainly parse them using a json filter, but the number of cnc-service objects varies and I do not wish to hard-code it. Can I create a single hash object by merging them?

I am not familiar with Ruby; in Python I can easily do this as follows:

from itertools import chain
from collections import defaultdict

dict3 = defaultdict(list)
for k, v in chain(dict1.items(), dict2.items()):
    dict3[k].append(v)

Can I do something similar using the ruby filter?


(Magnus Bäck) #6

If you don't care about the ordering of the array items (i.e. that element n of each array must originate from the same original hash), you can just loop over the array of hashes, loop over the key/value pairs of each hash, and append each value to an array keyed by the field name.
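
Untested, but a minimal sketch of that approach in a ruby filter (assuming the array still lives at [alert][explanation][cnc-services][cnc-service], as in your config above) might look like this:

ruby {
  code => "
    services = event.get('[alert][explanation][cnc-services][cnc-service]')
    if services.is_a?(Array)
      # Collect every value under its field name, one array per key
      merged = Hash.new { |hash, key| hash[key] = [] }
      services.each do |service|
        service.each { |key, value| merged[key] << value }
      end
      event.set('[alert][explanation][cnc-services][cnc-service]', merged)
    end
  "
}

Since this keeps whatever keys each hash happens to have, it also handles the case where not all cnc-service objects share the same fields.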


(Nachiket) #7

I wasn't quite aware of how to do that in Ruby, so I ended up doing it in Python. Is there a better way of exchanging data between Ruby and Python than writing it to a file?

    if [alert][explanation][cnc-services] {
      ruby {
        code => "
          require 'open3'
          cnc = event.get('[alert][explanation][cnc-services]')
          # Hand the data to the Python script through a file and read its output back from another file
          File.write('/etc/logstash/conf.d/scripts/ruby', cnc)
          cmd = 'python /path/to/script.py'
          stdin, stdout, stderr, wait_thr = Open3.popen3(cmd)
          wait_thr.value  # wait for the script to finish before reading its output
          python_out = File.read('/etc/logstash/conf.d/scripts/python')
          event.set('[alert][cnc_result]', python_out)
        "
        remove_field => ["[alert][explanation][cnc-services]"]
      }
      json {
        source => "[alert][cnc_result]"
        target => "[alert][explanation][cnc-services]"
        remove_field => ["[alert][cnc_result]"]
      }
    }

(Magnus Bäck) #8

Is there a better way of exchanging data between ruby and python than writing it to a file?

You could pass a JSON string as an argument to the Python script. What you have above is not only inefficient, it won't work if you have more than one pipeline worker.
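
For example, something along these lines (a rough sketch; the script path is made up, and Shellwords escapes the JSON so the shell doesn't mangle it):

ruby {
  code => "
    require 'json'
    require 'shellwords'
    cnc_json = event.get('[alert][explanation][cnc-services]').to_json
    # Pass the serialized JSON as a single shell-escaped argument
    result = `python /path/to/script.py #{Shellwords.escape(cnc_json)}`
    event.set('[alert][cnc_result]', result)
  "
}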


(Nachiket) #9

I tried that, but the argument length is too large and it throws an error.

This is the current arg length limit for me.

getconf ARG_MAX
262144

Should I try increasing this on my Logstash machine? Sounds like it could be inefficient.


(Magnus Bäck) #10

Oh, that much data. In that case write it to the process's stdin instead. That'll also solve the concurrency issue.


(Nachiket) #11

I have changed this to write to stdin as suggested, and it seems to work perfectly fine.

ruby {
  code => "
    require 'open3'
    cnc = event.get('[alert][explanation][cnc-services]')
    cmd = 'python /etc/logstash/conf.d/scripts/fire.py'
    stdin, stdout, stderr, wait_thr = Open3.popen3(cmd)
    # Write the data to the script's stdin and close it so the script sees EOF
    stdin.write cnc
    stdin.close
    # Read the flattened result back from the script's stdout
    event.set('[alert][cnc_result]', stdout.read)
    stdout.close
    stderr.close
    exit_status = wait_thr.value
  "
  remove_field => ["[alert][explanation][cnc-services]"]
}

(system) #12

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.