Logstash - Nested fields


(Nachiket) #1

Hi,

I wish to parse proxy logs using Logstash. I have multiple IP fields in each document (log). I wish to parse the IP fields into an object or an array; I am not quite sure which is best suited for the requirement. The final output that I wish to achieve for each document would be something like the following:

logs/proxy/1
    {
      "device": "bluecoat",
      "category": "basic internet",
      "IP": {
        "device": "10.0.0.1",
        "source": "10.4.4.1",
        "destination": "104.49.44.1"
      }
    }

How do I create this object or array type (IP) in Elasticsearch using Logstash? Currently I have three IP fields, named as follows:

deviceip:"10.0.0.1"
srcip:"10.4.4.1"
dstip:"104.49.44.1"

The purpose of grouping these three fields is to query the IP field for any matches and retrieve the relevant results. I am not sure if this falls under the parent-child type of situation; if it does, then I believe achieving this directly in Logstash would be difficult. Any help would be appreciated.

These are the references I have gone through:

https://www.elastic.co/guide/en/elasticsearch/reference/current/array.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/object.html
Convert a field datatype to object or nested in logstash
Define Parent-Child Relationship in Logstash


(Nachiket) #2

This is the config that I have in place. All the values are currently parsed; how do I index them into Elasticsearch as either an object or an array?

input {
...
}
filter {
  if "getproxy" in [tags] {
    ### First two grok filters, filter out logs with users present in the log source
    grok {
      patterns_dir => ["/etc/logstash/patterns"]
      match => { "message" => "%{SYSLOGTIMESTAMP:timestamp} %{IP:srcip} %{PROXYBASE:base} %{OUBASE:oubase} %{GREEDYDATA:msg}" }
      add_tag => [ "User_Present" ]
      remove_tag => [ "getproxy" ]
      tag_on_failure => [ "unparsed" ]
    }
    grok {
      patterns_dir => ["/etc/logstash/patterns"]
      match => { "oubase" => "%{OU:ou}com\/%{GREEDYDATA:suser}" }
      tag_on_failure => [ "unparsed" ]
      ### Remove redundant field names
      add_field => { "user.source" => "%{suser}" }
      remove_field => [ "suser", "oubase" ]
    }
    ### In case the user is not present, this grok will consume the information
    if "unparsed" in [tags] {
      grok {
        patterns_dir => ["/etc/logstash/patterns"]
        match => { "message" => "%{SYSLOGTIMESTAMP:timestamp} %{IP:srcip} %{PROXYBASE:base} %{GREEDYDATA:msg}" }
        add_tag => [ "User_Absent" ]
        remove_tag => [ "unparsed", "getproxy" ]
        tag_on_failure => [ "unparsed" ]
      }
    }
    ### Create kv pairing for logs
    if "unparsed" not in [tags] {
      kv {
        source => "msg"
        value_split => "="
        target => "msg"
      }
      kv {
        source => "base"
        value_split => "="
        target => "base"
      }
      ### Mutate the fields to keep only the required fields
      mutate {
        add_field => { "ip.device" => "%{srcip}" }
        remove_field => [ "srcip", "product", "product_version", "user" ]
      }
    }
    date {
      match => [ "timestamp", "MMM dd HH:mm:ss" ]
    }
  }
}
output {
  if "proxy" in [type] {
    stdout {
      codec => "rubydebug"
    }
    elasticsearch {
      action => "index"
      hosts => ["http://localhost:9200"]
      index => "proxy"
    }
  }
}

Any suggestions to improve the config are welcome as well.


(Nachiket) #3

I think what I want to achieve is the nested datatype.
Alright, now that I have realized my mistake, let me rephrase: how do I create nested documents in Elasticsearch using Logstash?

I realize that this might require the Ruby filter. Any pointers or a sample filter that I can refer to? I am not very familiar with Ruby.



(Magnus Bäck) #4

To achieve the desired nested fields just rename the existing fields into place with a mutate filter.

mutate {
  rename => {
    "deviceip" => "[IP][device]"
    "srcip" => "[IP][source]"
    "dstip" => "[IP][destination]"
  }
}

(Nachiket) #5

Hi Magnus,

Thank you for the reply. I was hoping that you would eventually land here and reply! :stuck_out_tongue:

This works well for the srcip field, but there is an issue: the device IP and destination IP are already part of a nested field, msg. The structure is as follows:

msg {
    deviceip: 10.0.0.1
    destip: 10.0.0.2
}

How do I use the rename filter in this scenario? I tried the rename option of mutate, but it threw an exception.

So, to temporarily work around this issue, I moved all the fields to the root of the event. Then I tried the rename option on msg.deviceip and msg.dstip. However, that produced an array that Elasticsearch did not quite support. I then even tried the add_field option, but the document structure remained the same. Here is the structure:

"ip": [
  [
    "device",
    "10.131.73.14"
  ],
  "ip"
],

Is there something that I am currently doing wrong? Please help.

Also, purely for academic purposes: how would I achieve this if I had to work with the nested fields directly? Is it possible to rename nested fields in such a way?

[msg][deviceip] => [ip][device]

Thanks & Regards,
N


(Magnus Bäck) #6

Also, purely for an academic purpose, how would i achieve if i have to work with the nested fields directly? is it possible to rename nested fields in such a way?

Yes, that's exactly what it should look like.
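
For the record, a minimal sketch of that nested-to-nested rename, using the field names mentioned earlier in the thread (deviceip and dstip assumed to live under msg; this exact combination is not shown in the thread, so treat it as an illustration):

```
filter {
  mutate {
    rename => {
      "[msg][deviceip]" => "[ip][device]"
      "srcip"           => "[ip][source]"
      "[msg][dstip]"    => "[ip][destination]"
    }
  }
}
```

Logstash's [outer][inner] field-reference syntax works on both sides of a rename, so no Ruby filter is needed for this.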


(Nachiket) #7

Hi Magnus,

I understand it should look like this; however, two of the IP fields are missing.

It should ideally look like this:

"ip": [
  [
    "device",
    "10.131.73.14"
  ],
  [
    "source", 
    "10.x.x.x"
  ]
   And one more ip field  
]

The third field is not visible, and I get a truncated output.


(Magnus Bäck) #8

I don't understand how the result could look like that. Please show

  • an example event after Logstash has processed it without the field rename, and
  • an example event after Logstash has processed it with the field rename.

(Nachiket) #9

Hi Magnus,

I resolved the issue. It was a silly mistake on my end.

I had added an add_field option before the rename and totally forgot about it:

mutate {
  add_field => { "ip" => "ip" }
}

This created an object array, and then I ran into all kinds of issues. After removing that line and using the config you suggested, this is the output I receive:

"ip": {
  "destination": "49.44.50.18",
  "source": "10.44.111.240",
  "device": "10.131.73.14"
}

Sorry for such a silly error. Thanks a lot for your help!

Regards,
N
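
A side note on the nested datatype mentioned in post #3: the ip field produced here is a plain object, which is usually sufficient for matching any of the three addresses. If IP-aware queries (e.g. range or CIDR term queries) are also wanted, the subfields can be mapped with Elasticsearch's ip datatype before indexing. A hedged sketch of such a mapping (index name taken from the config above; older Elasticsearch versions additionally require a type name under "mappings"):

```
PUT proxy
{
  "mappings": {
    "properties": {
      "ip": {
        "properties": {
          "device":      { "type": "ip" },
          "source":      { "type": "ip" },
          "destination": { "type": "ip" }
        }
      }
    }
  }
}
```

With this mapping, a term query such as "ip.source": "10.4.0.0/16" matches by CIDR range rather than by exact string.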


(system) #10

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.