Update array of object with jdbc

Im facing a problem. Im using jdbc to synchronise elasticsearch with mariadb mysql.
Im building a searching system with elasticsearch for my company for the intranet and the ticket.
With the actual app we can reply to those ticket and all the reply is save in table name ticket_msg.
I know in the filter i can do jdbc stream and fetch data.
But, i want all the reply to be save in an array of objects into the parent ticket. I have a statement that check if theres a new reply by checking the last id and the update_timestamp. And in the ticket_msg table i have a column called ticket_id. When a new reply is added or reply is modified, i want it to add or update this reply in the ticket doc.
In short: I want all reply to be added to the right ticket with the field called ticket_id and push the reply in an array in the ticket.
My File:
input {
jdbc {
...
jdbc_driver_library => "C:/mysql-connector-java-8.0.14/mysql-connector-java-8.0.14.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
statement => "SELECT intranet_doc., staff.first_name, staff.last_name, intranet_doc_paths.description FROM intranet_doc LEFT JOIN staff ON staff.id = intranet_doc.account_id LEFT JOIN intranet_doc_paths ON intranet_doc_paths.id = intranet_doc.path_id WHERE intranet_doc.id > :sql_last_value"
use_column_value => true
tracking_column => "id"
tracking_column_type => "numeric"
clean_run => true
type => "intranet"
}
jdbc {
...
statement => "SELECT intranet_doc.
, staff.first_name, staff.last_name, intranet_doc_paths.description FROM intranet_doc LEFT JOIN staff ON staff.id = intranet_doc.account_id LEFT JOIN intranet_doc_paths ON intranet_doc_paths.id = intranet_doc.path_id WHERE intranet_doc.update_timestamp > :sql_last_value"
use_column_value => true
tracking_column => "update_timestamp"
tracking_column_type => "timestamp"
type => "intranet"
clean_run => true
}
jdbc {
...
jdbc_driver_library => "C:/mysql-connector-java-8.0.14/mysql-connector-java-8.0.14.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
statement => "SELECT ticket., assign.first_name as 'assign_fn', assign.last_name as 'assign_ln', openby.first_name as 'openby_fn', openby.last_name as 'openby_ln', department.name as 'department', account.customer_id, account.first_name as customer_fn, account.last_name as customer_ln, account.company as customer_cp FROM ticket INNER JOIN staff as assign ON assign.id = ticket.assign_to INNER JOIN staff as openby ON openby.id = ticket.open_by INNER JOIN ticket_dept as department ON department.id = ticket.dept_id INNER JOIN account ON account.id = ticket.account_id INNER JOIN account as customer ON customer.id = ticket.account_id WHERE ticket.id > :sql_last_value"
use_column_value => true
tracking_column => "id"
tracking_column_type => "numeric"
clean_run => true
type => "ticket"
}
jdbc {
...
statement => "SELECT ticket.
, assign.first_name as 'assign_fn', assign.last_name as 'assign_ln', openby.first_name as 'openby_fn', openby.last_name as 'openby_ln', department.name as 'department', account.customer_id, account.first_name as customer_fn, account.last_name as customer_ln, account.company as customer_cp FROM ticket INNER JOIN staff as assign ON assign.id = ticket.assign_to INNER JOIN staff as openby ON openby.id = ticket.open_by INNER JOIN ticket_dept as department ON department.id = ticket.dept_id INNER JOIN account ON account.id = ticket.account_id INNER JOIN account as customer ON customer.id = ticket.account_id WHERE ticket.update_timestamp > :sql_last_value"
use_column_value => true
tracking_column => "update_timestamp"
tracking_column_type => "timestamp"
type => "ticket"
clean_run => true
}
jdbc {
...
statement => "SELECT ticket_msg., staff.first_name, staff.last_name FROM ticket_msg INNER JOIN staff ON staff.id = ticket_msg.staff_id WHERE ticket_msg.id > :sql_last_value"
use_column_value => true
tracking_column => "id"
tracking_column_type => "numeric"
clean_run => true
type => "ticket_msg"
}
jdbc {
...
statement => "SELECT ticket_msg.
, staff.first_name, staff.last_name FROM ticket_msg INNER JOIN staff ON staff.id = ticket_msg.staff_id WHERE ticket_msg.update_timestamp > :sql_last_value"
use_column_value => true
tracking_column => "update_timestamp"
tracking_column_type => "timestamp"
type => "ticket_msg"
clean_run => true
}
}
filter {}
output {
if [type] == "intranet" {
stdout { codec => json_lines }
elasticsearch {
index => "intranet"
document_id => "%{id}"
hosts => ["127.0.0.1:9200"]
document_type => "intranet"
}
}
if [type] == "ticket" {
stdout { codec => json_lines }
elasticsearch {
index => "ticket"
document_id => "%{id}"
hosts => ["127.0.0.1:9200"]
document_type => "ticket"
}
}
if [type] == "ticket_msg" {
stdout { codec => json_lines }
elasticsearch {
index => "ticket"
action => "update"
doc_as_upsert => true
document_id => "%{ticket_id}"
document_type => "ticket"
hosts => ["127.0.0.1:9200"]
}
}
stdout {
codec => rubydebug
}
}
P.S: im setting the ticket_msg id to ticket_id for updating the right ticket.
I want reply to be added to an array for elasticsearch query.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.