We have a system that synchronises data from MongoDB to Elasticsearch. Here are the key components:
- MongoDB Source Connector: This component reads events from the MongoDB oplog and produces messages on a Kafka topic.
- Logstash: Logstash consumes these messages from Kafka and sends them to Elasticsearch. We've configured Logstash with pipeline.workers: 1 and pipeline.ordered: true (see the logstash.yml snippet below) to ensure that events are processed in the order they are received.
- Elasticsearch: a single-node cluster, with the index having a single primary shard.
Version of the ELK stack being used: 8.7.1
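For reference, this is how the ordering-related settings look in logstash.yml (they could equally be set per pipeline in pipelines.yml; pipeline.ordered: true requires a single worker):

# logstash.yml – process events with a single worker, preserving arrival order
pipeline.workers: 1
pipeline.ordered: true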
Problem:
We have a use case where the following operations are performed in MongoDB, in this order:
- Creation of Document A
- Update of Document A
- Deletion of Document A
While we can see these operations being processed sequentially in Logstash, we're encountering an issue: the deletion of Document A is not reflected in Elasticsearch. The _version on the doc is 3, indicating that all 3 events were applied. This suggests that the delete operation might be processed before the update operation, causing Document A to remain in Elasticsearch even though it has been deleted in MongoDB.
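For example, fetching the document after the whole sequence (the id below stands in for the actual mongoId) returns it with _version 3 instead of a 404 (response trimmed):

GET index_name/_doc/<mongoId>

{
  "_index": "index_name",
  "_id": "<mongoId>",
  "_version": 3,
  "found": true,
  ...
}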
The Logstash pipeline is as follows:
input {
  kafka {
    id => "my_plugin_id"
    group_id => "logstash"
    bootstrap_servers => "broker:29092"
    topics => ["topic"]
    auto_offset_reset => "earliest"
    # single consumer thread so messages leave Kafka in order
    consumer_threads => 1
  }
}

filter {
  json {
    source => "message"
    target => "message"
    # keep the MongoDB ObjectId around to use as the Elasticsearch document id
    add_field => { "mongoId" => "%{[message][documentKey][_id][$oid]}" }
  }
}

output {
  if [message][operationType] == "delete" {
    elasticsearch {
      hosts => "http://es01:9200"
      user => "elastic"
      password => "changeme"
      index => "index_name"
      document_id => "%{[mongoId]}"
      action => "delete"
    }
  } else {
    elasticsearch {
      hosts => "http://es01:9200"
      index => "index_name"
      document_id => "%{[mongoId]}"
      user => "elastic"
      password => "changeme"
      # ingest pipeline that restructures the document before indexing
      pipeline => "index_pipeline"
    }
  }
}
Note: As shown in the config above, for actions other than delete we use an ingest pipeline (index_pipeline) that restructures the document data before it is indexed.
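The exact pipeline body isn't relevant to the ordering problem, but for illustration it has roughly this shape (the rename processor below is a simplified placeholder, not our real definition):

PUT _ingest/pipeline/index_pipeline
{
  "description": "restructure the MongoDB change event before indexing (placeholder body)",
  "processors": [
    {
      "rename": {
        "field": "message.fullDocument",
        "target_field": "document",
        "ignore_missing": true
      }
    }
  ]
}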
One hypothesis, though subject to verification:
To the best of my knowledge, the Elasticsearch output plugin used in the Logstash pipeline sends data to Elasticsearch via the bulk API. Could it be that the sub-requests within a single bulk request don't adhere to a strict ordering, i.e. the delete may run (or complete) before the update, so that the final document visible in Elasticsearch corresponds to the update operation?
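One way to test this part of the hypothesis in isolation (the index name and id below are made up for the test) is to replay the same sequence as sub-requests of a single bulk call and then look the document up after the refresh:

POST _bulk
{ "index" : { "_index" : "bulk_order_test", "_id" : "1" } }
{ "state" : "created" }
{ "index" : { "_index" : "bulk_order_test", "_id" : "1" } }
{ "state" : "updated" }
{ "delete" : { "_index" : "bulk_order_test", "_id" : "1" } }

GET bulk_order_test/_doc/1

If sub-requests on the same document id are applied strictly in order, the GET should return a 404; if the document comes back with "state": "updated", the hypothesis holds.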
The index refresh interval is left at the default, i.e. once per second.
I also increased pipeline.batch.delay to 5000ms (the default is 50ms) to make sure that all three events end up in the same batch.
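Concretely, in logstash.yml:

# wait up to 5 s (instead of 50 ms) for a batch to fill, so the
# create/update/delete events for the same document land in one batch
pipeline.batch.delay: 5000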