Refresh in pipeline: output and/or filter

Hi,

In a Logstash pipeline, documents are consumed consecutively from RabbitMQ. Sometimes I need to look up already imported documents via filter->elasticsearch->query_template.
However, the lookup doesn't see the latest state, because a refresh is required first. I have tried to explicitly trigger a refresh via the http filter plugin on each node of the cluster, before the lookup:

http {
  verb => "POST"
  url => "https://elasticX:9200/index_for_refresh/_refresh"
  user => "user"
  password => "password"
  cacert => "/etc/logstash/certs/elastic-ca.pem"
}

but the result is not consistent, so I can't rely on it.
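For context, the lookup step that runs after the refresh might look roughly like the sketch below. It is only an illustration: the template path, the field names under `fields`, and the tag name are hypothetical, and `ca_file` is the older option name (newer versions of the elasticsearch filter call it `ssl_certificate_authorities`).

```logstash
filter {
  elasticsearch {
    hosts => ["https://elasticX:9200"]
    index => "index_for_refresh"
    # Hypothetical path: a JSON file holding the query DSL, where
    # %{field} references are replaced with values from the current event.
    query_template => "/etc/logstash/templates/lookup.json"
    # Copy the hypothetical "status" field of the matched document
    # into [lookup][status] on the current event.
    fields => { "status" => "[lookup][status]" }
    user => "user"
    password => "password"
    ca_file => "/etc/logstash/certs/elastic-ca.pem"
    # Added only when the query itself fails, not when it finds nothing.
    tag_on_failure => ["_lookup_failed"]
  }

  # A lookup with no hits leaves [lookup][status] unset, so test for the field.
  if ![lookup][status] {
    mutate { add_tag => ["lookup_empty"] }
  }
}
```

A document that was indexed after the last refresh is simply not found by this query, which shows up as the `lookup_empty` case rather than as an error.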
https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html

https://www.elastic.co/guide/en/logstash/current/plugins-filters-elasticsearch.html

How can I perform a reliable refresh, either in the filter or in the output, so that the lookup is served the latest indexed state?

Regards

That feels a little like using a hammer on a screw. Maybe we can find another approach?

Can you share a little what those documents are, why you need the lookup and how that query looks like, and what the end goal is?

Thanks xeraa,

I have split this into 2 separate (independent) processes:

  1. continuously importing into an index, with RabbitMQ configured as input
  2. periodically performing the lookup, with the Elasticsearch index (above) configured as input. If the lookup returns results, the enriched document is saved in a new index; otherwise it is examined again in the next iteration
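The two processes above could be sketched as two separate pipeline configs, along the lines below. This is an assumption-laden outline rather than the actual setup: the file names, queue name, schedule, template path, `enriched_index`, and the `[lookup][status]` field are all hypothetical, and the two files must run as separate pipelines (for example via pipelines.yml), not be merged into one.

```logstash
# --- import.conf (process 1, hypothetical file name) ---
input {
  rabbitmq {
    host => "rabbitmq-host"     # hypothetical host
    queue => "documents"        # hypothetical queue
    durable => true
  }
}
output {
  elasticsearch {
    hosts => ["https://elasticX:9200"]
    index => "index_for_refresh"
  }
}

# --- lookup.conf (process 2, hypothetical file name) ---
input {
  elasticsearch {
    hosts => ["https://elasticX:9200"]
    index => "index_for_refresh"
    query => '{ "query": { "match_all": {} } }'
    # Cron-style schedule: re-read the index every 5 minutes.
    schedule => "*/5 * * * *"
    # Keep the source document's _id so re-runs overwrite, not duplicate.
    docinfo => true
    docinfo_target => "[@metadata][doc]"
  }
}
filter {
  elasticsearch {
    hosts => ["https://elasticX:9200"]
    index => "index_for_refresh"
    query_template => "/etc/logstash/templates/lookup.json"
    fields => { "status" => "[lookup][status]" }
  }
}
output {
  # Only documents whose lookup returned something are written out;
  # the rest are picked up again on the next scheduled run.
  if [lookup][status] {
    elasticsearch {
      hosts => ["https://elasticX:9200"]
      index => "enriched_index"
      document_id => "%{[@metadata][doc][_id]}"
    }
  }
}
```

Because the second pipeline runs minutes after the import, the index's regular periodic refresh has normally made the documents searchable by then, so no explicit _refresh call is needed. Authentication and TLS options are left out for brevity.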

That does it for now. Please let me know if you have any advice or better insight.

Regards

It sounds relatively similar to the enrich ingest pipeline — maybe that's an alternative? It runs within Elasticsearch and won't need Logstash or a queue; though it has some limitations around updating the lookup index. It might still be a better fit?

Also I'm not sure I follow this part on the _refresh API:

While not recommended for production because of the performance overhead, this should do the right thing.

Thanks xeraa,

I have tried an enrich policy, but the elasticsearch filter fits this use case better, because the retrieved results can be handled conditionally in a custom way.
The _refresh call doesn't seem to execute synchronously (wait until the refresh has completed) within the same Logstash pipeline, so the latest state can't be reliably picked up by the following steps of the pipeline. As you wrote, it's not recommended to use Elasticsearch in a transactional manner.

Regards

Interesting. _refresh is generally a blocking call. What's your setting for pipeline.workers and does 1 make a difference?

But all of this feels very much like a workaround, especially calling _refresh for every single document in a batch.
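To test the pipeline.workers question, one option is to run the pipeline with a single worker (`bin/logstash -w 1`, or `pipeline.workers: 1` in logstash.yml) and record whether each refresh call actually succeeded. The sketch below assumes a hypothetical `needs_lookup` flag on the event and a hypothetical `refresh_unconfirmed` tag; it also limits the refresh to events that need a lookup instead of every document in the batch.

```logstash
filter {
  # Hypothetical flag: only events that need a lookup pay for a refresh.
  if [needs_lookup] {
    http {
      verb => "POST"
      url => "https://elasticX:9200/index_for_refresh/_refresh"
      user => "user"
      password => "password"
      cacert => "/etc/logstash/certs/elastic-ca.pem"
      # Store the parsed JSON response in @metadata so it is not indexed.
      target_body => "[@metadata][refresh]"
    }

    # The refresh response reports shard counts under "_shards".
    # Tag the event if the request failed or any shard did not refresh
    # (a missing response field also counts as unconfirmed).
    if "_httprequestfailure" in [tags] or [@metadata][refresh][_shards][failed] != 0 {
      mutate { add_tag => ["refresh_unconfirmed"] }
    }

    # The elasticsearch lookup filter would follow here, ideally guarded by
    # a check that "refresh_unconfirmed" is not in [tags].
  }
}
```

If lookups still miss documents with one worker and no `refresh_unconfirmed` tags, the refresh call itself is probably not the cause, and the ordering between the output of earlier events and the filter stage of later ones is worth checking.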