I am trying to implement two pipelines so that when one is about to get blocked, I drop events from that pipeline, and the other one, which is only half filled or otherwise healthy, keeps receiving events.

In the configuration below, es-host1 has a 500 MB queue, and es-host2 will fill up quickly because I set its queue size to 50 MB.

When I send logs at 2000/sec, I see that es-host2 fills up completely first.

Although es-host1 is not yet full, its output [eric-data-search-engine:9200] also stops receiving logs.

I know this is expected: by design, if any one of the Logstash queues is full, the others are blocked.
But in my case I am trying to drop log events from the pipeline queue that is about to fill, using this part of the config below:
if ["queueSize" > 30000000] { // the declared size is 50 MB, but once the queue reaches 30 MB I try to clear the log event by dropping it
  drop { }
}
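For comparison, as far as I understand the Logstash documentation, a conditional refers to a field with an unquoted name in square brackets and puts the comparison outside the brackets. Below is a minimal sketch of the same check written that way. It assumes queueSize has already been populated and converted to an integer by the earlier filters in the same pipeline, and I have not verified that this alone keeps the queue from filling:

```
filter {
  # Assumption: [queueSize] was set by the http filter and converted
  # to an integer by the mutate filter earlier in this pipeline.
  if [queueSize] > 30000000 {
    drop { }
  }
}
```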
apiVersion: v1
kind: ConfigMap
metadata:
  name: logstash-configmap
  namespace: udp-test-poc
data:
  logstash.yml: |
    http.host: "0.0.0.0"
    http.port: 9600
  pipelines.yml: |
    - pipeline.id: intake
      queue.type: persisted
      queue.max_bytes: 500mb
      path.config: "/usr/share/logstash/config/logstash.conf"
    - pipeline.id: es-host1
      queue.type: persisted
      queue.max_bytes: 500mb
      path.config: "/usr/share/logstash/config/es1.conf"
    - pipeline.id: es-host2
      queue.type: persisted
      queue.max_bytes: 50mb
      path.config: "/usr/share/logstash/config/es2.conf"
  logstash.conf: |
    input {
      beats {
        port => 5044
      }
    }
    output {
      pipeline {
        send_to => ["es-host1" , "es-host2"]
      }
    }
  es1.conf: |
    input {
      pipeline {
        address => "es-host1"
      }
    }
    filter {
      http {
        url => "http://localhost:9600/_node/stats/pipelines/es-host1"
        target_body => "[@metadata][stats]"
        target_headers => "[@metadata][statsHeaders]"
        add_field => {"queueSize" => "%{[@metadata][stats][pipelines][es-host1][queue][queue_size_in_bytes]}" }
      }
      mutate {
        convert => {
          "queueSize" => "integer"
        }
      }
    }
    output {
      elasticsearch {
        ilm_enabled => false
        hosts => ["eric-data-search-engine:9200"]
        user => 'logstash'
        password => '${LOGSTASH_PW}'
        index => "logstash-beta-%{+YYYY.MM.dd}"
      }
    }
  es2.conf: |
    input {
      pipeline {
        address => "es-host2"
      }
    }
    filter {
      http {
        url => "http://localhost:9600/_node/stats/pipelines/es-host2"
        target_body => "[@metadata][stats]"
        target_headers => "[@metadata][statsHeaders]"
        add_field => {"queueSize" => "%{[@metadata][stats][pipelines][es-host2][queue][queue_size_in_bytes]}" }
      }
      mutate {
        convert => {
          "queueSize" => "integer"
        }
      }
      if ["queueSize" > 30000000] {
        drop { }
      }
    }
    output {
      elasticsearch {
        ilm_enabled => false
        hosts => ["ext-se:9200"]
        user => 'logstash'
        password => '${LOGSTASH_PW}'
        index => "logstash-beta-%{+YYYY.MM.dd}"
      }
    }
When I query logstash:9600, I get the data below:
-------------------------------------------------
es2  "queue_size_in_bytes": 52431853,      // has grown well beyond 30000000
     "max_queue_size_in_bytes": 52428800   // 50 MB
es1  "queue_size_in_bytes": 52196678,      // still has room left
     "max_queue_size_in_bytes": 524288000  // 500 MB
-------------------------------------------------
In my case, if I drop events using the drop filter plugin, can I keep the pipeline from getting full? Or is my implementation of the filter wrong? Please advise.