I am trying to import relational DBMS data into Elasticsearch with the logstash-filter-aggregate filter, but I am not sure whether it is meant to be used for this purpose.
The database has three tables: a parent table P and two child tables, c1 and c2.
P -> c1 one to many
c1 -> c2 one to many
I want to import all the data from these tables into Elasticsearch as well-structured (nested) documents. Is this plugin a good fit for this purpose? If not, is there another plugin or filter that would work better?
input {
  # Poll Oracle over JDBC once a minute and emit one event per result row.
  # Each row is one (parent, child1, child1_1) combination; parents without
  # children still yield a row because of the LEFT JOINs (child columns are NULL).
  jdbc {
    jdbc_connection_string => "jdbc:oracle:thin:@host:1521:sid"
    jdbc_user => "ora"
    jdbc_password => "ora"
    jdbc_driver_library => "/Users/xxx/Downloads/ojdbc6.jar"
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    # Cron syntax: run the statement every minute.
    schedule => "* * * * *"
    # The extra sort keys (p.id, c1.id) keep all rows of one parent, and of one
    # child1 within it, contiguous and in a deterministic order even when
    # several parents share the same p_date.
    # NOTE(review): the select list reads c11.pid while the join uses c11.p_id;
    # confirm which column name child1_1 actually has.
    statement => "select p.id id, p.type, c1.id c1_id, c1.status,
                  c11.pid pid, c11.uuid, c11.key key, c11.value value
                  from parent p
                  left join child1 c1 on p.id = c1.p_id
                  left join child1_1 c11 on c11.p_id = c1.id
                  order by p.p_date desc, p.id, c1.id"
  }
}
filter {
  # Fold all rows that share the same parent id into one nested document:
  #   { id, type, count, details: [ { operationId, status, entities: [ { proName, proValue } ] } ] }
  # The aggregate filter keeps its maps in memory per pipeline, so it must run
  # with a single worker (-w 1 / pipeline.workers: 1) to see every row of a task.
  aggregate {
    task_id => "%{id}"
    code => "
      map['count'] ||= 0
      map['id'] = event.get('id')
      # The query selects p.type, so the event field is 'type'.
      map['type'] = event.get('type')
      map['details'] ||= []

      op_id = event.get('c1_id')
      # c1_id is NULL when the parent has no child1 row (LEFT JOIN).
      unless op_id.nil?
        # Reuse the detail already created for this child1 row, if any,
        # so each child1 appears once with all of its entities.
        detail = map['details'].find { |d| d['operationId'] == op_id }
        if detail.nil?
          detail = {
            'operationId' => op_id,
            'status' => event.get('status'),
            'entities' => []
          }
          map['details'] << detail
        end

        # key/value are NULL when the child1 row has no child1_1 rows.
        unless event.get('key').nil? && event.get('value').nil?
          detail['entities'] << {
            'proName' => event.get('key'),
            'proValue' => event.get('value')
          }
        end
      end

      map['count'] += 1
    "
    map_action => "create_or_update"
    # Seconds after which a task's map is considered complete and is pushed
    # as a new event; raise it if the rows of one parent can arrive further apart.
    timeout => 1
    push_map_as_event_on_timeout => true
    # Tag used by the output section to select only the aggregated events.
    timeout_tags => ['aggregated']
  }
}
output {
  # Only the events pushed by the aggregate filter on timeout carry the
  # "aggregated" tag; the raw per-row events are not indexed.
  if "aggregated" in [tags] {
    # Print each aggregated document for debugging.
    stdout { codec => rubydebug }
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "op11"
      document_type => "data"
      # Use the parent id as the document id so that every scheduled re-run
      # of the query overwrites the existing document instead of indexing a
      # duplicate.
      document_id => "%{id}"
      user => "elastic"
      password => "Welcome1"
    }
  }
}