Logstash Filter - Breaking up fields into multiple documents

Hi Logstash Community,

I am trying to break up a single input event with many fields into multiple output documents in Elasticsearch, each containing a subset of those fields and all using the same set of field names.

Background:
I am using the SNMP input plugin in Logstash to monitor the health of an F5 Load Balancer (not important).

I am "walking" an OID in the tree which holds an unknown number of "groups" of objects.
Each "group" has a number of values (3 in the example below), which are distinguished by a leading number followed by a sequence of trailing numbers denoting the object.

Example Output at Present:
1.12.84 = "Name 1"
2.12.84 = 1
3.12.84 = "String 1"
1.13.47 = "Name 2"
2.13.47 = 2
3.13.47 = "String 2"
1.17.67 = "Name 3"
2.17.67 = 2
3.17.67 = "String 3"
etc.

Desired Output:
I would like to split up this single input into a number of outputs with the same field names based on each "group" of inputs (i.e. set of numbers following the initial 1./2. or 3.):
i.e.
document 1:
{
title = "Name 1"
value = 1
status = "String 1"
}
document 2:
{
title = "Name 2"
value = 2
status = "String 2"
}
document 3:
{
title = "Name 3"
value = 3
status = "String 3"
}
document ...

Does anyone know how one would:

  • Split a single input into multiple output documents in the same index,
  • Programmatically break up a group of fields and rename them for each output document.

All help greatly appreciated,
S

If you use

output { stdout { codec => rubydebug } }

what does the structure of a single document look like? Your description and sample data do not match at all. Did you mean it is something like

{
"1.12.84": "Name 1",
"2.12.84": "Name 2",
"3.12.84": "Name 3",
"1.13.47": 1,
"2.13.47": 2,
"3.13.47": 3,
"1.17.67": "String 1",
"2.17.67": "String 2",
"3.17.67": "String 3"
}

If it does look anything like that, you could use a ruby filter to reformat it:

    json { source => "message" target => "[@metadata][snmpData]" remove_field => [ "message" ] }
    ruby {
        code => '
            keys = { "12.84" => "title", "13.47" => "value", "17.67" => "status" }
            snmpData = event.get("[@metadata][snmpData]")
            if snmpData.is_a? Hash
                h = {}
                snmpData.each { |k, v|
                    match = k.match(/(\d)\.(.*)/)
                    next unless match   # skip keys that are not "<digit>.<rest>"
                    instance = match[1]
                    oid = match[2]
                    if ! h.key? instance
                        h[instance] = {}
                    end
                    if keys.key? oid
                        oid = keys[oid]
                    end
                    h[instance][oid] = v
                }
                event.set("snmpData", h.values)
            end
        '
    }

At the end of the loop h will look like

{
"1"=>{"title"=>"Name 1", "value"=>1, "status"=>"String 1"}, 
"2"=>{"title"=>"Name 2", "status"=>"String 2", "value"=>2}, 
"3"=>{"title"=>"Name 3", "value"=>3, "status"=>"String 3"}
}

Calling h.values discards the keys and converts that to an array

[
{"title"=>"Name 1", "value"=>1, "status"=>"String 1"}, 
{"title"=>"Name 2", "status"=>"String 2", "value"=>2}, 
{"title"=>"Name 3", "value"=>3, "status"=>"String 3"}
]

You can then use

split { field => "snmpData" }

to end up with events like

{
"@timestamp" => 2020-05-08T22:53:04.180Z,
  "snmpData" => {
     "value" => 3,
     "title" => "Name 3",
    "status" => "String 3"
}, ...

If you want to move the fields to the top level then see here.

Hi Badger, thanks for helping me with this one.

That is not quite the right interpretation of the input, but the idea of the output is perfect (I'll most likely use the fields at the top level).
Please see below the stdout before filters are applied:

{
"1.12.47.67.111.109.109.111.110.47.84.69.83.84" => "/Common/TEST",
"2.12.47.67.111.109.109.111.110.47.84.69.83.84" => 3,
"3.12.47.67.111.109.109.111.110.47.84.69.83.84" => 1,
"4.12.47.67.111.109.109.111.110.47.84.69.83.84" => 0,
"5.12.47.67.111.109.109.111.110.47.84.69.83.84" => "The children pool member(s) are down",

"1.13.47.67.111.109.109.111.110.47.84.69.83.84.50" => "/Common/TEST2",
"2.13.47.67.111.109.109.111.110.47.84.69.83.84.50" => 4,
"3.13.47.67.111.109.109.111.110.47.84.69.83.84.50" => 1,
"4.13.47.67.111.109.109.111.110.47.84.69.83.84.50" => 0,
"5.13.47.67.111.109.109.111.110.47.84.69.83.84.50" => "The children pool member(s) are not available yet",

"1.17.47.67.111.109.109.111.110.47.77.65.82.83.45.84.69.83.84" => "/Common/TEST3",
"2.17.47.67.111.109.109.111.110.47.77.65.82.83.45.84.69.83.84" => 4,
"3.17.47.67.111.109.109.111.110.47.77.65.82.83.45.84.69.83.84" => 2,
"4.17.47.67.111.109.109.111.110.47.77.65.82.83.45.84.69.83.84" => 0,
"5.17.47.67.111.109.109.111.110.47.77.65.82.83.45.84.69.83.84" => "The children pool member(s) are not available yet",

"@metadata" => {
         "host_protocol" => "udp",
          "host_address" => "WW.XX.YY.ZZ",
          "host_community" => "AAAA",
          "host_port" => "161"
    },
"@timestamp" => 2020-05-10T15:55:52.442Z,
"@version" => "1",
"tags" => [ [0] "F5_SNMP_VirtualServers"],
"host" => "WW.XX.YY.ZZ"
}

The input above can be seen as 3 groups of 5 entries.
i.e. 12.47.67.111.109.109.111.110.47.84.69.83.84 = one "object" / "group id" and there are 5 pieces of information about that object (1.,2.,3.,4.,5.).

The number of "objects" (i.e. the long string) varies, but the number of "pieces of information" about each object is fixed (i.e. the prefix number 1. -> 5.).
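The grouping described above can be sketched in plain Ruby, outside Logstash. This is only an illustration under assumptions: the sample keys are shortened, the `NAMES` mapping and the `group_by_object` helper are hypothetical names, and the column labels are placeholders rather than names taken from the F5 MIB.

```ruby
# Shortened, made-up sample shaped like the rubydebug output above.
fields = {
  "1.12.47.67" => "/Common/TEST",
  "2.12.47.67" => 3,
  "5.12.47.67" => "down",
  "1.13.47.68" => "/Common/TEST2",
  "2.13.47.68" => 4,
  "5.13.47.68" => "not available",
  "@version"   => "1",
  "host"       => "WW.XX.YY.ZZ"
}

# Assumed labels for the leading column number (placeholders).
NAMES = { "1" => "title", "2" => "value1", "3" => "value2",
          "4" => "value3", "5" => "status" }

# Group "<column>.<object id>" keys into one hash per object id.
def group_by_object(fields, names)
  grouped = {}
  fields.each do |key, value|
    m = key.match(/\A(\d+)\.(.+)\z/)
    next unless m                       # ignores @version, host, ...
    column, object_id = m[1], m[2]
    grouped[object_id] ||= {}
    # Fall back to the raw column number when no label is defined.
    grouped[object_id][names.fetch(column, column)] = value
  end
  grouped.values                        # one hash per object
end

docs = group_by_object(fields, NAMES)
puts docs.inspect
```

Each element of `docs` corresponds to one desired output document; inside a ruby filter, an array like this could be stored in a field and passed to a split filter.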

I adjusted the "keys" in the previously shared ruby to reflect the 1. -> 5. prefixes
(i.e. keys = { "1." => "title", "2." => "value1", "3." => "value2", "4." => "value3", "5." => "status" }),
but I can't see any "snmpData" in the output.

I added puts event.get("snmpData") into the code block to see what "snmpData" was and it is coming back as null. I then commented out the json line and printed out "message" and that seems to be null as well.

Do you know how I would recreate the "snmpData" field without the "message" field?

Assuming we can re-use the ruby block, does the match = k.match(/(\d)\.(.*)/) have to be adjusted as I am grouping based on the "oid" rather than the "instance" (using your variable names) or will this just split up the field name based on the first decimal character and the rest of the string?
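To see what that regular expression does to one of these field names, it can be tried in plain Ruby. The sketch below only demonstrates the pattern's behaviour; the stricter `anchored` pattern is a suggested variant, not something from the original filter.

```ruby
pattern = /(\d)\.(.*)/

key = "1.12.47.67.111.109.109.111.110.47.84.69.83.84"
m = key.match(pattern)
prefix = m[1]   # "1", the digit before the first dot
suffix = m[2]   # "12.47.67.111.109.109.111.110.47.84.69.83.84"

# Keys without a "<digit>." sequence do not match at all, so calling
# [1] on the result would raise NoMethodError on nil.
no_match = "@version".match(pattern)      # nil

# The pattern is not anchored, so a two-digit prefix is cut short:
two_digit = "10.12.47".match(pattern)     # captures "0" and "12.47"

# Suggested variant: anchor the match and allow several digits.
anchored = /\A(\d+)\.(.+)\z/
strict = "10.12.47".match(anchored)       # captures "10" and "12.47"

puts prefix, suffix
```

So the pattern does split on the first dot for keys like the ones shown, and which capture is used as the grouping key (the prefix or the rest) decides whether documents are built per prefix or per object.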

Thanks again for your help!

Yes.

You have all your fields at the top level, so you do not have an snmpData field. The reason I set the target option on the json filter was so that all the fields would be inside another field, and when iterating over the contents of the hash I wouldn't have to do something like

if ! ["@timestamp", "@version", "tags", "host"].include? k...

to ignore those fields.
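Either way of ignoring the housekeeping fields can be tried on a plain Hash first. The sketch below is a stand-alone illustration with made-up sample data; inside a ruby filter the hash would presumably come from `event.to_hash`, which is an assumption about how you would wire it in.

```ruby
# Made-up stand-in for the top-level fields of one event.
top_level = {
  "1.12.47.67" => "/Common/TEST",
  "5.12.47.67" => "The children pool member(s) are down",
  "@timestamp" => "2020-05-10T15:55:52.442Z",
  "@version"   => "1",
  "tags"       => ["F5_SNMP_VirtualServers"],
  "host"       => "WW.XX.YY.ZZ"
}

# Option 1: drop a fixed list of known housekeeping fields.
ignored = ["@timestamp", "@version", "tags", "host"]
by_exclusion = top_level.reject { |k, _v| ignored.include?(k) }

# Option 2: keep only keys that start with "<digits>.".
by_pattern = top_level.select { |k, _v| k.match?(/\A\d+\./) }

puts by_exclusion.keys.inspect
puts by_pattern.keys.inspect
```

The pattern-based option does not need updating when other filters add fields to the event, which is one reason to prefer it over a fixed exclusion list.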

I think I know what you mean about the fields being at the top level, but I don't know how to resolve it.
If they are at the top level, can I still add all fields starting with 1. -> 5. to a "snmpData" structure, or would I have to take all of the message field and strip @timestamp, @version, tags and host out of it?
If so, how would I do that?

What I have at the moment is the below but I'm still not seeing any data arrays or fields of "title", "valueX" etc.:

filter {

 if "F5_SNMP_VirtualServers" in [tags] {

    # Create a field called "snmpData" to store data being passed from "message"
    mutate {
        add_field => { "snmpData" => "" }
    }

    # Pass values from "message" field to "snmpData"
    json { source => "message" target => "[@metadata][snmpData]" remove_field => [ "message" ] }

    ruby {
        code => '
            keys = { "1." => "title", "2." => "value1", "3." => "value2", "4." => "value3", "5." => "status" }
            snmpData = event.get("[@metadata][snmpData]") # Pull in data into "snmpData" in the code block
            if snmpData.is_a? Hash
                h = {}
                snmpData.each { |k, v|
                    match = k.match(/(\d)\.(.*)/)
                    instance = match[1]
                    oid = match[2]
                    if ! h.key? instance
                        h[instance] = {}
                    end
                    if keys.key? oid
                        oid = keys[oid]
                    end
                    h[instance][oid] = v
                }
            event.set("snmpData", h.values)

            end
        '
    }
    # Split the "snmpData" into multiple events
    split { field => "snmpData" }

    # Bring fields from the "snmpData" to the top level
    ruby {
    code => '
            # cycle through "snmpData" and bring the fields to the top level
            event.get("snmpData").each { |k, v|
                event.set(k,v)
            }
            event.remove("snmpData")
    '
    }
    #
  }