How can I run an update-by-query and index events to Elasticsearch in the same pipeline?

Hi! I have an index that records state transitions, so I may have several docs with the same id but different states. I thought the best way to retrieve the last state of an id was to add a new field called last_state (lastState in the config below) and set it to "true" on the newest doc, then run an update-by-query on the other docs with the same id and set their flag to false.
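To make the idea concrete, here is a rough sketch of the request body I expect the update-by-query call to send. It is written in Python only to illustrate the JSON and is not part of my pipeline. The function name build_update_by_query and the id field name "id" are assumptions for illustration; the script source is the one from my config.

```python
import json


def build_update_by_query(doc_id, id_field="id", flag_field="lastState"):
    """Build the JSON body for POST /<index>/_update_by_query.

    id_field and flag_field are assumed names; adjust them to the real mapping.
    """
    return {
        # Painless script that clears the flag on every matching doc.
        "script": {
            "lang": "painless",
            "source": f"ctx._source.{flag_field} = false",
        },
        # Match every doc that shares the given id.
        "query": {"term": {id_field: doc_id}},
    }


if __name__ == "__main__":
    # Print the body for a made-up id so it can be compared with what Logstash sends.
    print(json.dumps(build_update_by_query("abc-123"), indent=2))
```

Note that this query also matches the newest doc if it is already indexed, so the update would have to run before the new event is written, or the query would need to exclude it.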

I've read that an update-by-query can be added in Logstash, here: https://stackoverflow.com/questions/53330232/does-logstash-support-elasticsearchs-update-by-query

As for my pipe.conf:

input {
    beats {
        port => 5043
        id => "filebeat"
    }
}

filter {
    if "tag" in [tags] {
        mutate {
            gsub => ["message","[\r\n\t]", ""]
        }
        grok {...
            tag_on_failure => ["no_match"]
        }
        if "no_match" not in [tags] {
            ruby {
              ...
            }
            mutate {
                add_field => { "[@metadata][index]" => "my_index-%{+YYYY}" }
            }
            if "state" in [fields] {
                mutate {
                    add_field => {"lastState" => true}
                }
                clone {
                    clones => [ "my_index-%{+YYYY}" ]
                    add_tag => ["call_to_update"]
                }
                if "call_to_update" in [tags] {
                    mutate {
                        add_field => {
                            "[script][lang]" => "painless"
                            "[script][source]" => "ctx._source.lastState = false"
                            "[query][match][state]" => "%{state}"
                        }
                    }
                    prune {
                        whitelist_names => ["^script*", "^query*"]
                    }
                    http {
                        url => "https://nodo1:9200/%{[@metadata][index]}/_doc/_update_by_query"
                        verb => "POST"
                        body_format => "json"
                        body => "message"
                    }
                }
            }
        }
        else {
            mutate {
                add_field => { "[@metadata][index]" => "grok_parse_failure-%{+YYYY}" }
            }
        }
    }
    else {
        mutate {
            add_field => { "[@metadata][index]" => "other_product" } 
        }
    }
}
output {
    if "script" not in [fields] {
        elasticsearch {
            hosts => ["nodo1"]
            user => "xx"
            password => "xx"
            index => "%{[@metadata][index]}"
        }
    }
}
