How to connect multiple pipelines within the same Logstash instance running inside a Docker container

Currently we have a data stream in Elasticsearch, let's call it app_stream, into which we are ingesting log messages from an application called XMLTransformer through Filebeat (port 5044) installed on the server where the application is running. We now have a new scenario where we are receiving log messages on the same Filebeat port 5044 from two different applications running on the same server; let's call the second application Model Server. At the moment, log messages from both XMLTransformer and Model Server are going into app_stream.

According to the requirement, we are supposed to create a new data stream ai_model_server that will only contain log messages from Model Server, while app_stream will only contain log messages from XMLTransformer. We also want to create two separate Logstash pipelines, one for XMLTransformer and one for Model Server, which will ingest the log messages from the same Filebeat port 5044, and we do not want to make any changes on the Filebeat side. So we are trying to implement Pipeline-to-Pipeline Communication to route the messages from the app_stream.conf pipeline to the ai_model_server.conf pipeline. However, the Logstash instance is running inside a Docker container. How do we get the virtual address that is available in the Docker container and use it in the Logstash pipeline?

The virtual address is user-defined. It's just a name that identifies which pipeline to send the data to. See the example here: Pipeline-to-Pipeline Communication | Logstash Reference [7.13] | Elastic.
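For example, here is a minimal sketch of the two sides (the address name "my_virtual_address" below is an arbitrary placeholder; it just has to match in both pipelines):

# Upstream pipeline: forward events to a named virtual address
output {
  pipeline { send_to => ["my_virtual_address"] }
}

# Downstream pipeline: read events from that same virtual address
input {
  pipeline { address => "my_virtual_address" }
}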

@legoguy1000
So if I configure Logstash like the following, will it work for my use case?
I will be giving "modelserverlogs" literally as the virtual address in the configuration, since you mentioned that it can be user-defined (my confusion was whether I could put any name in place of the virtual address in the config, or whether I needed to look up the virtual IP addresses available in the Logstash Docker container and use those in the config).

======================
app_stream.conf

input {
  beats {
    port => 5044
    type => "beats"
  }
}
filter {
  # XML transformer filter statements
  # (the xml_transformer tag should only be added to events identified as XMLTransformer logs)
  mutate {
    add_tag => ["xml_transformer"]
  }
}
output {
  if "xml_transformer" in [tags] {
    elasticsearch {
      hosts => ["0.0.0.0:0000"]
      index => "app_stream"
    }
  } else {
    pipeline { send_to => ["modelserverlogs"] }
  }
}

======================
ai_model_server.conf

input {
  pipeline { address => "modelserverlogs" }
}
filter {
  # Model Server log filter statements
}
output {
  elasticsearch {
    hosts => ["0.0.0.0:0000"]
    index => "ai_model_server"
  }
}

=================
pipelines.yml

- pipeline.id: app_stream
  path.config: "${LOGSTASH_BASE}/pipeline/app_stream.conf"
  queue.type: persisted
- pipeline.id: ai_model_server
  path.config: "${LOGSTASH_BASE}/pipeline/ai_model_server.conf"
  queue.type: persisted
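
(As a reference, here is a sketch of how the two pipeline configs and pipelines.yml could be wired into the official Logstash Docker image via docker-compose; the host paths, image tag, and LOGSTASH_BASE value below are assumptions rather than our exact setup.)

version: "3"
services:
  logstash:
    image: docker.elastic.co/logstash/logstash:7.13.0
    environment:
      # LOGSTASH_BASE is our own variable referenced in pipelines.yml above
      - LOGSTASH_BASE=/usr/share/logstash
    ports:
      # Beats input port used by both applications' Filebeat output
      - "5044:5044"
    volumes:
      # pipelines.yml goes into the image's config directory
      - ./pipelines.yml:/usr/share/logstash/config/pipelines.yml
      # the two pipeline definitions referenced in pipelines.yml
      - ./app_stream.conf:/usr/share/logstash/pipeline/app_stream.conf
      - ./ai_model_server.conf:/usr/share/logstash/pipeline/ai_model_server.conf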

Next time you post code, use the code formatter so it's easier to read. But looking at this, I believe it is correct and should work.


@legoguy1000 It is working as expected. Thank you very much for your help.

No problem, can you please mark the solution?
