Hi,
I'm trying to read from multiple jdbc inputs and aggregate the results into a field that is mapped to a nested type in Elasticsearch, but the input streams are not executed sequentially. I followed this link, tagged each jdbc input, and tried to aggregate conditionally, but the data inside the array gets replaced instead of appended.
input {
  ...
  jdbc {
    statement => "SELECT foo, table_a.val1, table_a.val2 ... FROM table_a ORDER BY foo;"
    tags => ["table_a"]
  }
  jdbc {
    statement => "SELECT foo, table_b.val1, table_b.val2 ... FROM table_b ORDER BY foo;"
    tags => ["table_b"]
  }
  jdbc {
    statement => "SELECT foo, table_c.val1, table_c.val2 ... FROM table_c ORDER BY foo;"
    tags => ["table_c"]
  }
  ...
}
filter {
  aggregate {
    task_id => "%{foo}"
    code => "
      map['foo'] = event.get('foo')
      ...
      map['arr'] ||= []
      if event.get('tags').include?('table_a')
        map['arr'] << {
          'val1' => event.get('table_a.val1'),
          'val2' => event.get('table_a.val2'),
          ...
        }
      elsif event.get('tags').include?('table_b')
        map['arr'] << {
          'val1' => event.get('table_b.val1'),
          'val2' => event.get('table_b.val2'),
          ...
        }
      elsif event.get('tags').include?('table_c')
        map['arr'] << {
          'val1' => event.get('table_c.val1'),
          'val2' => event.get('table_c.val2'),
          ...
        }
      ...
      end
      event.cancel()
    "
    push_previous_map_as_event => true
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "test"
    codec => "json"
    document_id => "%{foo}"
    action => "update"
    doc_as_upsert => true
  }
}
The desired output in Elasticsearch is something like:
{
  "id": "foo",
  "arr": [
    {
      "val1": "table_a.val1",
      "val2": "table_a.val2"
    },
    {
      "val1": "table_b.val1",
      "val2": "table_b.val2"
    },
    {
      "val1": "table_c.val1",
      "val2": "table_c.val2"
    }
    ...
  ]
}
Where "arr" being mapped to nested type in elasticsearch, But the actual output is based on the order of jdbc input execution I assume and the values in "arr" are being replaced where I might get values of just table_b or sometimes values of table_a and table_c and so on