Recommended RDBMS ingestion approach can lead to lost updates

The official RDBMS ingestion docs recommend an approach based on tracking row modification time in the sql_last_value of the jdbc input plugin.

However, this does not seem to account for database transactions: a row's modification_time is not necessarily the time at which the transaction commits and the updated row becomes visible to Logstash.

With concurrent table updates it is possible for a row with a lower modification_time to commit after a change with a higher modification_time has already been indexed, in which case the earlier change is never indexed because it is already behind sql_last_value.

I was able to reproduce this by following the linked tutorial and using two different database connections (C1 & C2):

C1 - begin; INSERT INTO es_table (id, client_name) VALUES (4, 'Missing!'); -- never gets indexed

C2 - INSERT INTO es_table (id, client_name) VALUES (5, 'latest'); -- gets a higher modification_time, is indexed

C1 - commit;
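
To make the failure concrete, this is roughly what the tutorial's polling statement evaluates to on the poll after id 5 has been indexed (the timestamp values here are illustrative):

  -- sql_last_value has already advanced to id 5's timestamp, say 1633000001
  SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs
  FROM es_table
  WHERE (UNIX_TIMESTAMP(modification_time) > 1633000001 AND modification_time < NOW())
  ORDER BY modification_time ASC;
  -- when C1 finally commits, id 4 becomes visible with a lower timestamp, say 1633000000,
  -- so it never satisfies the > 1633000001 predicate and is never selected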

I.e., when you factor in transactions, the most recent visible modification_time can actually move backwards, which seems to break the premise of this cursor-based approach - is this a known issue?

Hey Alex! I think the ingestion docs are meant as a demonstration and as guidance; the rules and processes of your production implementation may vary. With that being said, yes, in your example stalling commits will change how modification_time is interpreted. If your system experiences many stalled commits, then a different approach may be required to ensure the consistency you need.

For example, you could add a separate boolean field called "is_ingested" to the table, and instead of a query like the one from the example:

statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC"

you could use a stored procedure call instead:

statement => "CALL name_of_proc()

Calling a stored procedure allows you to encapsulate the logic in the database: the proc can select all the appropriate data AND update the "is_ingested" field to control what is pulled by Logstash.
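
A minimal sketch of what name_of_proc could look like, assuming MySQL (as in the tutorial) and the es_table / is_ingested names above - the details are illustrative, not a definitive implementation:

  DELIMITER //
  CREATE PROCEDURE name_of_proc()
  BEGIN
    START TRANSACTION;
    -- locking read: return the not-yet-ingested rows and hold their locks
    -- so the UPDATE below flags the same set of rows
    SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs
    FROM es_table
    WHERE is_ingested = FALSE
    ORDER BY modification_time ASC
    FOR UPDATE;
    -- flag the returned rows so the next poll skips them
    UPDATE es_table SET is_ingested = TRUE WHERE is_ingested = FALSE;
    COMMIT;
  END //
  DELIMITER ;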

These are just ideas, and your mileage may vary, but there are many ways to manage the metadata needed to allow Logstash to pull data consistently.

Hi Eddie,

Thank you very much for the response, it is much appreciated.

I believe this problem would occur even without explicit stalling. Our transaction control is performed at the application layer (as in most applications), so a number of factors can affect commit time: the framework we are using, garbage collection, the network, etc.

I have seen this cursor-based approach lead to a major production incident in an application where accuracy was critical and updates were occasionally not being indexed - it would be good if the docs were clearer here.

Re: the use of an is_ingested column, we did move to a similar approach in the aforementioned application. However, the risk is that we are now marking records as ingested prematurely: at the point they have been read into memory by Logstash rather than at the point they are indexed. If anything happens to that Logstash instance (crash/termination) after the query runs but before indexing completes, those updates are lost.

I tested this by:

  1. Terminating ES
  2. Updating a record in Postgres
  3. Watching Logstash try and fail to push the updated record to ES (the record is now marked as ingested)
  4. Terminating Logstash

Now that the record has been marked as ingested in step 3 it won't get picked up again, i.e. the system is not eventually consistent.

I'm still looking for an approach that will guarantee eventual consistency - do you have any further pointers?

One approach would be for Logstash to execute the JDBC query inside a transaction that only commits after successful ingestion into ES - perhaps that could be arranged?

Thanks again!

You're welcome @Alex_McAusland!

And you're also right - I have run into this before in production. Unfortunately, since Logstash cannot perform acknowledgments in this way, it made things difficult for me as well.

In the past, I ended up moving to a client-based solution (.NET) in which we performed our own polling of the database tables and were able to check timestamps and ingestion flags to guarantee records were picked up and delivered appropriately.

  1. Schedule a .NET service (or cron job, etc.)
  2. Query for changed data based on timestamps/ingestion flags/priority flags
  3. Insert into Elasticsearch and check for any errors in the insertion
  4. Update the rows in SQL to mark them as properly inserted (a rough SQL sketch of steps 2 and 4 follows below)
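
For steps 2 and 4, the SQL side of such a client might look roughly like this (reusing the es_table / is_ingested naming from above; the schema details are illustrative):

  -- step 2: pick up rows that have changed and have not yet been acknowledged
  SELECT *
  FROM es_table
  WHERE is_ingested = FALSE
  ORDER BY modification_time ASC;

  -- step 4: run only after Elasticsearch has confirmed the bulk insert,
  -- acknowledging exactly the ids that were successfully indexed
  UPDATE es_table
  SET is_ingested = TRUE
  WHERE id IN (/* ids confirmed by the Elasticsearch bulk response */);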

In a situation where time and accuracy are critical to business needs, I think using a client here to better guarantee delivery makes sense (you can then also implement something like APM to monitor how well you're performing).

There are other options as well - I have used JumpMind's SymmetricDS for database synchronization pretty successfully across multiple different database vendors.

Many thanks again Eddie, I am reassured to know others have been on this journey.

The solution you describe sounds like it would address all the problems I've encountered so far. I wondered, though, whether a relatively simple Logstash tweak could facilitate robust indexing?

If Logstash wrapped both the running of the JDBC query and the ES indexing inside a transaction (one that only commits if the ES indexing succeeds), I think we'd have a robust approach.

We would have something like -

begin;
changes = update records set indexed = true where indexed = false returning *;  -- flag and return in one statement (PostgreSQL RETURNING)
-- ... index the returned changes into Elasticsearch ...
commit;  -- or rollback if indexing failed, so the rows remain flagged as not indexed

Is this something I could propose for Logstash?
