Append to nested type object from multiple jdbc input streams using Logstash aggregation

Hi,
I'm trying to read from multiple jdbc inputs and aggregate the results into a field that is mapped to a nested type in Elasticsearch, but the input streams are not executed sequentially. I followed this link, tagged each jdbc input, and tried to aggregate conditionally, but the data inside the array is getting replaced instead of appended to.

input {
...
  jdbc {
    statement => "Select foo, table_a.val1, table_a.val2 ... FROM table_a ORDER BY foo;"
    tags => ["table_a"]
  }
  jdbc {
    statement => "Select foo, table_b.val1, table_b.val2 ... FROM table_b ORDER BY foo;"
    tags => ["table_b"]
  }
  jdbc {
    statement => "Select foo, table_c.val1, table_c.val2 ... FROM table_c ORDER BY foo;"
    tags => ["table_c"]
  }

........
}

filter {
  aggregate {
    task_id => "%{foo}"
    code => "
      map['foo'] = event.get('foo')
      ...
      map['arr'] ||= []

      if event.get('tags').include? 'table_a'
        map['arr'] << {
          'val1' => event.get('table_a.val1'),
          'val2' => event.get('table_a.val2'),
          ...
        }
      elsif event.get('tags').include? 'table_b'
        map['arr'] << {
          'val1' => event.get('table_b.val1'),
          'val2' => event.get('table_b.val2'),
          ...
        }
      elsif event.get('tags').include? 'table_c'
        map['arr'] << {
          'val1' => event.get('table_c.val1'),
          'val2' => event.get('table_c.val2'),
          ...
        }
      end

      ...

      event.cancel()
    "
    push_previous_map_as_event => true
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "test"
    codec => "json"
    document_id => "%{foo}"
    action => "update"
    doc_as_upsert => true
  }
}
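
(With these settings each aggregated event should be sent as an update-with-upsert keyed by foo; as far as I understand, the bulk request is roughly of this form, with the index and id taken from the config above:

{ "update": { "_index": "test", "_id": "foo" } }
{ "doc": { "foo": "foo", "arr": [ ... ] }, "doc_as_upsert": true }
)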

Desired output in elasticsearch is something like,

{
  "id": "foo",
  "arr": [
    {
      "val1": "table_a.val1",
      "val2": "table_a.val2"
    },
    {
      "val1": "table_b.val1",
      "val2": "table_b.val2"
    },
    {
      "val1": "table_c.val1",
      "val2": "table_c.val2"
    },
    ...
  ]
}

Where "arr" being mapped to nested type in elasticsearch, But the actual output is based on the order of jdbc input execution I assume and the values in "arr" are being replaced where I might get values of just table_b or sometimes values of table_a and table_c and so on

You cannot depend on the order of events from different inputs. You would get fewer errors if you used push_map_as_event_on_timeout with a sufficiently long timeout, but that could result in an excessive amount of data being retained in the map.
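
A minimal sketch of that variant (the 3600-second timeout and the timeout_task_id_field value are only illustrative; pick a timeout long enough for all inputs to deliver their rows for a given foo):

filter {
  aggregate {
    task_id => "%{foo}"
    code => "
      map['foo'] ||= event.get('foo')
      map['arr'] ||= []
      ...
      event.cancel()
    "
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "foo"
    timeout => 3600
  }
}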

Thanks a lot for your reply!
That works perfectly, but only when the data for a given id ('foo' in the example above) is present in all of the tables, so that I can order by that id and wait for some time. In my case there is no way to know how long to wait. For example, say ids 'foo1' through 'foo10000' exist in tableA and tableB, but the first row in tableC starts at 'foo10001'. Reading, say, 1000 records at a time, the first iteration retrieves 'foo1'-'foo1000' from tables A and B, while the first 1000 records from tableC would be 'foo10001'-'foo11001', so I would have to wait until the first 10000 records from tables A and B have been processed for the solution to work. At the moment I am losing the data from tableC, which is not an option.

Is there any other way to achieve the required output, or am I missing something here?

Not that I can think of. This really is not a use case logstash is designed to handle.
