Logstash is running pipelines in parallel

Hello!

I have a pipeline split into two parts that generates two indices which enrich each other.

I would like to know how I can make it run one ETL first and, when it finishes, the other.

My Logstash filter which enriches looks like this:

  elasticsearch {
    hosts => "https://xxxxxxxxx:9200"
    index => "prueba-agentes"
    query => "alias.keyword:%{[alias]}"
    fields => {
      "estado_agente" => "estado_agente_enriquecido"
    }
    user => "xxx"
    password => "xxxx"
    ca_file => 'xxx.pem'
  }

My pipelines.yml:

- pipeline.id: age-mod-pand
  path.config: "/apps/elastic/logstash/config/custom/Pruebas-mod-age/*"
  pipeline.workers: 1
  pipeline.ordered: true

The two config files (each with input, filter, and output):

[root@host]# ll /apps/elastic/logstash/config/custom/Pruebas-mod-age/
total 8
-rw-r--r-- 1 root root 2058 nov 18 16:07 A__agentes.conf
-rw-r--r-- 1 root root 2343 nov 18 16:07 B__modules.conf

If I first launch A, stop it, and then launch B, it works perfectly. But when I put both in pipelines.yml, I get an error indicating that the field I want to enrich from does not exist yet.
I therefore understand that Logstash is executing the filters in parallel, ignoring the alphabetical order.

Thank you in advance!

If you want to truly preserve the order of operations, use pipeline-to-pipeline communication.

config/pipelines.yml

- pipeline.id: age-mod-pand-input
  # This pipeline executes first
  path.config: "/apps/elastic/logstash/config/custom/Pruebas-mod-age/A__agentes.conf"
  pipeline.workers: 1
  pipeline.ordered: true

- pipeline.id: age-mod-pand-enrich
  # This pipeline executes whenever an event is sent from age-mod-pand-input
  path.config: "/apps/elastic/logstash/config/custom/Pruebas-mod-age/B__modules.conf"
  pipeline.workers: 1
  pipeline.ordered: true

A__agentes.conf

input {
...
}
filter {
  elasticsearch {
    hosts => "https://xxxxxxxxx:9200"
    index => "prueba-agentes"
    query => "alias.keyword:%{[alias]}"
    fields => {
      "estado_agente" => "estado_agente_enriquecido"
    }
    user => "xxx"
    password => "xxxx"
    ca_file => 'xxx.pem'
  }
}
output {
  pipeline { send_to => ["age-mod-pand-enrich"] }
}

B__modules.conf

input {
  pipeline { address => "age-mod-pand-enrich" }
}
filter {
...
}
output {
...
}


First of all, thanks for replying.

This is very interesting; I did not know that pipeline-to-pipeline communication existed.

However, I don't quite see how to apply it to my case.

I currently have two JDBC inputs that run on a schedule, two independent filters (the second performs an enrichment to bring in fields from an index), and two outputs to two indices.

If I use your model, how does the enrichment work, given that the index has not been populated yet? How could the elasticsearch filter know that the data exists?


filter {
  elasticsearch {
    hosts => "https://xxxxxxxxx:9200"
    # ------------> index => "prueba-agentes"  <-----------------
    query => "alias.keyword:%{[alias]}"
    fields => {
      "estado_agente" => "estado_agente_enriquecido"
    }
    user => "xxx"
    password => "xxxx"
    ca_file => 'xxx.pem'
  }
}
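One possible approach (a sketch only, not tested against this setup): have the first pipeline index the lookup data into Elasticsearch before forwarding the event to the enrich pipeline, by declaring both outputs in A__agentes.conf in that order. Even then, the elasticsearch filter in B can only find documents that have already been refreshed (index refresh defaults to 1s), so a lookup immediately after indexing may still miss the newest document.

```
# Hypothetical sketch for the output section of A__agentes.conf:
# index the lookup document first, then hand the event to the enrich pipeline.
output {
  elasticsearch {
    hosts    => "https://xxxxxxxxx:9200"
    index    => "prueba-agentes"
    user     => "xxx"
    password => "xxxx"
    cacert   => "xxx.pem"
  }
  pipeline { send_to => ["age-mod-pand-enrich"] }
}
```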

Could you give me an example of what you want this whole workflow to look like?

Yes.

I have two indices.

Each of them has a JDBC Oracle input that fetches the data every 5 minutes.

I am trying to enrich one with fields from the other using the elasticsearch filter. The problem is the following:

When I put both in pipelines.yml, they run at the same time, so the enrichment does not complete.
I have already seen that this can be solved with pipeline.workers: 1, ordering, and so on. The bigger problem, which I do not see how to solve, is that Logstash tries to enrich against everything in the index, when it should really only use the most recently indexed data.

I only get it to work if I run the first ETL by hand and then the second.

Is there a way to do this enrichment against dynamic indices, using only the most recently indexed documents?
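If the lookup documents carry a timestamp, one possibility (a sketch, assuming a field named @timestamp exists on the lookup index) is to restrict the enrichment query with Lucene date math so that only documents from the last ingest window can match:

```
filter {
  elasticsearch {
    hosts => "https://xxxxxxxxx:9200"
    index => "prueba-agentes"
    # Only match lookup documents indexed within the last 5 minutes
    query => "alias.keyword:%{[alias]} AND @timestamp:[now-5m TO now]"
    fields => {
      "estado_agente" => "estado_agente_enriquecido"
    }
    user => "xxx"
    password => "xxxx"
    ca_file => 'xxx.pem'
  }
}
```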

This would be of great help, because the same pattern keeps coming up in different use cases. Thank you.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.