Logstash DLQ commit_offsets => true seems to be ignored?

I'm currently experimenting with the dead letter queue feature of Logstash. First I started with commit_offsets => false so I could set everything up, and — as expected — the DLQ events were re-ingested after each restart of Logstash.
Now that I'm happy with the extra dead-letter-queue pipeline, which ingests failed events without much grokking and so on, I switched to commit_offsets => true. But the DLQ events still get re-ingested after each restart of Logstash. Am I doing something wrong, is this option just not working in Logstash 7.9.1, or am I misunderstanding how commit_offsets works?

The DLQ Pipeline looks similar to:

# the pipeline for DLQ messages
# they just get ingested with some tags and without any pattern-analyzing
input {
  # Consume events that the "filter-syslog" pipeline wrote to its DLQ.
  dead_letter_queue {
    path => "/data/logstash/dead_letter_queue" 
    # start from last known position
    # NOTE(review): the committed offset lives in a sincedb file under
    # path.data and is presumably only flushed on a clean pipeline shutdown.
    # If Logstash is killed hard, or path.data is not persistent/writable
    # between restarts, the whole queue is re-read — verify this before
    # concluding that commit_offsets is broken.
    commit_offsets => true 
    # for debugging always send all messages from dlq
    # commit_offsets => false
    pipeline_id => "filter-syslog" 
    tags => [ "syslog" ]
    type => "syslog"
  }

  # Consume events that the "filter-filebeat" pipeline wrote to its DLQ.
  dead_letter_queue {
    path => "/data/logstash/dead_letter_queue"
    # start from last known position (same sincedb caveat as the input above)
    commit_offsets => true 
    # for debugging always send all messages from dlq
    # commit_offsets => false
    pipeline_id => "filter-filebeat"
    tags => [ "filebeat" ]
    type => "filebeat"
  }
}

filter {
    if "syslog" in [tags] {
        # Keep only the listed top-level fields; everything else is dropped.
        prune {
            id => "prune_1_1"
            whitelist_names => [ "@timestamp", "@metadata", "event" ]
        }

        mutate {
            id => "mutate_1_1"
            # FIX: the same option ("add_tag") must not appear twice inside
            # one plugin block — depending on the Logstash version one entry
            # can silently override the other. Merged into a single array.
            add_tag => [ "syslog-${LOGSTASH_FILTER_TAG}", "syslog" ]
        }
    }

    if "filebeat" in [tags] {
        # Keep only the listed top-level fields; everything else is dropped.
        prune {
            id => "prune_1_2"
            whitelist_names => [ "@timestamp", "@metadata", "event", "agent", "beat" ]
        }

        mutate {
            id => "mutate_1_2"
            # FIX: merged the two duplicate "add_tag" options into one array
            # (see mutate_1_1).
            add_tag => [ "filebeat-${LOGSTASH_FILTER_TAG}", "filebeat" ]
        }
    }

    # Re-create [message] from the original event and keep a copy of the DLQ
    # metadata. Runs unconditionally, so every event gets the "dlq" tag.
    mutate {
        id => "mutate_1_x"
        # FIX: merged the two duplicate "add_field" options into one hash.
        add_field => {
            "message"        => "%{[event][original]}"
            "[dlq][original]" => "%{[@metadata][dead_letter_queue]}"
        }
        add_tag => [ "dlq" ]
    }

    # NOTE(review): "dlq" was just added unconditionally above, so this
    # condition is currently always true; kept as a guard in case mutate_1_x
    # is ever made conditional.
    if "dlq" in [tags] {
        json {
            id => "json_1"
            source => "[dlq][original]"
            # seems to overwrite everything - will add original later again
            target => "[dlq]"
            # to suppress warnings and tagging with "_jsonparsefailure"
            # skip_on_invalid_json => true
        }

        if "_jsonparsefailure" not in [tags] {
            # The json filter clobbered [dlq], so re-add the raw DLQ metadata
            # and stash [dlq][reason] in a temp field.
            mutate {
                id => "mutate_2"
                # FIX: merged the two duplicate "add_field" options into one hash.
                add_field => {
                    "[dlq][original]"    => "%{[@metadata][dead_letter_queue]}"
                    "[dlq][reason_temp]" => "%{[dlq][reason]}"
                }
            }

            # Remove the flat [dlq][reason] string so it can be replaced by a
            # structured object below. (Separate mutate blocks are kept on
            # purpose: within one mutate the operation order is fixed by the
            # plugin, not by the order written.)
            mutate {
                id => "mutate_3"
                remove_field => [ "[dlq][reason]" ]
            }

            # Re-add the stashed value as [dlq][reason][original].
            mutate { 
                id => "mutate_4"
                add_field => [ "[dlq][reason][original]", "%{[dlq][reason_temp]}" ]
            }

            # Drop the temp field again.
            mutate {
                id => "mutate_5"
                remove_field => [ "[dlq][reason_temp]" ]
            }
            
            # Split the failure reason into readable sub-fields.
            # NOTE(review): the leading GREEDYDATA is greedy — if the message
            # itself contains ". status: " this may split at the wrong spot.
            grok {
                id => "grok_1"
                match => { "[dlq][reason][original]" => "%{GREEDYDATA:[dlq][reason][message]}. status: %{POSINT:[dlq][reason][status]}, action: %{GREEDYDATA:[dlq][reason][action]}" }
            }
        }
    }
}

output {
    # FIX: dropped "codec => json_lines" from both elasticsearch outputs —
    # the elasticsearch output serializes events itself for the bulk API,
    # so a codec setting there is ignored and only misleads readers.
    # NOTE(review): prefer the Logstash keystore over a plaintext password
    # ("${ES_PWD}" style) for the credentials below.
    if [event][dataset] == "syslog" {
        elasticsearch {
            id => "elasticsearch_logstash_syslog_ilm_dlq"
            manage_template => false
            cacert => "/etc/logstash/certs/elasticsearch-ca.pem"
            user => "logstash_internal"
            password => "supersecure"
            ssl => true
            ilm_enabled => "true"
            ilm_rollover_alias => "logstash_syslog_ilm"
            ilm_pattern => "{now/d}-000001"
            ilm_policy => "logstash-syslog"
            # the elasticsearch output plugin will loadbalance :D - IPs obfuscated
            hosts => [ "https://123.123.123.123:9200", "https://123.123.123.124:9200" ]
        }
    }

    if [event][dataset] == "filebeat" {
        elasticsearch {
            id => "elasticsearch_logstash_filebeat_ilm_dlq"
            manage_template => false
            cacert => "/etc/logstash/certs/elasticsearch-ca.pem"
            user => "logstash_internal"
            password => "supersecure"
            ssl => true
            ilm_enabled => "true"
            ilm_rollover_alias => "logstash_filebeat_ilm"
            ilm_pattern => "{now/d}-000001"
            ilm_policy => "logstash-filebeat"
            # the elasticsearch output plugin will loadbalance :D - IPs obfuscated
            hosts => [ "https://123.123.123.123:9200", "https://123.123.123.124:9200" ]
        }
    }

    # Debug copy of every DLQ event (the "dlq" tag is added for all events
    # in the filter section).
    if "dlq" in [tags] {
        file {
            id => "file_dlq_debug"
            path => "/data/plain-log-archive/dlq-debug.txt"
        }
    }
}

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.