Pipeline queue keeps getting filled and I am unable to control it with the drop filter plugin below

I am trying to implement 2 pipelines: when one is about to get blocked, I want to drop events from that pipeline, so that the other one, which is half filled or healthy, keeps receiving events.

1. In the configuration below, es-host1 has a 500MB queue, while es-host2, which I set to only 50MB, will fill up fairly quickly.
2. When I flush logs at 2000 events/sec, I see that es-host2 fills up first.
3. Even though es-host1 is still not full, its output [eric-data-search-engine:9200] also stops receiving logs.
4. I know this is expected: by Logstash's design, if any one of the queues is full, the others are blocked as well.

But in my case, I am trying to drop the log events from the pipeline queue that is about to fill up, using the filter below in my config:

  if ["queueSize" > 30000000] {    // the size declared is 50MB but when it is reaching 30MB i am tryign to clear the log event by droping it
        drop { }
     }
apiVersion: v1
kind: ConfigMap
metadata:
  name: logstash-configmap
  namespace: udp-test-poc
data:
  logstash.yml: |
    http.host: "0.0.0.0"
    http.port: 9600
  pipelines.yml: |
    - pipeline.id: intake
      queue.type: persisted
      queue.max_bytes: 500mb
      path.config: "/usr/share/logstash/config/logstash.conf"
    - pipeline.id: es-host1
      queue.type: persisted
      queue.max_bytes: 500mb
      path.config: "/usr/share/logstash/config/es1.conf"
    - pipeline.id: es-host2
      queue.type: persisted
      queue.max_bytes: 50mb
      path.config: "/usr/share/logstash/config/es2.conf"
  logstash.conf: |
    input {
      beats {
        port => 5044
      }
    }
    output {
      pipeline {
        send_to => ["es-host1" , "es-host2"]
      }
    }
  es1.conf: |
    input {
      pipeline {
        address => "es-host1"
      }
    }
    filter {
      http {
        url => "http://localhost:9600/_node/stats/pipelines/es-host1"
        target_body => "[@metadata][stats]"
        target_headers => "[@metadata][statsHeaders]"
        add_field => {"queueSize" => "%{[@metadata][stats][pipelines][es-host1][queue][queue_size_in_bytes]}" }
      }
      mutate {
        convert => {
          "queueSize" => "integer"
        }
       }
    }
    output {
      elasticsearch {
        ilm_enabled => false
        hosts => ["eric-data-search-engine:9200"]
        user => 'logstash'
        password => '${LOGSTASH_PW}'
        index => "logstash-beta-%{+YYYY.MM.dd}"
      }
    }

  es2.conf: |
    input {
      pipeline {
        address => "es-host2"
        address => "es-host2"
      }
    }
    filter {
      http {
        url => "http://localhost:9600/_node/stats/pipelines/es-host2"
        target_body => "[@metadata][stats]"
        target_headers => "[@metadata][statsHeaders]"
        add_field => {"queueSize" => "%{[@metadata][stats][pipelines][es-host2][queue][queue_size_in_bytes]}" }
      }
      mutate {
        convert => {
          "queueSize" => "integer"
        }
     }
     if ["queueSize" > 30000000] {
        drop { }
     }
    }
    output {
      elasticsearch {
        ilm_enabled => false
        hosts => ["ext-se:9200"]
        user => 'logstash'
        password => '${LOGSTASH_PW}'
        index => "logstash-beta-%{+YYYY.MM.dd}"
      }
    }
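For context, this is roughly how the ConfigMap is applied and Logstash restarted to pick up changes; the file name and deployment name here are only placeholders for my setup, and the namespace is the one from the ConfigMap above:

    # Apply the ConfigMap and restart the Logstash deployment so it reloads the new config
    # (logstash-configmap.yaml and deployment/logstash are placeholder names)
    kubectl apply -f logstash-configmap.yaml -n udp-test-poc
    kubectl rollout restart deployment/logstash -n udp-test-poc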

When I query logstash:9600, I get the data below:

-------------------------------------------------
es2   "queue_size_in_bytes": 52431853        // grows beyond 30000000
      "max_queue_size_in_bytes": 52428800    // 50MB limit

es1   "queue_size_in_bytes": 52196678        // still has plenty of headroom
      "max_queue_size_in_bytes": 524288000   // 500MB limit
-------------------------------------------------
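For reference, the numbers above come from the Logstash node stats API on port 9600, the same endpoint the http filter queries; roughly:

    # Pull the queue section for a single pipeline from the monitoring API
    curl -s "http://localhost:9600/_node/stats/pipelines/es-host2?pretty"
    # es-host1 can be checked the same way
    curl -s "http://localhost:9600/_node/stats/pipelines/es-host1?pretty"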

So in my case, if I drop events using the drop filter plugin, can I keep that pipeline's queue from getting full? Or is my implementation of the filter plugin wrong? Please suggest.

That's an interesting approach. I cannot explain why it does not work. I just wanted to mention another approach, described here, that drops data.

@Badger I could see there are some limitations in the UDP system. I was initially trying this out because the UDP system has a limitation where log events with a larger size cannot be delivered.
