Delete elastic document by _id found via elasticsearch filter plugin in logstash

Hi, so currently I am ingesting data from Kafka, filtering it through Logstash 7.2 and storing it in Elasticsearch 7.2.
The data I'm receiving are jamming events: each trap has an id, and for each trap I should receive two messages, one for activation and one for the end. What I want to do is the following:

    • when I receive the activation event, just add a field with the value "Active".
    • when I receive the end event, find the Elasticsearch doc by its "trap_id" and delete that doc, so that there is only one document per trap.

/etc/logstash/conf.d/query_templates/check_status_trap.json -> I can't share this template, but what it does is find the doc whose trap_id is equal to the trap_id of the message.
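For illustration only, a template of that shape might look roughly like the following, assuming a plain term match on trap_id (this is a guess, not the real file):

    {
      "size": 1,
      "query": {
        "term": {
          "trap_id": "%{trap_id}"
        }
      }
    }

As far as I know, the elasticsearch filter runs the template file through event field substitution, so %{trap_id} would be replaced with the value grok parsed from the message before the query is sent.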

Here is my logstash configuration:

    input{
            kafka {
                    bootstrap_servers => "xxxxxx"
                    topics => "xxxxx"
                    type => "xxxxx"
                    client_id => "xxxxx"
                    group_id => "xxxxx"
                    auto_offset_reset => "xxxx"
            }
    }

    filter {
            if [type] == "xxxxx" {
                    grok {
                            match => {
                                    "message" => "%{NUMBER:trap_id}, %{DATA:start_timestamp}, %{DATA:update_timestamp}, %{DATA:xxxxx}, %{NUMBER:xxxxx}, %{NUMBER:xxxx}, %{DATA:xxx}, %{DATA:levels}, %{GREEDYDATA:action}"
                            }
                    }
                    date {
                            match => ["start_timestamp", "YYYY-MM-dd HH:mm:ss"]
                            target => "start_timestamp"
                    }
                    date {
                            match => ["update_timestamp", "YYYY-MM-dd HH:mm:ss"]
                            target => "update_timestamp"
                    }
                    mutate {
                            gsub => [ "levels", "d", ""]
                    }
                    mutate{
                            split => {"levels" => " "}
                    }
                    mutate {
                            add_field => {
                                    "jamming" => "%{[levels][0]}"
                                    "noise" => "%{[levels][1]}"
                            }
                    }
                    if [action] == "CANCEL" {
                            elasticsearch {
                                    hosts => [xxxxxx]
                                    user => "xxxx"
                                    password => "xxxxx"
                                    index => "xxxxx"
                                    query_template => "/etc/logstash/conf.d/query_templates/check_status_trap.json"
                                    docinfo_fields => {
                                            "_id" => "doc_id"
                                            "_index" => "doc_index"
                                    }
                            }
                            mutate {
                                    replace => { "status" => "Cleared" }
                            }
                    }
                    if [action] == "ALARM" {
                            mutate {
                                    add_field => { "status" => "Active" }
                            }
                    }
            }
    }
    output {
            if [status] == "Cleared" {
                    elasticsearch {
                            hosts => [xxxxxxxx]
                            action => 'delete'
                            document_id => '%{doc_id}'
                            index => '%{doc_index}'
                            user => 'xxxxxx'
                            password => 'xxxxxx'
                    }
            }
        elasticsearch {
                hosts => [xxxxxx]
                ilm_enabled => true
                ilm_rollover_alias => 'xxxxx'
                ilm_pattern => '000001'
                ilm_policy => 'rollover_policy'
                user => 'xxxxxx'
                password => 'xxxxx'
        }
        stdout{codec => json}
    }
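To make the levels handling concrete, here is a completely made-up sample line shaped like the grok pattern above (every value here is hypothetical):

    42, 2020-03-31 14:00:00, 2020-03-31 14:05:00, foo, 1, 2, bar, -65d -80d, CANCEL

The gsub removes every "d" character, so levels becomes "-65 -80"; the split then turns it into ["-65", "-80"], and the add_field step ends up with jamming = -65 and noise = -80.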

This is the error in logstash log:

[2020-03-31T14:58:02,371][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>404, :action=>["delete", {:_id=>"%{doc_id}", :_index=>"%{doc_index}", :_type=>"_doc", :routing=>nil}, #<LogStash::Event:0x5a5db623>], :response=>{"delete"=>{"_index"=>"%{doc_index}", "_type"=>"_doc", "_id"=>"%{doc_id}", "status"=>404, "error"=>{"type"=>"index_not_found_exception", "reason"=>"no such index [%{doc_index}]", "resource.type"=>"index_expression", "resource.id"=>"%{doc_index}", "index_uuid"=>"_na_", "index"=>"%{doc_index}"}}}}

What I get from the log trace is that docinfo_fields is not actually getting the values of those two fields from the doc returned by the Elasticsearch query, but according to the documentation these parameters should retrieve info from the Elasticsearch document.
Can anyone help me achieve my goal? Thank you.
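One way to check at the output stage whether the lookup actually populated those fields would be to gate the delete on the field existing; a sketch, not tested against this pipeline:

    output {
            if [status] == "Cleared" and [doc_id] {
                    elasticsearch {
                            hosts => [xxxxxx]
                            action => 'delete'
                            document_id => '%{doc_id}'
                            index => '%{doc_index}'
                            user => 'xxxxxx'
                            password => 'xxxxxx'
                    }
            }
    }

In a Logstash conditional, [doc_id] is only truthy when the field exists on the event, so if the delete output is never reached that would confirm the elasticsearch filter did not set it.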

It is hard to believe that that configuration could result in the error 'no such index [%{[@metadata][doc_index]}]'

Oops, you are absolutely right: using @metadata was a failed workaround I tried, and I had copied an old log trace. I am really sorry, I will change it right away.
