I am syncing RDBMS (Postgres) data to Elasticsearch using Logstash, and it works fine.

Logstash configuration file:
input {
  jdbc {
    jdbc_driver_library => "/home/raj/Downloads/postgresql-42.2.23.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_connection_string => "jdbc:postgresql://localhost:5432/local_test"
    jdbc_user => "postgres"
    jdbc_password => ""
    jdbc_paging_enabled => true
    # Save unix_ts_in_secs of the last row fetched as :sql_last_value for the next run
    tracking_column => "unix_ts_in_secs"
    use_column_value => true
    tracking_column_type => "numeric"
    # Cron expression with a seconds field: run every 5 seconds
    schedule => "*/5 * * * * *"
    # Fetch only rows modified since the last run, oldest first
    statement => "SELECT *, extract(epoch FROM modification_time) AS unix_ts_in_secs FROM es.es_table WHERE (extract(epoch FROM modification_time)) > :sql_last_value AND modification_time < NOW() ORDER BY modification_time ASC"
  }
}
filter {
  mutate {
    # Use the Postgres id as the Elasticsearch document _id, then drop helper fields
    copy => { "id" => "[@metadata][_id]"}
    remove_field => ["id", "@version", "unix_ts_in_secs"]
  }
}
output {
  # stdout { codec => "rubydebug"}
  elasticsearch {
    index => "rdbms_sync_idx"
    document_id => "%{[@metadata][_id]}"
  }
}
Now, if a record is deleted in Postgres, I want that deletion reflected in Elasticsearch as well. I am trying to follow the first approach from this article:
What about deleted documents?
The astute reader may have noticed that if a document is deleted from MySQL, then that deletion will not be propagated to Elasticsearch. The following approaches may be considered to address this issue:
- MySQL records could include an "is_deleted" field, which indicates that they are no longer valid. This is known as a "soft delete". As with any other update to a record in MySQL, the "is_deleted" field will be propagated to Elasticsearch through Logstash. If this approach is implemented, then Elasticsearch and MySQL queries would need to be written so as to exclude records/documents where "is_deleted" is true. Eventually, background jobs can remove such documents from both MySQL and Elastic.
- Another alternative is to ensure that any system that is responsible for deletion of records in MySQL must also subsequently execute a command to directly delete the corresponding documents from Elasticsearch.
Link: How to keep Elasticsearch synchronized with a relational database using Logstash and JDBC
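A minimal sketch of the soft-delete approach on top of the configuration above, assuming a hypothetical boolean column `is_deleted` is added to `es.es_table`, and that "deleting" a row is done as an UPDATE that sets `is_deleted = true` and also sets `modification_time = NOW()`. Bumping `modification_time` is what lets the existing tracking query pick the row up again; because the statement uses `SELECT *`, `is_deleted` then arrives on the event without any change to the input. This variant goes one step further than the article and removes the document from Elasticsearch using the elasticsearch output's `action` option, instead of keeping a flagged document:

```
filter {
  mutate {
    # Use the Postgres id as the document _id; keep the (assumed) is_deleted
    # flag in @metadata so it drives routing but is not indexed
    copy => {
      "id" => "[@metadata][_id]"
      "is_deleted" => "[@metadata][is_deleted]"
    }
    remove_field => ["id", "@version", "unix_ts_in_secs", "is_deleted"]
  }
}
output {
  if [@metadata][is_deleted] {
    # Soft-deleted row: delete the matching document from the index
    elasticsearch {
      index => "rdbms_sync_idx"
      action => "delete"
      document_id => "%{[@metadata][_id]}"
    }
  } else {
    # Inserted or updated row: index it by _id, as before
    elasticsearch {
      index => "rdbms_sync_idx"
      action => "index"
      document_id => "%{[@metadata][_id]}"
    }
  }
}
```

A conditional like `if [@metadata][is_deleted]` evaluates to false when the field is missing, null, or `false`, so rows without the flag set keep being indexed. To follow the article literally instead (keep soft-deleted documents and exclude them in queries), leave the original filter and output unchanged: `is_deleted` is then indexed like any other column. Either way, a hard DELETE in Postgres is still invisible to this pipeline, so physically purging rows should happen later (for example in a background job) once Logstash has already processed the soft delete.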
Now, how do I detect whether a record was deleted in Postgres, get the updated value of is_deleted, and apply that change in Elasticsearch?