Hello,
I'm indexing five years of data from MySQL into Elasticsearch using the Logstash JDBC input plugin. My SQL query joins around 15 tables, so each order comes back as multiple rows, one per order_item.
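To make the shape concrete, two consecutive events for the same order look roughly like this on stdout with the rubydebug codec (the item_* fields are stand-ins for my real item columns, and all values here are made up):

{
    "order_quote_id" => "3f2a9c",
    "external_order_number" => "ORD-1001",
    "last_modified_date" => 2024-05-01T10:15:00.000Z,
    "item_sku" => "SKU-A",
    "item_qty" => 1
}
{
    "order_quote_id" => "3f2a9c",
    "external_order_number" => "ORD-1001",
    "last_modified_date" => 2024-05-01T10:15:30.000Z,
    "item_sku" => "SKU-B",
    "item_qty" => 2
}

What I want in the quote_lineitems index is a single document per order_quote_id that contains all of its items.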
Issue 1: Delayed Aggregation
During indexing, if an order's items arrive staggered (e.g., 30 seconds apart), only the last item ends up in the aggregated result; the earlier items are missing from that order's document in Elasticsearch. How can I adjust the Logstash settings to handle these delays?
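For reference, these are the aggregate options I've been experimenting with (the full config is further down). If I'm reading the aggregate filter docs correctly, timeout, inactivity_timeout, and timeout_timestamp_field all influence when a map is flushed, but I'm not sure which one matters when rows for a single order can arrive 30 seconds apart; the values below are just guesses:

filter {
  aggregate {
    task_id => "%{order_quote_id}"
    code => "<your_logic_here>"
    push_previous_map_as_event => true
    # Is raising this above the 30-second gap enough on its own, or does the
    # first event of a *different* order still push this order's map out early
    # because of push_previous_map_as_event?
    timeout => 120
    inactivity_timeout => 60
    # Worth setting when replaying 5 years of history, so timeouts follow the
    # data's own timestamps rather than wall-clock time?
    # timeout_timestamp_field => "last_modified_date"
    timeout_tags => ["aggregate"]
  }
}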
Issue 2: Pagination
I'm concerned about order items being split across pages due to pagination settings. How should I configure the jdbc_page_size in Logstash to ensure all related items of an order are fetched together?
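For example, would something like the sketch below keep an order's rows together, or can a page boundary still split one order across two pages even with an ORDER BY? (The ORDER BY on the aggregation key is just my guess, and I don't know whether the automatic paging preserves that ordering when it wraps the statement.)

input {
  jdbc {
    # ...same connection settings as in the full config below...
    jdbc_paging_enabled => true
    jdbc_page_size => 10000
    statement => "
      SELECT DISTINCT
        o.cart_order_guid AS order_quote_id,
        <other_fields>
      <from_clauses>
      <where_condition>
      ORDER BY o.cart_order_guid, o.last_modified_date
    "
  }
}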
Here’s a snippet of my configuration (thank you in advance for your help!):
input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://localhost:3306/COMMERCEDB"
    jdbc_driver_library => "/path/to/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_user => "username"
    jdbc_password => "password" # placeholder; real credentials are handled securely in production
    jdbc_paging_enabled => true
    jdbc_page_size => 10000
    statement => "
      SELECT DISTINCT
        o.cart_order_guid AS order_quote_id,
        o.external_order_number AS external_order_number,
        o.last_modified_date AS last_modified_date,
        <other_fields>
      <from_clauses>
      <where_condition>
    "
    use_column_value => true
    tracking_column => "last_modified_date"
    tracking_column_type => "timestamp" # last_modified_date is a datetime, not the default numeric type
  }
}
filter {
  aggregate {
    task_id => "%{order_quote_id}"
    code => "
      logger.info('Line item charges - logging entire event: ' + event.to_hash.inspect)
      <your_logic_here>
    "
    push_previous_map_as_event => true
    timeout => 10
    timeout_task_id_field => "order_quote_id" # copy the task id onto the pushed event so document_id can use it
    timeout_tags => ["aggregate"]
  }
  if "aggregate" not in [tags] {
    drop {} # Drop the raw per-item events; only the pushed aggregate events (tagged above) are indexed.
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    action => "update"
    doc_as_upsert => true
    index => "quote_lineitems"
    document_id => "%{order_quote_id}" # must match the field name carried by the aggregated event
  }
  stdout {
    codec => rubydebug
  }
}