Aggregate multiple nested (recursive) logstash

I am using Logstash with the jdbc input and would like to embed one object inside another using the aggregate filter. How can I do this recursively?

That is, how can I add an object inside another object?

As an example, my jdbc input returns output similar to the following image:

This is an example of how I want the output to look in Elasticsearch. I describe the example for a single document; the same should apply to the rest of the input data (id = 2, id = 3, etc.):

{
  "_index": "my-index",
  "_type": "test",
  "_id": "1",
  "_version": 1,
  "_score": 1,
  "_source": {
    "id": "1",
    "properties": {
      "id": "1",
      "description": "Texto 1",
      "Detail": [
        {
          "id_2": "1",
          "cod": "A",
          "descr": "Detail A",
          "SubDetail": [
            {
              "id_3": "1",
              "cod": "X1",
              "descr": "Sub Detail X1"
            }
          ]
        },
        {
          "id_2": "2",
          "cod": "B",
          "descr": "Detail B",
          "SubDetail": [
            {
              "id_3": "1",
              "cod": "X1",
              "descr": "Sub Detail X1"
            },
            {
              "id_3": "2",
              "cod": "X2",
              "descr": "Sub Detail X2"
            },
            {
              "id_3": "3",
              "cod": "X3",
              "descr": "Sub Detail X3"
            }
          ]
        }
      ]
    }
  }
}

In the filter section I am using something like this, but it is not working for me:

aggregate {
  task_id => "%{id}"
  code => "
      map['id'] = event.get('id')
	  map['description'] = event.get('description')
      
      map['detail_list'] ||= []
      map['Detail'] ||= []
      if (event.get('id_2') != nil)
        if !( map['detail_list'].include?event.get('id_2') ) 
          map['detail_list'] << event.get('id_2') 
          map['Detail'] << {
            'id_2' => event.get('id_2'),                             
            'cod' => event.get('cod_2'),
            'descr' => event.get('descr_2'),
            
             map['sub_detail_list'] ||= []
              map['subDetail'] ||= []
              if (event.get('id_3') != nil)
                if !( map['sub_detail_list'].include?event.get('id_3') ) 
                  map['sub_detail_list'] << event.get('id_3')         
                  map['subDetail'] << {
                    'id_3' => event.get('id_3'),                             
					'cod' => event.get('cod_3'),
					'descr' => event.get('descr_3')
                  }
                end
              end
          }
        end
      end
       
      event.cancel()
  "
  push_previous_map_as_event => true
  timeout => 3

} 

Any ideas or suggestions on how to implement something like this? I should also mention that I have set pipeline.workers to 1.

Is that the input or the output? Whichever one is not present, please add that as well.

Thank you very much for replying, Badger. I appreciate your time.
I have edited the post a bit to make it clearer.
I added the example output of the input, the example of the Elasticsearch structure that I want to achieve, and lastly the filter that I think could be used.

I do not see any connection between the input and the output data.

I do not understand what you mean by the connection between the input and the output. As I said, the data is only there by way of example.
The question is: is it possible to nest two objects with aggregate?
That is, starting from an id_1, generate an object grouped by id_2, which in turn contains another grouping by id_3?

You provided an image of some data that your jdbc input is returning, and an example of the data that you want your filters to produce, but the two sets of data do not appear to be related.

OK, I have now modified the whole post so that there is no confusion between the input data, the output I expect, and how I tried to achieve it. I hope you can help me.

You want to take the four rows

1,1,1,text1,A,Detail A,X1,Detail X1
1,2,1,text1,B,Detail B,X1,Detail X1
1,2,2,text1,B,Detail B,X2,Detail X2
1,2,3,text1,B,Detail B,X3,Detail X3

and create an event that contains an array of hashes, the first of which contains data from the first line, where the second column is 1, and the second of which contains data from the next three rows, where the second column is 2.

Your problem is that you need to insert an entry into an array, but you do not know which one. Is it the SubDetail array you need to add an entry to inside the first hash in the Detail array or is it the second one?

You want to get to (ignoring the cod and description fields)

{
"id": "1",
"properties": {
  "id": "1",
  "Detail": [
    {
      "id_2": "1",
      "SubDetail": [
        { "id_3": "1" }
      ]
    },
    {
      "id_2": "2",
      "SubDetail": [
        { "id_3": "1" },
        { "id_3": "2" },
        { "id_3": "3" }
      ]
    }
  ]
}
}

To get there, you are going to have to make Detail a hash where the key is the value of id_2. You can convert it to an array in a later ruby filter.

{
"id": "1",
"properties": {
  "id": "1",
  "Detail": {
    "1": {
      "id_2": "1",
      "SubDetail": [
        { "id_3": "1" }
      ]
    },
    "2": {
      "id_2": "2",
      "SubDetail": [
        { "id_3": "1" },
        { "id_3": "2" },
        { "id_3": "3" }
      ]
    }
  }
}
}

That structure allows you to use

map['properties']['Detail'][id_2]['SubDetail'] << { ... }
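
As a rough illustration of why the keyed structure helps, here is a plain-Ruby sketch that runs outside Logstash. The ROWS constant and the group_rows helper are hypothetical stand-ins for the jdbc events and the aggregate map; only the id, id_2 and id_3 values are taken from the four rows above, and de-duplication of repeated id_3 values is left out.

```ruby
# Hypothetical stand-ins for the four jdbc rows (only the id columns are kept).
ROWS = [
  { "id" => "1", "id_2" => "1", "id_3" => "1" },
  { "id" => "1", "id_2" => "2", "id_3" => "1" },
  { "id" => "1", "id_2" => "2", "id_3" => "2" },
  { "id" => "1", "id_2" => "2", "id_3" => "3" }
]

# group_rows is an illustrative helper, not part of Logstash.
# It builds the same shape as the aggregate map: Detail is a hash keyed by id_2.
def group_rows(rows)
  map = {}
  rows.each do |row|
    map["id"] = row["id"]
    map["properties"] ||= { "id" => row["id"], "Detail" => {} }
    id_2 = row["id_2"]
    next if id_2.nil?
    # Because Detail is keyed by id_2, the right entry is found directly,
    # with no need to search an array for the matching hash.
    detail = (map["properties"]["Detail"][id_2] ||= { "id_2" => id_2, "SubDetail" => [] })
    detail["SubDetail"] << { "id_3" => row["id_3"] } unless row["id_3"].nil?
  end
  map
end

grouped = group_rows(ROWS)
puts grouped["properties"]["Detail"].keys.inspect
puts grouped["properties"]["Detail"]["2"]["SubDetail"].length
```

With an array of hashes, each row would first have to search the array for the entry whose id_2 matches; the keyed hash makes that lookup a single index operation.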

In the subsequent ruby filter just do

newD = []
event.get('[properties][Detail]').each { |k, v |
    newD << v
}
event.set('[properties][Detail]', newD)
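
The same conversion can be tried on a plain Ruby hash outside Logstash. This sketch assumes Detail is an ordinary Hash; the detail_to_array helper name is made up for illustration. Ruby hashes keep insertion order, so the array comes out in the order the id_2 values were first seen, and Hash#values gives the same result as the explicit loop.

```ruby
# detail_to_array is a hypothetical helper mirroring the loop above.
def detail_to_array(detail)
  new_d = []
  detail.each { |_key, value| new_d << value }
  new_d
end

detail = {
  "1" => { "id_2" => "1", "SubDetail" => [{ "id_3" => "1" }] },
  "2" => { "id_2" => "2", "SubDetail" => [{ "id_3" => "1" }, { "id_3" => "2" }, { "id_3" => "3" }] }
}

as_array = detail_to_array(detail)
puts as_array.length
# The explicit loop and Hash#values produce the same array.
puts as_array == detail.values
```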

Thank you very much for answering. To answer your question: yes, what I am trying to do is insert the subDetail array into the Detail array for each new document (id).
I understand your idea but I can't make it work. I tried several things similar to this but did not achieve the goal. What am I doing wrong?

    aggregate {
        task_id => "%{id}"
        code => "
            map['properties']['id'] = event.get('id')
            map['properties']['descripcion'] = event.get('description')            
                    
            map['properties']['detail_list'] ||= []
            map['properties']['Detail'] ||= []
            if (event.get('id_2') != nil) 
                if !( map['properties']['detail_list'].include?event.get('id_2') )
                        map['properties']['detail_list'] << event.get('id_2')                
                        map['properties']['Detail'] << {
                            'id_2' => event.get('id_2'),
                            'cod_2' => event.get('cod_2'),
                            'descr_2' => event.get('descr_2')                           
                        }
                        map['properties']['Detail'][id_2]['SubDetail'] << {
                                'id_3' => event.get('id_3'),
                                'cod_3' => event.get('cod_3'),
                                'descr_3' => event.get('descr_2')                            
                        }
                        
                end
            end 
            event.cancel()
        "
        push_previous_map_as_event => true
        timeout => 5
    } 

        ruby {
        code => "    
            newD = []
            event.get('[properties][Detail]').each { |k, v |
                newD << v
            }
            event.set('[properties][Detail]', newD)
        "
    }

You could try using

    aggregate {
        task_id => "%{id}"
        code => '
            map["properties"] ||= {}
            map["properties"]["id"] = event.get("id")
            map["properties"]["description"] = event.get("description")

            map["properties"]["detail_list"] ||= []
            map["properties"]["Detail"] ||= {}
            id_2 = event.get("id_2")
            if (id_2 != nil)
                if !( map["properties"]["detail_list"].include? id_2 )
                    map["properties"]["detail_list"] << id_2
                    map["properties"]["Detail"][id_2] = {
                        "id_2" => event.get("id_2"),
                        "cod" => event.get("cod_2"),
                        "descr" => event.get("descr_2")
                    }
                end
            end

            map["properties"]["Detail"][id_2]["sub_detail_list"] ||= []
            map["properties"]["Detail"][id_2]["subDetail"] ||= []
            id_3 = event.get("id_3")
            if (id_3 != nil)
                if !( map["properties"]["Detail"][id_2]["sub_detail_list"].include? id_3 )
                    map["properties"]["Detail"][id_2]["sub_detail_list"] << id_3
                    map["properties"]["Detail"][id_2]["subDetail"] << {
                    "id_3" => event.get("id_3"),
                    "cod" => event.get("cod_3"),
                    "descr" => event.get("descr_3")
                    }
                end
            end

            event.cancel()
        '
        push_previous_map_as_event => true
        timeout => 5
    }

I use single quotes around the code and double quotes inside it so that I can add debugging statements like

puts "#{id_2} #{id_3}"
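
For reference, the interpolation in that debugging line depends on Ruby's double-quoted strings; a single-quoted Ruby string would print the text literally. A small standalone check, where debug_line, LITERAL and the sample values are made up for illustration:

```ruby
# Double quotes: #{...} is evaluated and replaced by the variable values.
def debug_line(id_2, id_3)
  "#{id_2} #{id_3}"
end

# Single quotes: the text is kept exactly as written, with no interpolation.
LITERAL = '#{id_2} #{id_3}'

puts debug_line("2", "3")
puts LITERAL
```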

That is what told me that inserting the subDetail entry should not be inside the if (id_2 != nil) test. That code will get you an event like

"properties" => {
         "Detail" => {
        "1" => {
            "sub_detail_list" => [
                [0] "1"
            ],
                      "descr" => "Detail A",
                        "cod" => "A",
                  "subDetail" => [
                [0] {
                    "descr" => "Detail X1",
                     "id_3" => "1",
                      "cod" => "X1"
                }
            ],
                       "id_2" => "1"
        },
        "2" => {
            "sub_detail_list" => [
                [0] "1",
                [1] "2",
                [2] "3"
            ],
                      "descr" => "Detail B",
                        "cod" => "B",
                  "subDetail" => [
                [0] {
                    "descr" => "Detail X1",
                     "id_3" => "1",
                      "cod" => "X1"
                },
                [1] {
                    "descr" => "Detail X2",
                     "id_3" => "2",
                      "cod" => "X2"
                },
                [2] {
                    "descr" => "Detail X3",
                     "id_3" => "3",
                      "cod" => "X3"
                }
            ],
                       "id_2" => "2"
        }
    },
    "description" => "text1",
    "detail_list" => [
        [0] "1",
        [1] "2"
    ],
             "id" => "1"
}

That may not be exactly what you want but should get you much closer.
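
If the remaining differences matter, one possible follow-up is to drop the bookkeeping lists and turn Detail into an array. This is only a sketch in plain Ruby, not a config that was tested in this thread. The tidy_properties helper is hypothetical, and renaming subDetail to SubDetail is an assumption based on the document requested at the top. Inside a ruby filter, the hash would presumably be read with event.get('properties') and written back with event.set.

```ruby
# tidy_properties is an illustrative helper, not a Logstash API.
# It expects the "properties" hash in the shape printed above.
def tidy_properties(props)
  details = (props["Detail"] || {}).values.map do |d|
    {
      "id_2"      => d["id_2"],
      "cod"       => d["cod"],
      "descr"     => d["descr"],
      # Assumption: the target field name is SubDetail, as in the desired document.
      "SubDetail" => d["subDetail"] || []
    }
  end
  # detail_list and sub_detail_list are simply not copied over.
  {
    "id"          => props["id"],
    "description" => props["description"],
    "Detail"      => details
  }
end

props = {
  "id" => "1",
  "description" => "text1",
  "detail_list" => ["1"],
  "Detail" => {
    "1" => {
      "id_2" => "1", "cod" => "A", "descr" => "Detail A",
      "sub_detail_list" => ["1"],
      "subDetail" => [{ "id_3" => "1", "cod" => "X1", "descr" => "Detail X1" }]
    }
  }
}

puts tidy_properties(props).inspect
```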


Badger, thank you! It works just as I wanted! I have searched many forums and blogs for help but had never seen a way to do it. I will continue investigating and adjusting things, but it works perfectly!
Thank you very much and have a good day.

