Logstash is processing old events

Getting continuous errors like below in logstash

{ "timestamp": "2023-02-10T14:04:33.661-08:00", "severity": "warning", "message": "Could not index event to Elasticsearch. {:status=>404, :action=>['index', {:_id=>nil, :_index=>'index-2022.12.12', :routing=>nil}, #LogStash::Event:0x3450fd23], :response=>{'index'=>{'_index'=>'index-2022.12.12', '_type'=>'_doc', '_id'=>nil, 'status'=>404, 'error'=>{'type'=>'index_not_found_exception', 'reason'=>'no such index [index-2022.12.12]', 'index'=>'index-2022.12.12', 'resource.id'=>'index-2022.12.12', 'resource.type'=>'index_expression', 'index_uuid'=>'na'}}}}"}

Logstash on timestamp 2023-02-10T14:04:33.661-08:00 is trying to process old index index-2022.12.12.
Index gets created based on @timestamp of logstash. But why current logstash process old index?

It is impossible to know unless you provide more information like the logstash pipeline you are using.

This is the logstash pipeline

input {
  beats {
    id => "filebeat_tls"
    port => 5044
    type => filebeat
filter {
  if [type] == "readiness" {
    drop {}
  else if [type] == "filebeat" {

    if [input][type] == "container" {
        grok {
          match => {
            "[log][file][path]" => "/var/log/containers/%{DATA:pod_name}_%{DATA:namespace}_%{GREEDYDATA:container_name}-%{DATA:container_id}.log"

      if [message] and [message] != "" {
        json {
          skip_on_invalid_json => true
          source => "[message]"
          remove_field => ["stream"]
      if [ts] {
        mutate {
          copy => { "@timestamp" => "ts" }
      if [stream] {
        mutate {
          rename => {"[stream]" => "[severity]"}
          add_field => {"[version]" => "0.3.0"}
          copy => { "@timestamp" => "timestamp" }
          add_tag => [ "forced_conversion" ]
        if [severity] == "stderr" {
          mutate {
            replace => {"[severity]" => "error"}
        } else if [severity] == "stdout" {
          mutate {
            replace => {"[severity]" => "info"}
output {
  pipeline { send_to => "searchengine_pipeline" }

http.host: ""
http.port: 0000
log.level: "info"
pipeline.workers: 2
pipeline.batch.size: 2048
pipeline.batch.delay: 50
path.logs: /opt/logstash/resource
pipeline.ecs_compatibility: disabled

Ignore older is set to 24h in filebeat and LT persistant queue contains no data(0 event count)

queue" : {
        "events" : 0,
        "type" : "persisted",
        "data" : {
          "free_space_in_bytes" : 331534618624,
          "storage_type" : "xfs"
        "capacity" : {
          "max_queue_size_in_bytes" : 1073741824,
          "queue_size_in_bytes" : 45056621,
          "max_unread_events" : 0,
          "page_capacity_in_bytes" : 67108864
        "events_count" : 0,
        "queue_size_in_bytes" : 45056621,
        "max_queue_size_in_bytes" : 1073741824

Since the name of index get the date information from the field @timestamp and you are getting the value for the @timestamp field from a field in your document, you need to check in the source file that filebeat is reading if you have old values for this field.

There is not in your Logstash pipeline that would do that, the issue is probably on your source file, not even in Filebeat.

From what you shared the value of the @timestamp field comes from the value of the ts field in your documents, so you may have events where this value is older than the current date.

@leandrojmp ,Thanks for your response.But copy in mutate filter is from source to destination right?
It means "@timestamp" field value is copied to "ts" field and not "ts" to @timestamp

Oh yeah, my mistake, you are right.

So, where is your @timestamp comming from? Do you have anything else in the searchengine_pipeline pipeline?

If not, Logstash will use the current time as the @timestamp, unless you have it coming from your source document from Filebeat, which you didn't share.

@leandrojmp, I have checked the source file, it does not have any @timestamp field in it. Due to data privacy, it is not possible for me to provide the source file here.
But I still doubt on the Persistent queue, DO you have any idea, what is the behavior when logstash's output destination such as lumberjack is not available for sometime and logstash restarted due to some reason?

The persistent queue resides between the input and the filter block, if you have it enabled when logstash receives a message it will be put on this queue to be processed.

If an output has some issues the messages will start to accumulate in the persisted queue until it is full, when it is full logstash will stop to accept new messages until the output is back and it can start to drain the persisted queue.

When you restart logstash, it will start to process the persisted queue again.