Pipeline-to-Pipeline Configuration Does Not Work with TLS Cluster

It appears that both pipelines the beats pipeline forwards to are receiving all of the documents. As a result, the Logstash instance throws all kinds of validation errors. Eventually some documents make it through and end up with fields updated by both receiver pipelines.

Leaving only one receiver pipeline running, with the beats pipeline redirecting to it, gives the false impression that everything works, when in fact the distributor pattern is not functional at all and behaves like the forked-path pattern instead.

For this test I used Logstash on Kubernetes.
pipelines.yml:

- pipeline.id: beats-server
  path.config: "/usr/share/logstash/pipeline/beats-server.conf"
- pipeline.id: receiver1
  path.config: "/usr/share/logstash/pipeline/receiver1.conf"
- pipeline.id: receiver2
  path.config: "/usr/share/logstash/pipeline/receiver2.conf"
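
For context, the file is mounted into the container along these lines (a sketch with placeholder ConfigMap/volume names and image tag, assuming the official Logstash image, which reads pipelines.yml from /usr/share/logstash/config/):

# Relevant fragment of the Deployment spec (sketch, hypothetical names).
containers:
  - name: logstash
    image: docker.elastic.co/logstash/logstash:7.9.3
    volumeMounts:
      - name: logstash-pipelines
        mountPath: /usr/share/logstash/config/pipelines.yml
        subPath: pipelines.yml
      - name: logstash-pipeline-configs
        mountPath: /usr/share/logstash/pipeline
volumes:
  - name: logstash-pipelines
    configMap:
      name: logstash-pipelines
  - name: logstash-pipeline-configs
    configMap:
      name: logstash-pipeline-configs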

beats-server.conf:

input {
  beats {
    port => "5044"
    ssl => true
    ssl_key => '/etc/logstash/certificates/input/tls.key'
    ssl_certificate => '/etc/logstash/certificates/input/tls.crt'
    ssl_certificate_authorities => ['/etc/logstash/certificates/input/ca.crt']
  }
}
output {
  if "receiver1" in [tags] {
    pipeline { send_to => receiver1address }
  } else if "receiver2" in [tags] {
    pipeline { send_to => receiver2address }
  }
}
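
To see which branch an event actually takes, a temporary stdout output can sit next to the conditional (a debugging sketch only, not part of the running config):

output {
  # Debugging aid: print every event, including @metadata, so the tags
  # driving the routing decision are visible in the Logstash log.
  stdout { codec => rubydebug { metadata => true } }

  if "receiver1" in [tags] {
    pipeline { send_to => ["receiver1address"] }
  } else if "receiver2" in [tags] {
    pipeline { send_to => ["receiver2address"] }
  }
}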

receiver1.conf:

input { 
  pipeline { 
    address => receiver1address
  }
}

filter {
  # Do some processing here
}

output {
  if ("_jsonparsefailure" in [tags]) {
    elasticsearch {
      hosts => ["XXX"]
      index => "failure"
      document_id => "receiver1"
      ssl => true
      cacert => "/etc/logstash/certificates/output/ca.crt"
      user => "XXX"
      password => "XXX"
    }
  } else {
    elasticsearch {
      hosts => ["XXX"]
      index => "receiver1"
      document_id => "XXX"
      ssl => true
      cacert => "/etc/logstash/certificates/output/ca.crt"
      user => "elastic"
      password => "XXX"
    }
  }
}

receiver2.conf:

input { 
  pipeline { 
    address => receiver2address
  }
}

filter {
  # Do some processing here
}

output {
  if ("_xmlparsefailure" in [tags]) {
    elasticsearch {
      hosts => ["XXX"]
      index => "failure"
      document_id => "receiver2"
      ssl => true
      cacert => "/etc/logstash/certificates/output/ca.crt"
      user => "XXX"
      password => "XXX"
    }
  } else {
    elasticsearch {
      hosts => ["XXX"]
      index => "receiver2"
      document_id => "XXX"
      ssl => true
      cacert => "/etc/logstash/certificates/output/ca.crt"
      user => "elastic"
      password => "XXX"
    }
  }
}
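
Since both receivers are currently touching the same documents, one way to confirm which pipeline processed an event is to stamp a field in each receiver's filter block (a debugging sketch; the field name is made up):

filter {
  # Debugging aid: record which receiver pipeline handled the event, so
  # duplicate delivery shows up directly on the indexed document.
  mutate { add_field => { "processed_by" => "receiver1" } }
}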

On the filebeat side I have tried many different iterations (sketched below):

  • Using tags in a processor (add_tags: tags: [value])
  • Using tags as part of the input (tags: [value])
  • Using custom fields as part of the input (fields: doctype: value)
  • I can't use type, because all inputs are of type log
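
For reference, the input-level variants look roughly like this on the filebeat side (a sketch; paths, hosts, and the tag/field values are placeholders):

filebeat.inputs:
  - type: log
    paths:
      - /var/log/app/*.log
    # Variant: tags set directly on the input
    tags: ["receiver1"]
    # Variant: a custom field instead of a tag
    fields:
      doctype: receiver1

processors:
  # Variant: tags added via the add_tags processor
  - add_tags:
      tags: ["receiver1"]

output.logstash:
  hosts: ["logstash:5044"]
  ssl.certificate_authorities: ["/etc/filebeat/certificates/ca.crt"]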

Logstash doesn't seem to like the syntax at all. In most cases the log just fills with repeated messages like the ones below:

[DEBUG] 2020-10-30 13:49:57.710 [nioEventLoopGroup-2-2] ConnectionHandler - 51a29330: batches pending: true
[DEBUG] 2020-10-30 13:49:57.710 [nioEventLoopGroup-2-2] ConnectionHandler - 51a29330: batches pending: true
[DEBUG] 2020-10-30 13:49:57.739 [nioEventLoopGroup-2-2] ConnectionHandler - 51a29330: batches pending: true

which blocks the sending of data to the ES cluster.

I have followed the documentation, gone through similar issues on this forum, and implemented all the recommendations. Unfortunately, this still doesn't seem to work.