To index data into Elasticsearch, we are using an Apache Flink pipeline that is consuming from Kafka topics.
The index mapping looks something like the one below: a document with nested documents:
Since Flink uses the _bulk API for indexing, we can end up in a situation where Flink sends a bulk with many actions that should all be applied to the same document. Say we have an email with 100 messages: that means 100 index actions, all in the same bulk, all targeting the same document.
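To illustrate the shape of the problem, here is a minimal sketch of such a bulk written against the Elasticsearch 7.x high-level REST client. The index name `emails`, the id `email-42` and the field are made up for illustration; in the real pipeline the actions are produced by the Flink sink rather than built by hand:

```java
import org.apache.http.HttpHost;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

public class SameDocumentBulk {
    public static void main(String[] args) throws Exception {
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")))) {

            BulkRequest bulk = new BulkRequest();
            // 100 messages belonging to the same email -> 100 actions for the same _id.
            for (int i = 0; i < 100; i++) {
                bulk.add(new UpdateRequest("emails", "email-42")
                        .doc("last_message", "msg-" + i));
            }

            // Elasticsearch applies these actions to the same document one after another,
            // re-indexing it every time, which is what makes this kind of bulk slow.
            BulkResponse response = client.bulk(bulk, RequestOptions.DEFAULT);
            System.out.println("Bulk took " + response.getTook() + ", errors: " + response.hasFailures());
        }
    }
}
```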
Right now, this specific case is slowing down our indexing and can even block the entire pipeline, since Elasticsearch is slow at processing each action.
Is there a way to tune Elasticsearch to better handle this specific case? Any advice is welcome!
There's no server-side optimisation for this case. In principle it would be possible to implement such a thing, but in practice it seems better to do this kind of optimisation on the client side. That's where Elasticsearch assumes it'll happen today.
But since that's not an option for us right now, is there any general advice on how to deal with this case on the ES side?
We've had a couple of incidents specifically because of this use-case and I want to limit the damage a situation like this can cause.
For example, we've noticed that during use cases like this, young GC happens a lot, and the shards holding those documents also grow very fast, sometimes doubling or tripling in size. From my understanding, this is all due to the number of updates on the document: updated docs are marked as deleted but not removed from disk until a segment merge happens.
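One way to confirm that updates (rather than new data) are behind the growth is to watch `docs.deleted` and the per-segment sizes over time. A rough sketch using the low-level Java REST client, where the index name `emails` is just a placeholder:

```java
import org.apache.http.HttpHost;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

public class SegmentStatsCheck {
    public static void main(String[] args) throws Exception {
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
            // docs.deleted per index: documents superseded by updates but not yet merged away.
            Response counts = client.performRequest(
                    new Request("GET", "/_cat/indices/emails?v&h=index,docs.count,docs.deleted,store.size"));
            System.out.println(EntityUtils.toString(counts.getEntity()));

            // Per-segment view: many small segments with a high docs.deleted count is the telltale sign.
            Response segments = client.performRequest(
                    new Request("GET", "/_cat/segments/emails?v&h=index,segment,docs.count,docs.deleted,size"));
            System.out.println(EntityUtils.toString(segments.getEntity()));
        }
    }
}
```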
Should we have more CPU/RAM/storage to handle this better?
Anything that would minimize the impact.
Use cases with frequent updates to the same document have IMHO never been well supported by Elasticsearch, as far as I recall because they result in a lot of small segments being created, which adds a lot of overhead since creating segments is relatively expensive. As far as I know there is no way to tune the cluster to get around this overhead, so I would recommend addressing this client side, e.g. by changing the data model to reduce the number of updates.
That's about all you can do AFAIK. RAM may not be so important, but I'd expect you to need more CPU and IO bandwidth, both for the higher number of indexing operations and then for the subsequent merges. Hard to say how much more though; you'll likely need to run some experiments.
We do a similar thing.
You need to shard by your ID so that all updates to the same "ID" go to the same ES writer.
That way, all Kafka documents for the same "ID" end up in the same process that performs the bulk write, and you can eliminate this shotgun bulk-write situation.
It's even better to have caching somewhere in this linear data path, so you can further improve efficiency by combining all the writes to the same document into just one update.
The key is to combine all the actions on the same document into a single update.
The prerequisite is that all Kafka messages that update the same ES document "must" take the same path and end up in the same ES writer... (a rough sketch of this pattern in Flink terms follows below)
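To make that concrete, here is a minimal sketch of the key-and-coalesce pattern against a recent Flink 1.x DataStream API. The `MessageEvent` type, its `emailId`/`messages` fields and the 5-second window are all made up for illustration, and the actual Elasticsearch sink is left out; the point is that one combined event per document leaves the window instead of one event per message:

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class CoalesceUpdatesJob {

    /** Hypothetical event carrying one or more messages for a single email document. */
    public static class MessageEvent {
        public String emailId;
        public List<String> messages = new ArrayList<>();

        public MessageEvent() {}

        public MessageEvent(String emailId, String message) {
            this.emailId = emailId;
            this.messages.add(message);
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // In the real pipeline this would come from the Kafka source.
        DataStream<MessageEvent> events = env.fromElements(
                new MessageEvent("email-42", "msg-1"),
                new MessageEvent("email-42", "msg-2"),
                new MessageEvent("email-7", "msg-1"));

        DataStream<MessageEvent> coalesced = events
                // All events for the same document id go to the same parallel subtask.
                .keyBy(e -> e.emailId)
                // Collect everything for that id over a short interval...
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                // ...and merge it into a single event, i.e. a single update per document.
                .reduce((ReduceFunction<MessageEvent>) (a, b) -> {
                    MessageEvent merged = new MessageEvent();
                    merged.emailId = a.emailId;
                    merged.messages.addAll(a.messages);
                    merged.messages.addAll(b.messages);
                    return merged;
                });

        // coalesced.addSink(...): one bulk action per document instead of one per message.
        coalesced.print();

        env.execute("coalesce-updates-per-document");
    }
}
```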
Indeed, that is one of the improvements we intend to implement on our end; it's just that with the Flink sink it's more difficult, and it would require us to implement a custom sink to handle this specific case.