Logstash splitting and tagging events

I need to ingest the following JSON as separate events:

{
"test1": {some nested json here},
"test2": {some nested json here},
"test3": {some nested json here},
"test4": {some nested json here}
}

I have 3 problems:

  • When I try to clone:

    filter {
        json {
            source => "message"
        }
        clone {
            clones => ['clone_for_test1', 'clone_for_test2']
        }
        if [type] == 'clone_for_test1' {
            prune {
                whitelist_names => [ "test1" ]
            }
            mutate {
                add_field => { "[@metadata][type]" => "test1" }
            }
        }
    }

I get the JSON I want, but it is still nested:

{
   "test1": { ....
   }
}
  • The second one is with the output. Currently I can ingest with:

        tcp {
            codec => line { format => "%{test1}" }
            host => "127.0.0.1"
            port => 7515
            id => "TCP-SPLUNK-test1"
        }
    

I can do the same for all the cloned items, but I guess there is a more clever way to do it (see the sketch after this list).

  • The last one is a question related to identifying events - something like:
    if the format is { "test1":{},"test2":{},"test3":{},"test4":{} }
    then do something
    else do something different

I guess this should be done with grok, but I'll play with that after I manage to fix the first 2 issues.
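
For the second problem, the only alternative I can see so far is routing each clone to its own destination with conditionals in the output block - a minimal sketch, where the second port (7516) is just an assumed example:

output {
    if [@metadata][type] == "test1" {
        tcp {
            codec => line { format => "%{test1}" }
            host => "127.0.0.1"
            port => 7515
            id => "TCP-SPLUNK-test1"
        }
    } else if [@metadata][type] == "test2" {
        tcp {
            codec => line { format => "%{test2}" }
            host => "127.0.0.1"
            port => 7516        # assumed port for the second stream
            id => "TCP-SPLUNK-test2"
        }
    }
}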

Do you want to discard "test1" and just keep the contents of "{some nested json here}" as the top-level fields in the event?

Yes, all JSON elements from test1 should go to the root of the JSON.
The general idea is to ingest the contents of the nested JSONs as separate events - maybe there is a better approach...

Source JSON:

{
    "test1": {
        "element1": "value1",
        "element2": "value2",
        "element3": "value3"
    },
    "test2": {
        "element21": "value21",
        "element22": "value22",
        "element23": "value23"
    },
    "test3": {
        "element31": "value31",
        "element32": "value32",
        "element33": "value33"
    }
}

Expected output:

event 1
{
    "element1": "value1",
    "element2": "value2",
    "element3": "value3"
}
event 2
{
    "element21": "value21",
    "element22": "value22",
    "element23": "value23"
}
event 3
{
    "element31": "value31",
    "element32": "value32",
    "element33": "value33"
}

You could use clone and mutate and so on, but I would do it in ruby:

input { generator { count => 1 lines => [ '{ "test1": { "foo": 1, "bar": 8 }, "test2": { "foo": 2 }, "test3": { "foo": 3 }, "test4": { "foo": 4 } }' ] } }
filter {
    json { source => "message" target => "data" remove_field => [ "message" ] }
    ruby {
        code => '
            data = event.get("data")
            if data
                event.remove("data")

                # For each nested object, clone the event, copy the inner
                # fields to the top level, then emit the clone as a new event.
                data.each { |k, v|
                    newEvent = event.clone
                    v.each { |innerKey, innerValue|
                        newEvent.set(innerKey, innerValue)
                    }
                    new_event_block.call(newEvent)
                }
                # Cancel the original wrapper event so only the clones remain.
                event.cancel
            end
        '
    }
}

which gets me

{
    "@timestamp" => 2020-11-24T21:02:25.876Z,
          "host" => "....",
           "foo" => 2,
      "sequence" => 0,
      "@version" => "1"
}
{
    "@timestamp" => 2020-11-24T21:02:25.876Z,
           "foo" => 1,
      "sequence" => 0,
      "@version" => "1",
          "host" => "...",
           "bar" => 8
}

The problem here is the unknown number of elements inside the nested JSON.

Why do you think that is a problem? My example demonstrates that it handles arbitrary elements in the nested JSON.

You're right, I'm just not skilled enough yet to read ruby code.
I should check and adapt it a bit, as right now it generates 3000+ lines of output - somehow events got multiplied ~15 times.

What about identifying the specific format? The developers are migrating to this new format, and for some time I'll get a mix of old events (what is inside test1, test2, etc., as single events) and the new type of events, where I have this combination of nested JSONs.
If I somehow manage to tag the new events, I can apply this ruby filter.
If not, they get processed with the old filters.

Of course that's just theory.
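
Something like this is what I have in mind - a minimal sketch, assuming the new format can be recognized by the presence of the test1 key after parsing (the tag names are just placeholders):

filter {
    json { source => "message" target => "data" remove_field => [ "message" ] }
    if [data][test1] {
        # New combined format: tag it so the ruby splitter can be applied.
        mutate { add_tag => [ "new_format" ] }
    } else {
        # Old single-event format: leave it to the existing filters.
        mutate { add_tag => [ "old_format" ] }
    }
}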

Hello @Badger,

As the ruby solution is not acceptable for our support guys, I've asked the developers to change the logging format to:
{
    "result": [{
        "field1": "valuex",
        "field2": "valuey"
    }, {
        "field123": "valuexyd",
        "field243": "valueyzd"
    }, {
        "field444": "valueccy",
        "field234": "valueyzc"
    }, {
        "field444": "valueccy",
        "field234": "valueyyc"
    },
    ..........
    , {
        "field4674": "valueddy",
        "field2134": "valuezyd"
    }]
}

The idea is to generate X single events based on the nested JSONs.
Unfortunately, when I use split it generates separate events, but again with the nested JSON inside:

{
    "result": {
        "field444": "valueccy",
        "field234": "valueyzc"
    }
}

What I need is an event like:

{
    "field444": "valueccy",
    "field234": "valueyzc"
}

My test filter is:

filter {
    dissect {
        mapping => {
            "message" => "%{garbage}Info: %{msg}"
        }
    }
    json {
        source => "msg"
        remove_field => [ "garbage", "message", "msg" ]
    }
    split {
        field => "result"
        id => "SPLIT"
        remove_field => [ "msg" ]
    }
}
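
I guess the split filter could be followed by something like this to promote the fields of [result] to the root - a minimal sketch along the lines of the ruby filter above:

ruby {
    code => '
        r = event.get("result")
        if r.is_a?(Hash)
            # Copy every field of the split [result] object to the top
            # level of the event, then drop the wrapper field.
            r.each { |k, v| event.set(k, v) }
            event.remove("result")
        end
    '
}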

Thanks in advance for your support.
