Elasticsearch filter and input on different events

Hello.
I am having serious trouble understanding if this is even possible.
I have an elasticsearch index containing two types of information: messages and parts.
I want to unify some of the events in the index into a single event in a new index.
Messages and parts that should be "merged" have a field value in common.
In the messages this field is called header.message-Id and in the parts this field is called message.headers.message-Id.

What I have been trying to do is:

  • elasticsearch input plugin, with a query to return only the parts that I need, including content of field message.headers.Message-Id
  • elasticsearch filter plugin, with a query on the messages that searches for the match: headers.Message-Id (field of the LOGSTASH FILTER EVENT) == [message][headers][Message-Id] (field of the LOGSTASH INPUT EVENT)
  • elasticsearch output plugin to write to a new index

In all this, I want to also rename all the fields but that is secondary.
So, when I run this thing, NO MATTER WHAT, I get a

"tags" => [
        [0] "_elasticsearch_lookup_failure"
    ]

Is this even possible to do in the first place?
Other ideas on how to do this if this method is impossible?

I will now include my config file:

input {
  elasticsearch {
    hosts => "localhost"
    index => "test-alfa"
    docinfo => true
    query => '{
     "query": {
        "query_string": {
          "analyze_wildcard": true,
          "query": "REDACTED"
        }
      }
    }'
  }
}
filter{
  elasticsearch {
    hosts => "localhost"
    index => "test-alfa"
    #this query works on its own
    query => '{
      "query": {
        "term": {
          "headers.Message-Id.keyword": {
            "value": "[message][headers][Message-Id][0]" #I use the value on the Input event as a key for the search
          }
        }
      }
    }'
    result_size => 1
    fields => {"any_field_of_the_event_returned_by_the_above_query" => "new_field_that_should_show_up_but_does_not"}
  }
}
output {
  #elasticsearch {
  #  index => "test-bravo"
  #  hosts => ["localhost:9200"]
  #}
  stdout { codec => rubydebug }
}

I realised I can make my life way easier and copy those fields to a commonly named field key.
Now I only need to find a way to merge them.
I am looking into aggregate filter plugin but I really don't understand how it works

        "value": "[message][headers][Message-Id][0]" #I use the value on the Input event as a key for the search

To have any chance of working the field reference needs to be %{[message][headers][Message-Id][0]}.

My dear magnus, I was praying for your intervention! I moved to aggregate but that seems to introduce lots of problem to my use case. I will now try to go back to this idea.
Thanks for the suggestion. I will now try again.
I would like to know what you think of this problem and the solution in general, if there are better ways and if this is a horrible idea... any feedback is appreciated.
I can also tell you about my aggregate issues if you feel you could help there.

Edit:
I tried with your suggestion, still getting errors but I saw an improvement. Now I tried to use query_template instead of query and that does not throw weird erros!
Now everything works fine, except that the field I want to copy gets set to NIL

filter{
  if [@metadata][_type] == "messages" {
    mutate {
      add_field => {
        "key" => "%{[headers][Message-Id][0]}" 
      }
    }
  }
  else {
    mutate {
      add_field => {
        "key" => "%{[message][headers][Message-Id][0]}"
      }
    }
  }

  if (...) {
    elasticsearch {
      hosts => "localhost"
      index => "test-alfa"
      query_template => "template.json"
      result_size => 1
      fields => {
        "[geo][city]" => "city-gold"
        "[geo][country][name]" => "country-gold"
        }
    }
    dissect {...}
    mutate {...}
  }
}

what i get is

(...)
"city-gold" => nil
(...)
"country-gold" => nil
(...)

ideas on why the value is not retrieved?

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.