При добавлении в конфиг Logstash elasticsearch filter plugun появилась ошибка filebeat: Failed to connect to backoff(async(tcp://xxx.xxx.xxx.xxx:5044)): dial tcp xxx.xxx.xxx.xxx:5044: connect: connection refused

В фильтре logstash я пытаюсь обратиться к elasticsearch, чтобы получить необходимые данные. Без elasticsearch plugin все работает хорошо (если код закоментирован).

/etc/logstash/conf.d/mysql-beats.conf

input {
  beats {
    port => 5044
    client_inactivity_timeout => 3000
#    ssl => false
  }
}

filter {
  mutate {gsub => ["message", "\t", " "]}
  grok {
    match => {"message" => "%{WORD:method}\s%{WORD:product_name}\s%{WORD:product_price}"}
  }
  if [method] == "UPDATE" {
    mutate {
      convert => {"product_price" => "float"}
    }
#    elasticsearch {
#       hosts => ["http://0.0.0.0:9200"]
#       query_template => "/usr/share/logstash/product-names.json"
#       fields => { "product_name" => "title", "ourProductId" => "ourProductId" }
#       index => "product-names"
#       user => "elastic"
#       password => "elasticpassword"
#    }
  }
}

output {
  if "_grokparsefailure" not in [tags] {
    elasticsearch {
      hosts => ["http://0.0.0.0:9200"]
      index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
      user => "elastic"
      password => "elasticpassword"
    }
  }
}

/usr/share/logstash/product-names.json

{
    "size": 1,
    "query": {
      "term": {
       "ourProductId": {"value": "%{product_name}"}
      }
    },
   "_source": ["title", "ourProductId"]
 }

curl -XGET "xxx.xxx.xxx.xxx:9200/product-names/_mapping?pretty"

{
  "product-names" : {
    "mappings" : {
      "properties" : {
        "ourProductId" : {
          "type" : "keyword"
        },
        "product_name" : {
          "type" : "keyword"
        }
      }
    }
  }
}

из-за чего может возникать ошибка?

Elasticsearch plugin должен обращаться именно к индексам логов? Или можно взять данные из любого индекса?

Изменил query_template на query => "ourProductId:%{product_name}"
данные записываются с filebeat в elasticsearch через logstash, но появился тег _elasticsearch_lookup_failure

А elasticsearch обрабатывать запросы успевает?

А как это можно проверить? У меня в индексе всего две тестовые записи с двумя полями, я думаю, что должен успевать. А logstash output не ждет, когда отработают все плагины в фильтре? Я думал, что когда завершены все процессе в фильтре, то тогда только происходит запись в эластик

Мне кажется, проблема в том, что я не могу подключиться к произвольному индексу эластика, а нужно использовать индекс для filebeat. Я попробовал подключиться именно к нему и получилось найти данные, а тег _elasticsearch_lookup_failure пропал.

А если заменить filebeat input на stdin input, то что появляется в логах logstash?