Filter by timestamp - re-ingesting logs from Kafka

We had a problem with our cluster this weekend (the index went read-only), and it looks like the offset expired in Kafka or something similar. The following message appeared in our Logstash logs from Kafka: INVALID_FETCH_SESSION_EPOCH.

That said, we lost 24 hours of data, and it is still in our Kafka cluster. Working with offsets directly is somewhat hard, but Kafka does record a timestamp in each message.

I would like to restart from the earliest offset and then filter out anything older than the last message I received. What would the filter look like in Logstash?

Example:

    if @timestamp < "2019-02-24T10:00:00.000Z" { drop }

Thanks

    # Store the cutoff time in a metadata field (not included in the indexed event)
    mutate { add_field => { "[@metadata][cutoff]" => "2019-02-24T10:00:00.000Z" } }
    # Parse the string into a timestamp so it can be compared against @timestamp
    date { match => [ "[@metadata][cutoff]", "ISO8601" ] target => "[@metadata][cutoff]" }
    # Cancel (drop) any event older than the cutoff
    ruby { code => 'if event.get("@timestamp") < event.get("[@metadata][cutoff]") then event.cancel end' }
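
To actually replay the retained data, the Kafka input also needs to start reading from the beginning, for example by switching to a fresh consumer group with auto_offset_reset set to earliest. A minimal sketch, assuming placeholder broker, topic, and group names:

    input {
      kafka {
        bootstrap_servers => "localhost:9092"     # placeholder broker address
        topics            => ["app-logs"]         # placeholder topic name
        group_id          => "logstash-reingest"  # new group id, so no stored offsets apply
        auto_offset_reset => "earliest"           # consume from the start of retention
      }
    }

With the backlog being re-read, the filter above drops everything older than the cutoff, so only the missing data (and anything newer) gets indexed again.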
