JSON array parsing in Logstash using the ruby filter

Hi All,

I have to parse JSON logs using Logstash. The JSON data is in a file. I am using the following config, and it works fine in most cases.

input {
  file {
    path => "/opt/logs/*.log"
    sincedb_path => "/dev/null"
    start_position => "beginning"
    codec => json
  }
}
output {
 elasticsearch {...}
}

The problems start when one of the logs has an array of JSON objects. It is parsed and shown in Kibana as follows:

{
  "port": "53",
  "address": "radeing.com",
  "protocol": "udp",
  "sname": "Trojan.Pony",
},
{
  "port": "80",
  "address": "radeing.com",
  "sid": "89041083",
  "type": "VmsigMatch",
  "sname": "Downloader.Win.Generic",
  "protocol": "tcp"
}

I also get the object not well supported error in Kibana.

The raw message that gives this info is as follows:

"cnc-services": {
  "cnc-service": [
    {
      "address": "radeing.com",
      "port": "53",
      "sname": "Trojan.Pony",
      "protocol": "udp"
    },
    {
      "type": "VmsigMatch",
      "address": "radeing.com",
      "url": "hxxp:///server/shit.exe",
      "protocol": "tcp",
      "port": "80",
      "sname": "Downloader.Win.Generic"
    }
  ]
}

The intended output is something as follows:

cnc-services.cnc-service.address: ["radeing.com", "google.com"]
cnc-services.cnc-service.port: ["53", "80"]

Could someone please help?

Do you mean that, in JSON notation, you want to see the following structure:
"cnc-services.cnc-service.address": ["google.com","google.com"]?

I ask because "." in Kibana means nesting, something like: "cnc-services": {"cnc-service.address":["google.com","google.com"]}

If you mean the first one, you have to join those fields. You can do that with the mutate filter:

mutate {
  add_field => {
    "new_field" => "%{cnc-services} %{cnc-service} %{address}"
  }
  remove_field => ["cnc-services", "cnc-service", "address"]
}

Hi Charlie,

Thank you for the reply. As it is nested json, what you suggest will not work.

If you use the following code block:

mutate {
  add_field => {
    "new_field" => "%{[alert][explanation][cnc-services][cnc-service][address]}"
  }
}

You get something like this in the output:
image

The column on the right shows the content of the alert.explanation.cnc-services.cnc-service object.

Also, hard-coding the mutate block only works if the fields are the same in all log lines. In our case, not all cnc-service objects have the same fields.

Logstash parses the field properly, but it is indexed as a nested object in Elasticsearch. Since nested objects are not well supported in Kibana, I hope to create a flat array structure using the ruby filter. Here is my config:

input {
  file {
    path => "/home/fire.log"
    sincedb_path => "/dev/null"
    start_position => "beginning"
    codec => json
  }
}
filter {
  ruby {
    code => "
      message_array = event.get('[alert][explanation][cnc-services][cnc-service]')
      if message_array.is_a?(Array)
        message_array.each_with_index { |item, index| event.set('cnc-service' + index.to_s, item) }
      end
    "
  }
  mutate {
    remove_field => ["[alert][explanation][os-changes]", "headers", "[alert][interface]", "alert", "appliance", "path", "product"]
  }
}
output {
  stdout { codec => rubydebug }
}

This gives me distinct cnc-service fields like [cnc-service0, cnc-service1, and so on], each holding a valid JSON object. I can definitely parse them using a json filter, but the number of cnc-service objects varies and I do not wish to hard-code it. Can I create a single hash object by merging them?

I am not familiar with Ruby. In Python I can easily do this as follows:

for k, v in chain(dict1.items(), dict2.items()):
    dict3[k].append(v)

Can I do something similar using the ruby filter?

If you don't care about the ordering of the array items (i.e. that element n of each array must originate from the same original hash), you can just loop over the array of hashes, loop over the key/value pairs of each hash, and append each value to the array kept for its key.
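
For reference, the loop described above could look roughly like this in plain Ruby. This is a minimal sketch and has not been tested inside Logstash. The name merge_to_arrays is made up for illustration, and the sample data is copied from the raw message earlier in the thread.

```ruby
# Merge an array of hashes into one hash of arrays, for example
# [{'port' => '53'}, {'port' => '80'}] becomes {'port' => ['53', '80']}.
# merge_to_arrays is a hypothetical helper name, not part of Logstash.
def merge_to_arrays(items)
  items = [items] if items.is_a?(Hash) # tolerate a lone object instead of a list
  return {} unless items.is_a?(Array)

  merged = {}
  items.each do |item|
    next unless item.is_a?(Hash)
    # Append every value to the array kept for its key.
    item.each { |key, value| (merged[key] ||= []) << value }
  end
  merged
end

services = [
  { 'address' => 'radeing.com', 'port' => '53', 'sname' => 'Trojan.Pony', 'protocol' => 'udp' },
  { 'type' => 'VmsigMatch', 'address' => 'radeing.com', 'protocol' => 'tcp',
    'port' => '80', 'sname' => 'Downloader.Win.Generic' }
]

merged = merge_to_arrays(services)
p merged['port'] # prints ["53", "80"]
```

Inside a ruby filter, the input would come from event.get('[alert][explanation][cnc-services][cnc-service]') and the merged hash would be written back with event.set. Keys that are missing from some objects just produce shorter arrays, which is the ordering caveat mentioned above.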

I wasn't sure how to do that in Ruby, so I ended up doing it in Python. Is there a better way of exchanging data between Ruby and Python than writing it to a file?

    if [alert][explanation][cnc-services] {
      ruby {
        code => "
        cnc = event.get('[alert][explanation][cnc-services]')
        require 'open3'
        File.write('/etc/logstash/conf.d/scripts/ruby', cnc)
        cmd = 'python /path/to/script.py'
        stdin, stdout, stderr = Open3.popen3(cmd)
        python_out = File.read('/etc/logstash/conf.d/scripts/python')
        event.set('[alert][cnc_result]', python_out)
        "
        remove_field => ["[alert][explanation][cnc-services]"]
      }
      json {
        source => "[alert][cnc_result]"
        target => "[alert][explanation][cnc-services]"
        remove_field => ["[alert][cnc_result]"]
      }
    }

Is there a better way of exchanging data between ruby and python than writing it to a file?

You could pass a JSON string as an argument to the Python script. What you have above is not only inefficient, it won't work if you have more than one pipeline worker.

I tried that, but the argument length is too large and it throws an error.

This is the current arg length limit for me.

getconf ARG_MAX = 262144

Should I try increasing this on my Logstash machine? Sounds like it could be inefficient.

Oh, that much data. In that case write it to the process's stdin instead. That'll also solve the concurrency issue.

I have changed this to write to stdin as suggested, and it seems to work perfectly fine.

ruby {
  code => "
    require 'open3'
    cnc = event.get('[alert][explanation][cnc-services]')
    cmd = 'python /etc/logstash/conf.d/scripts/fire.py'
    # Start the script and feed it the field over stdin instead of a temp file
    stdin, stdout, stderr, wait_thr = Open3.popen3(cmd)
    pid = wait_thr[:pid]
    stdin.write cnc
    stdin.close
    # Whatever the script prints on stdout becomes the new field value
    event.set('[alert][cnc_result]', stdout.read)
    stdout.close
    stderr.close
    # Wait for the script to exit before moving on
    exit_status = wait_thr.value
  "
  remove_field => ["[alert][explanation][cnc-services]"]
}
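
The stdin approach above can probably be written more compactly with Open3.capture3, which writes stdin, collects stdout and stderr, and waits for the exit status in one call. The sketch below is plain Ruby rather than a tested Logstash config. It assumes the external script reads JSON on stdin and prints JSON on stdout, which the thread does not confirm for fire.py, and run_with_stdin is a made-up helper name.

```ruby
require 'json'
require 'open3'

# Pipe `payload` as JSON to the stdin of `cmd` (an argv array) and return the
# JSON that the command prints on stdout, parsed back into Ruby objects.
# run_with_stdin is a hypothetical helper; the JSON-in/JSON-out contract of
# the external script is an assumption.
def run_with_stdin(cmd, payload)
  stdout, stderr, status = Open3.capture3(*cmd, stdin_data: JSON.generate(payload))
  # Surface failures instead of silently storing empty output.
  raise "#{cmd.first} exited with #{status.exitstatus}: #{stderr}" unless status.success?

  JSON.parse(stdout)
end
```

In the filter, cmd would be something like ['python', '/etc/logstash/conf.d/scripts/fire.py'] and payload the value returned by event.get. Because every call gets its own pipes, several pipeline workers can run it at the same time without sharing a file.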
