Hi,
I'm trying to read from multiple jdbc inputs and aggregate the results into a single field that is mapped to a nested type in Elasticsearch, but the input streams are not executed sequentially. I followed this link, tagged each jdbc input, and tried to aggregate conditionally, but the data inside the array is getting replaced instead of updated.
input {
  ...
  jdbc {
    statement => "SELECT foo, table_a.val1, table_a.val2 ... FROM table_a ORDER BY foo;"
    tags => ["table_a"]
  }
  jdbc {
    statement => "SELECT foo, table_b.val1, table_b.val2 ... FROM table_b ORDER BY foo;"
    tags => ["table_b"]
  }
  jdbc {
    statement => "SELECT foo, table_c.val1, table_c.val2 ... FROM table_c ORDER BY foo;"
    tags => ["table_c"]
  }
  ...
}
filter {
  aggregate {
    task_id => "%{foo}"
    code => "
      map['foo'] = event.get('foo')
      ...
      map['arr'] ||= []

      # the jdbc input exposes the selected columns as plain fields, e.g. 'val1'
      if event.get('tags').include? 'table_a'
        map['arr'] << {
          'val1' => event.get('val1'),
          'val2' => event.get('val2'),
          ...
        }
      elsif event.get('tags').include? 'table_b'
        map['arr'] << {
          'val1' => event.get('val1'),
          'val2' => event.get('val2'),
          ...
        }
      elsif event.get('tags').include? 'table_c'
        map['arr'] << {
          'val1' => event.get('val1'),
          'val2' => event.get('val2'),
          ...
        }
      end
      ...
      event.cancel()
    "
    push_previous_map_as_event => true
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "test"
    codec => "json"
    document_id => "%{foo}"
    action => "update"
    doc_as_upsert => true
  }
}
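For context, my understanding from the aggregate filter docs is that it needs a single pipeline worker so that all events for a given task_id pass through the same filter instance, so I'm assuming settings along these lines in logstash.yml (an assumption on my side):

# logstash.yml -- single worker, as the aggregate filter docs require
pipeline.workers: 1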
The desired output in Elasticsearch is something like:
{
  "id": "foo",
  "arr": [
    {
      "val1": "table_a.val1",
      "val2": "table_a.val2"
    },
    {
      "val1": "table_b.val1",
      "val2": "table_b.val2"
    },
    {
      "val1": "table_c.val1",
      "val2": "table_c.val2"
    }
    ...
  ]
}
Here "arr" is mapped to a nested type in Elasticsearch. The actual output, however, seems to depend on the order in which the jdbc inputs execute: the values in "arr" are replaced rather than appended, so I might end up with just the values from table_b, or sometimes the values from table_a and table_c, and so on.
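For reference, this is the kind of mapping I'm assuming for "arr" (sketched for a recent Elasticsearch version; the index name matches the output config above):

PUT test
{
  "mappings": {
    "properties": {
      "arr": { "type": "nested" }
    }
  }
}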