Bulk indexing using the Logstash JDBC plugin

Hello,

I'm indexing five years of data from MySQL into Elasticsearch using the Logstash JDBC plugin. My SQL query joins around 15 tables, so the result set contains one row per order_item of an order.
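
For illustration, the result set comes back with one row per item, roughly like this (the IDs, item columns, and values here are made-up examples, not my real data):

order_quote_id | external_order_number | item_sku | item_qty
ORD-1001       | 5001                  | SKU-A    | 1
ORD-1001       | 5001                  | SKU-B    | 2
ORD-1002       | 5002                  | SKU-C    | 1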

Issue 1: Delayed Aggregation
During indexing, if the rows for an order's items arrive with a delay between them (e.g., 30 seconds apart), only the last item ends up aggregated; the earlier items are missing from that order's document in the Elasticsearch index. How can I adjust the Logstash settings to handle these delays?
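
From my reading of the aggregate filter documentation, the timeout counts from the last event seen for a given task_id, and the filter only behaves predictably with a single pipeline worker. My current guess is something along these lines, where the single-worker setting and the timeout value are assumptions on my part rather than anything I have confirmed to work:

# logstash.yml (or pipelines.yml, or -w 1 on the command line):
# run one worker so every row of an order passes through the same aggregate instance
pipeline.workers: 1

filter {
  aggregate {
    task_id => "%{order_quote_id}"
    # guessed value: comfortably larger than the 30-second gaps between items
    timeout => 120
    # ... same code / push_previous_map_as_event settings as in the full config below ...
  }
}

Is raising the timeout the right lever here, or is there a better setting for staggered arrivals?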

Issue 2: Pagination
I'm also concerned that the items of a single order could be split across two result pages because of the pagination settings. How should I configure jdbc_page_size in Logstash to ensure all related items of an order are fetched together?
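
My working assumption, which I'd be glad to have corrected, is that no jdbc_page_size value can guarantee an order never straddles a page boundary, and that sorting the statement by the order ID (so an order's rows are at least contiguous) matters more. A sketch of what I mean, reusing the placeholders from my real query below:

statement => "
  SELECT DISTINCT
    o.cart_order_guid AS order_quote_id,
    <other_fields>
    <from_clauses>
    <where_condition>
  ORDER BY o.cart_order_guid  -- keeps all rows of one order adjacent, even across page boundaries
"

Would that, together with a suitable aggregate timeout, be enough to keep an order's items together?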

Here’s a snippet of my configuration. Thank you in advance for your help!

input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://localhost:3306/COMMERCEDB"
    jdbc_driver_library => "/path/to/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_user => "username"
    jdbc_password => "password"  # Ensure you provide a secure password in a production environment
    jdbc_paging_enabled => true
    jdbc_page_size => 10000
    statement => "
      SELECT DISTINCT
        o.cart_order_guid AS order_quote_id,
        o.external_order_number AS external_order_number,
        o.last_modified_date AS last_modified_date,
        <other_fields>
        <from_clauses>
        <where_condition>
       "
    use_column_value => true
    tracking_column => "last_modified_date"
    tracking_column_type => "timestamp"  # last_modified_date is a datetime, not the default numeric type
  }
}

filter {
  aggregate {
    task_id => "%{order_quote_id}"
    code => "
      begin
        logger.info('---------Line Item charges with Logging entire event: ' + event.to_hash.inspect)
        <your_logic_here>
      end
    "
    push_previous_map_as_event => true
    timeout => 10
    timeout_tags => ["aggregate"]
  }

  if "aggregate" not in [tags] {
    drop {}  # Drop events that aren't successfully aggregated.
  }
}


output {
  elasticsearch {
    hosts => ["localhost:9200"]
    action => "update"
    doc_as_upsert => true
    index => "quote_lineitems"
    document_id => "%{orderQuoteID}"
  }
  stdout {
    codec => rubydebug
  }
}