Pipeline queue keeps filling up and cannot be controlled with the drop filter plugin

I am trying to implement 2 pipelines so that when one is about to get blocked, I drop events from that pipeline. That way the other one, which is half filled or healthy, will keep receiving events.

    1. In the configuration below, es-host1 has a 500MB queue and es-host2 has a 50MB queue, so es-host2 will fill up sooner.
    2. When I send logs at 2000/sec, I see that es-host2 fills up completely first.
    3. Even though the es-host1 queue is not yet full, the es-host1 output [eric-data-search-engine:9200] also stops receiving logs.
    4. I know this is expected: by Logstash's design, if any one of the queues is full, the others are blocked.

But in my case, I am trying to drop log events from the pipeline whose queue is about to fill, using this conditional in my config below:

    # The declared queue size is 50MB, but when it reaches 30MB I try to relieve it by dropping the event
    if ["queueSize" > 30000000] {
      drop { }
    }

apiVersion: v1
kind: ConfigMap
metadata:
  name: logstash-configmap
  namespace: udp-test-poc
data:
  logstash.yml: |
    http.host: "0.0.0.0"
    http.port: 9600
  pipelines.yml: |
    - pipeline.id: intake
      queue.type: persisted
      queue.max_bytes: 500mb
      path.config: "/usr/share/logstash/config/logstash.conf"
    - pipeline.id: es-host1
      queue.type: persisted
      queue.max_bytes: 500mb
      path.config: "/usr/share/logstash/config/es1.conf"
    - pipeline.id: es-host2
      queue.type: persisted
      queue.max_bytes: 50mb
      path.config: "/usr/share/logstash/config/es2.conf"
  logstash.conf: |
    input {
      beats {
        port => 5044
      }
    }
    output {
      pipeline {
        send_to => ["es-host1" , "es-host2"]
      }
    }
  es1.conf: |
    input {
      pipeline {
        address => "es-host1"
      }
    }
    filter {
      http {
        url => "http://localhost:9600/_node/stats/pipelines/es-host1"
        target_body => "[@metadata][stats]"
        target_headers => "[@metadata][statsHeaders]"
        add_field => {"queueSize" => "%{[@metadata][stats][pipelines][es-host1][queue][queue_size_in_bytes]}" }
      }
      mutate {
        convert => {
          "queueSize" => "integer"
        }
      }
    }
    output {
      elasticsearch {
        ilm_enabled => false
        hosts => ["eric-data-search-engine:9200"]
        user => 'logstash'
        password => '${LOGSTASH_PW}'
        index => "logstash-beta-%{+YYYY.MM.dd}"
      }
    }

  es2.conf: |
    input {
      pipeline {
        address => "es-host2"
      }
    }
    filter {
      http {
        url => "http://localhost:9600/_node/stats/pipelines/es-host2"
        target_body => "[@metadata][stats]"
        target_headers => "[@metadata][statsHeaders]"
        add_field => {"queueSize" => "%{[@metadata][stats][pipelines][es-host2][queue][queue_size_in_bytes]}" }
      }
      mutate {
        convert => {
          "queueSize" => "integer"
        }
      }
      if ["queueSize" > 30000000] {
        drop { }
      }
    }
    output {
      elasticsearch {
        ilm_enabled => false
        hosts => ["ext-se:9200"]
        user => 'logstash'
        password => '${LOGSTASH_PW}'
        index => "logstash-beta-%{+YYYY.MM.dd}"
      }
    }

When I query logstash:9600, I get the data below:

-------------------------------------------------
es2      "queue_size_in_bytes": 52431853,        // grows beyond 30000000
         "max_queue_size_in_bytes": 52428800     // 50MB

es1      "queue_size_in_bytes": 52196678,        // still has plenty of room
         "max_queue_size_in_bytes": 524288000    // 500MB
-------------------------------------------------

In my case, if I drop events using the drop filter plugin, can I keep the pipeline queue from getting full? Or is my use of the filter plugin wrong? Please suggest.
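
One thing that may be worth checking is the conditional itself. Logstash conditionals normally reference a field as `[queueSize]`, with the comparison outside the square brackets, whereas the config above puts a quoted string and the comparison inside them. A minimal sketch of the conventional form follows. It assumes the `http` and `mutate` filters above have already populated `queueSize` as an integer, and it has not been verified against this setup:

```
filter {
  # Assumption: queueSize was set by the http filter and converted
  # to an integer by the mutate filter earlier in this filter block.
  if [queueSize] > 30000000 {
    # Discard the event once the reported queue size exceeds roughly 30MB.
    drop { }
  }
}
```

Note also that in the ConfigMap above only es2.conf contains the drop conditional; es1.conf computes `queueSize` but never acts on it.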

That's an interesting approach. I cannot explain why it does not work. I just wanted to mention another approach that drops data described here.

@Badger I can see there are some limitations with the UDP approach. I tried it out initially, but log events of a larger size could not be delivered.