Logstash JDBC plugin + streaming filter + delta import when a child table has changed

Hello,

I'm using the JDBC input plugin with the jdbc_streaming filter to enrich the base data with extra data found in other tables:

    input {
      jdbc {
        jdbc_connection_string => "jdbc:mysql://mysql:3306/myschema"
        # only pick up products changed since the last run
        statement => "SELECT * FROM product WHERE last_modified > :sql_last_value"
      }
    }
    filter {
      # enrich each product with its related user rows
      jdbc_streaming {
        jdbc_connection_string => "jdbc:mysql://mysql:3306/myschema"
        statement => "SELECT * FROM user WHERE product_id = :product_id"
        parameters => { "product_id" => "id" }
        target => "user_detail"
      }
    }
    output {
      {...}
    }

So as it is now, the document gets re-indexed only when something changes in the product table.

So my question is: what is the cleanest way, with the JDBC input plugin, to re-index the document when something changes in the 'child' table user as well?

Is the only way to do something like this?

    SELECT * FROM product WHERE id IN (SELECT product_id FROM user WHERE last_modified > :sql_last_value OR ... other child tables if necessary)
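
In full, I imagine the input would then look roughly like this (just a sketch; the schedule is an assumption on my side, and :sql_last_value here is simply the time of the previous run):

    input {
      jdbc {
        jdbc_connection_string => "jdbc:mysql://mysql:3306/myschema"
        # run every minute; by default :sql_last_value is the time of the previous execution
        schedule => "* * * * *"
        statement => "SELECT * FROM product WHERE last_modified > :sql_last_value OR id IN (SELECT product_id FROM user WHERE last_modified > :sql_last_value)"
      }
    }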

And what if the jdbc_streaming filter is using a different datasource (a different database)?
Is there a way to detect when something has changed in the child table and re-index the whole document?

Thanks!

This is a problem that we know exists, but we do not have a good solution for it at this time.

In other words, how does one sync a multi-table database model instance with a compound Elasticsearch document when any detail of the database model instance changes?

One particular stumbling block is how to encode the relationship between the DB tables and the ES doc template so that a change "feed" can update the correct portion of the appropriate doc in ES.

We are not even sure that a generalised solution in the form of a CDC input can be done.

This is a link to the considerations for a MySQL CDC solution. You might consider a standalone Debezium setup that writes to Kafka, and then consume the changes from Kafka in LS. It is a lot of moving parts, though.
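
As a rough sketch, the Logstash side of that setup is just a kafka input reading the Debezium change topics (the broker address and topic names below are illustrative):

    input {
      kafka {
        bootstrap_servers => "kafka:9092"
        # Debezium names topics as <server>.<schema>.<table>
        topics => ["dbserver1.myschema.product", "dbserver1.myschema.user"]
        codec => json
      }
    }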

A homebrew LS-only solution is quite tricky as well, but I can help you with experimentation.

  1. Have one 'parent' pipeline create the compound doc in ES using jdbc input.
  2. Run one 'child' pipeline for each child table, with the jdbc input tracking last_modified as you show in the OP. Make sure the parent ID for each child row is available. Use the elasticsearch filter to retrieve the original doc from ES, update it with the child changes, and write it back via the elasticsearch output (see the sketch after this list).
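
For step 2, a child pipeline for the user table could look roughly like this. This is only a sketch: the hosts, index name, and field names are illustrative, and the merge logic in the filter section is left to you.

    input {
      jdbc {
        jdbc_connection_string => "jdbc:mysql://mysql:3306/myschema"
        schedule => "* * * * *"
        # pick up only the child rows changed since the last run
        statement => "SELECT * FROM user WHERE last_modified > :sql_last_value"
      }
    }
    filter {
      # fetch the already-indexed parent doc so the changed portion can be merged into it
      elasticsearch {
        hosts => ["http://elasticsearch:9200"]
        index => "products"
        query => "_id:%{product_id}"
        fields => { "user_detail" => "existing_user_detail" }
      }
      # ... merge the changed user row into existing_user_detail here (e.g. with a ruby filter) ...
    }
    output {
      elasticsearch {
        hosts => ["http://elasticsearch:9200"]
        index => "products"
        document_id => "%{product_id}"
        # 'update' sends the event as a partial doc to merge into the existing document
        action => "update"
        # fail rather than create a new doc if the parent has not been indexed yet
        doc_as_upsert => false
      }
    }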

One problem I foresee is synchronisation: the parent pipeline must be running and must have indexed a parent doc well before a child pipeline attempts to modify it.
