Kafka input - resync missing items from topic

Ran into issue where we have logstash input reading kafka topics for log events.
Last night the ES index rolled over and the write alias was lost (not on the new index... so nothing with is_write_index = true) therefore logstash failed for 15 hours failing to write to ES.

We fixed the alias and "new" data started coming in.

My question is none of the last 15 hours are showing up or even beginning to resync into the index.
Is there a way to have the kafka input "resync" to see what it missed to index?
I obviously do not want to roll back to the beginning of the topic or to duplicate already indexed events but I'd like to get back the last 15 hours for events we are missing.


Do you have the offset of the last message indexed before your issue and the first one after you fixed?

If you have both offsets you may be able to spin-up another pipeline and configure it to consume from the beginning of your topic, but create a filter in the logstash pipeline to drop anything before and after those offsets.

If you do not have the offsets you may do a similar thing, spin-up another pipeline to consume the topic from the beginning, but send the data to a temporary index, after the date in this index reaches the current data you may stop it.

Then you would need to do a delete_by_query to remove the data from before the issue and after the issue is fixed and reindex it in your index.

Those are the two ways I could think of.

Unless it logs it somewhere I'm not sure I would be able to tell where to get this info from.
Is there any sort of improvement that can be done on the input that it wouldn't mark it in offset as complete until it gets confirmation about "output" being successful?

I mean the logstash config is literally input(kafka) -> filter -> output(search). If it doesn't successfully index due to this write issue or disk full or countless other reasons I would want kafka to keep the offset to where the last successful index item was.

I haven't specified anything custom on logstash config but believe it could/can basically buffer itself so while kafka didn't keep the offset those items that failed to send to search should still be in logstash buffer to retry to search? Failing to send shouldn't allow it to be "dropped" from retry logic I wouldn't think.

Ok so it won't help me with this specific event but I found that I can capture and log the kafka offset from the metadata. So in the future I should be able to at least find out the last offset and the "first" when it picks up again.

Do you have any sort of example on the logstash pipeline filter to drop events outside those 2 offsets (so basically I'd have to setup the kafka input to use a temp consumer group, set to beginning so it goes back to the topic start)... then have it drop all events before the last event and drop all events after the "first" event from pickup... and once it's done processing remove that pipeline. (I'd obviously keep it around for one off's like this in the future).

If I'm not wrong the input does not have an end-to-end ack, so it does not wait the event to be indexed in elasticsearch to commit the offset, it commits the offset periodically after they reach the queue state of the pipeline (memory or persistent queue).

Every logstash pipeline has the following flow:

input -> queue -> filters -> outputs, so the offsets would be commited after they were consummed, not after they were ingested in elasticsearch.

It really depends on why it failed, in this case, logstash didn't failed to send the event, it was correctly sent to elasticsearch and elasticsearch wasn't able to index it and elasticsearch returned an error, if I remind correctly, when you have no write alias elasticsearch returns an error 400.

When Logstash receives a 400 response for elasticsearch it will per default drop the event and continue on the next one, for 400 and 404 responses you can however enable the Dead letter queue to store those events and reprocess them using the dead_letter_queue input, but this need to be configured upfront.

It seems that this is what happened, elasticsearch returned 400 for logstash requests and the events were dropped until you fixed the write alias and they stated to be indexed.

If your Elasticsearch went down completely for example, Logstash would stop consuming events from Kafka after the queue was full.

For this specific case enabling Dead Letter Queue would help because your events would be stored in the DLQ and you would be able to process them again, but this can lead to other issues, if your DLQ is full it may crash your entire logstash and impact other pipelines.

1 Like

As you already saw, you would need to get the offset from the metadata fields, normally I have this on my kafka pipelines:

filter {
    mutate {
        add_field => {
            "[@metadata][kafka][offset]" => "kafkaOffset"

So to allow only events betwee two offsets, I think that this would work, but I didn't tested:

filter {
    mutate {
        convert => {
            "kafkaOffset" => "integer"
    if [kafkaOffset] <= last_offset_indexed or [kafkaOffset] >= first_offset_indexed_after_fix {
        drop {}
1 Like

DLQ doesn’t support NFS but I’m currently running LS inside Kubernetes so I will probably use EFS to have path for all my index pipelines to write their DLQ. I can then spin up a separate Deployment for processing DLQ that could have been written out from any of the different pipelines I have or N replicas of a particular pipeline. I get NFS is a performance hit but in this world writing to an EBS PV instead of EFS I’d have to unmount from N different pipelines in order to read them. Not sure I see another ReadWriteMany option.

You aren't capturing the partition?

Just grabbed 1 topic now that I've recently added these metadata fields and it seems the offset is partition specific so probably need like an AND to that if condition?

Feb 11, 2023 @ 21:05:55.604	745555988	filebeat-prod-apjson-apmediaapi	0
Feb 11, 2023 @ 21:05:55.584	745555896	filebeat-prod-apjson-apmediaapi	0
Feb 11, 2023 @ 21:05:55.577	747331699	filebeat-prod-apjson-apmediaapi	1
Feb 11, 2023 @ 21:05:55.577	747331724	filebeat-prod-apjson-apmediaapi	1
Feb 11, 2023 @ 21:05:55.542	751208055	filebeat-prod-apjson-apmediaapi	2
Feb 11, 2023 @ 21:05:55.486	751208101	filebeat-prod-apjson-apmediaapi	2

based on as timestamp is going up the offset is getting less would make me think they are partition specific?

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.