Elasticsearch is processing incoming events/messages from Logstash in a non-sequential order

We have a system that synchronises data from MongoDB to Elasticsearch. Here are the key components:

  1. MongoDB Source Connector: This component reads events from the MongoDB oplog and produces messages on a Kafka topic.
  2. Logstash: Logstash consumes these messages from Kafka and sends them to Elasticsearch. We've configured Logstash with specific settings, including pipeline.workers: 1 and pipeline.ordered: true (see the sketch after this list), to ensure that events are processed in the order they are received.
  3. Elasticsearch: a single-node cluster, with the index having a single primary shard.
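
For reference, a minimal sketch of the ordering-related Logstash settings mentioned in point 2, as they would appear in logstash.yml (the file layout shown here is an assumption, not part of the original setup):

  pipeline.workers: 1    # a single worker thread, so batches are not processed in parallel
  pipeline.ordered: true # preserve event ordering within that single worker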

Version of the ELK stack being used: 8.7.1

Problem:
We have a use case where the following operations are performed in MongoDB sequentially:

  1. Creation of Document A
  2. Update of Document A
  3. Deletion of Document A

While we can see these operations being processed sequentially in Logstash, we're encountering an issue: the deletion of Document A is not reflected in Elasticsearch. The _version on the document is 3, indicating that all three events were applied. This suggests that the delete operation might be processed before an update operation, causing Document A to remain in Elasticsearch even though it has been deleted in MongoDB.
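
For illustration, the surviving document can be inspected with a simple get request (the document id below is a placeholder); in our case the response still comes back with "found": true and "_version": 3:

  GET /index_name/_doc/<mongoId>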

The Logstash pipeline is as follows:

 input {
    kafka {
        id => "my_plugin_id"
        group_id => "logstash"
        bootstrap_servers => "broker:29092"
        topics => ["topic"]
        auto_offset_reset => "earliest"
        consumer_threads => 1
    }
}

filter {
    json {
        source => "message"
        target => "message"
        add_field => { "mongoId" => "%{[message][documentKey][_id][$oid]}" }
    }
}


output {
    # MongoDB delete events are turned into Elasticsearch delete actions;
    # every other operation type is indexed through the "index_pipeline" ingest pipeline.
    if [message][operationType] == "delete" {
        elasticsearch {
            hosts => "http://es01:9200"
            user => "elastic"
            password => "changeme"
            index => "index_name"
            document_id => "%{[mongoId]}"
            action => "delete"
        }
    }
    else {
        elasticsearch {
            hosts => "http://es01:9200"
            index => "index_name"
            document_id => "%{[mongoId]}"
            user => "elastic"
            password => "changeme"
            pipeline => "index_pipeline"
        }
    }
}

Note: As mentioned in the above config, for actions other than delete we use an ingest pipeline that restructures the document data before it is indexed.
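
The actual ingest pipeline is not shown here. Purely as an illustrative sketch, a restructuring pipeline of this kind might look like the following (the processors and field names are assumptions):

  PUT _ingest/pipeline/index_pipeline
  {
    "description": "Illustrative sketch only: flatten the MongoDB change event before indexing",
    "processors": [
      { "rename": { "field": "message.fullDocument", "target_field": "doc", "ignore_missing": true } },
      { "remove": { "field": "message", "ignore_missing": true } }
    ]
  }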

One hypothesis, though subject to verification:
To the best of my knowledge, the Elasticsearch output plugin (used in the Logstash pipeline) uses the bulk API to send data to Elasticsearch. Could it be that events (sub-requests) processed in a single batch don't adhere to a strict ordering, i.e. the delete may run (or complete) before the update event, so that the final document visible in Elasticsearch corresponds to the update operation?
The index refresh interval is left at the default (once per second).
I also increased pipeline.batch.delay to 5000ms (the default is 50ms) to make sure that all the events end up in the same batch.
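
To make the hypothesis concrete: if the create, update, and delete for Document A all ended up in a single bulk request, the payload would look roughly like this (the document bodies are placeholders; the index name and ingest pipeline come from the config above):

  POST /_bulk
  {"index": {"_index": "index_name", "_id": "<mongoId>", "pipeline": "index_pipeline"}}
  {"field": "initial value"}
  {"index": {"_index": "index_name", "_id": "<mongoId>", "pipeline": "index_pipeline"}}
  {"field": "updated value"}
  {"delete": {"_index": "index_name", "_id": "<mongoId>"}}

The question is whether the delete can be applied before the second index action, leaving the updated document behind.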

With pipeline.workers set to 1 and pipeline.ordered set to true, Logstash should process the events in the order they happen, but I'm not sure this guarantees that the events will be processed in the same order on the Elasticsearch side.

Searching the forum I found a post saying that the user should not expect the operations in a bulk request to be performed in order; I'm not sure if this is still true, maybe someone from Elastic can provide more feedback.

Note that the post you link to is referring to something related, but different. I read your question as being about the ordering of operations in the context of a single request. The post you link to is answering a question about the serialization of operations in the context of two concurrent bulk requests.

That said, in general, there are no guarantees around the order of operations in a single bulk request. This is because the events in the bulk request could be distributed across multiple shards. Once the events are distributed at the shard level, concurrency effects can lead to events being processed in a different order than they appear in the request. For example, consider the bulk request:

POST /_bulk
{"index": {"_index": "my_index", "_id": "1"}}
{"field": 1}
{"index": {"_index": "my_index", "_id": "2"}}
{"field": 2}

It could be that the doc with _id 1 is distributed to one shard, and the doc with _id 2 is distributed to another shard. Event processing occurs concurrently across shards, meaning the shard responsible for _id 2 could process that event before the shard responsible for _id 1 processes that event.

That's the general statement: no guarantees.

@jasontedor Thank you for your reply. I understand your point regarding the lack of guarantees for the order in a single bulk request when handling events or sub-requests for different document_ids located on different shards. However, what about multiple sub-requests on a single document with the same document_id (which resides on a single shard)? Is that guaranteed?

In a single bulk request, consider the following updates for document_id A:

1. Update document_id: A, fieldA: valueA
2. Update document_id: A, fieldA: valueB

Is it guaranteed that the final value of fieldA in document A will be valueB?
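
Spelled out as a bulk request in the same style as the earlier example (the index and field names are placeholders), that scenario would be:

  POST /_bulk
  {"index": {"_index": "my_index", "_id": "A"}}
  {"fieldA": "valueA"}
  {"index": {"_index": "my_index", "_id": "A"}}
  {"fieldA": "valueB"}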
