Logstash ruby filter - put all event fields into a dynamically named nested field

To keep it short, I will simplify things a bit.

Let's say I have two filebeats.
Each Filebeat uses the JSON output codec and outputs a completely different set of fields.
However, there is a rule: messages from the two Filebeats can be paired on a shared attribute.

Both Filebeats output to a single Kafka topic, which Logstash reads from, and that is where I want to pair my documents.

Also, each Filebeat outputs a custom field "type" that identifies the type of the message.

Filebeat 1 output:
"type": "dog_source_filebeat01"
"does_like_ham": true
"likes_cat_with_id": 666

Filebeat 2 output:
"type": "cat_source_filebeat02"
"meows_often": false
"cat_id": 666

Now I want to process these with Logstash and save them together (in a single document) in ES in the following format:
"_id": 666
"_source": {
"cat": { "type": "cat_source_filebeat02", "meows_often": false, "cat_id": 666 },
"dog": { "type": "dog_source_filebeat01", "does_like_ham": true, "likes_cat_with_id": 666 }
}

So, my question:

  • Based on the "type" field from Filebeat, how can I move all fields of a Logstash event into a nested document ("cat" or "dog")? Ideally with Ruby code, because there are many fields. Could you write an example for me?
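
To make the request concrete, here is a minimal sketch of the nesting I have in mind, written as plain Ruby on a hash so it can be tried outside Logstash. The helper names nested_key_for and nest_fields are hypothetical, and the sketch assumes that the nested field name is the part of "type" before the first underscore and that fields starting with "@" (such as @timestamp) should stay at the top level.

```ruby
# Hypothetical helper: derives the nested field name from the "type" value.
# Assumption: type values look like "<name>_source_<filebeat>", as in the samples above.
def nested_key_for(type)
  type.to_s.split("_").first
end

# Returns a new hash in which every regular field is moved under the nested key.
# Fields whose names start with "@" (e.g. @timestamp, @version) stay at the top level.
# If no key can be derived from "type", the input is returned unchanged.
def nest_fields(fields)
  key = nested_key_for(fields["type"])
  return fields if key.nil? || key.empty?

  meta, payload = fields.partition { |name, _| name.start_with?("@") }.map(&:to_h)
  meta.merge(key => payload)
end

# Inside a Logstash ruby filter the same idea might look roughly like this
# (event.get, event.to_hash, event.remove and event.set belong to the Logstash Event API):
#
#   ruby {
#     code => '
#       key = event.get("type").to_s.split("_").first
#       if key
#         payload = event.to_hash.reject { |name, _| name.start_with?("@") }
#         payload.each_key { |name| event.remove(name) }
#         event.set(key, payload)
#       end
#     '
#   }

dog = nest_fields(
  "type" => "dog_source_filebeat01",
  "does_like_ham" => true,
  "likes_cat_with_id" => 666
)
puts dog.inspect
```

This only covers the nesting step. Merging the "cat" and "dog" events into one ES document would presumably also need both events to share the same document ID (cat_id for one, likes_cat_with_id for the other) and an upsert-style write in the elasticsearch output.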

Thanks for any response :)

