Hello there,
I hit an issue with consuming Pub/Sub messages on most of the 8.x versions of Logstash. I tried to rule out every single point of the pipeline, and it seems Logstash simply does not ingest data fast enough through the pubsub input.
A quick rundown of our test environment:
A service writes logs into a Pub/Sub topic through the Log Router on GCP. The topic has a subscription preconfigured for ingestion, which is referenced in Logstash as the input; the output goes directly to Elasticsearch (v7.17.27). I tried it with a Kafka output as well, same issue. The pipeline is now only a pubsub input and an elasticsearch output, without any additional filter rules in between.
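To take the output side completely out of the equation when reproducing, the same input can be pointed at a no-op output. This is only a sketch, not part of our setup (the sink output ships with recent Logstash distributions; stdout with the dots codec works as an alternative):

input {
  google_pubsub {
    project_id => "${PUBSUB_PROJECT}"
    topic => "${PUBSUB_TOPIC}"
    subscription => "${PUBSUB_SUBSCRIPTION}"
    json_key_file => "/usr/share/logstash/pipeline/${PUBSUB_KEY_FILENAME}"
    create_subscription => false
    max_messages => 50
  }
}
output {
  # discards every event, so any remaining cap has to come from the input
  sink {}
}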
With everything we tried from 7.17.x up to 8.1.3 it works: with one container we can do 6-11k events per second with our test configuration (it might be possible to do more; I did not check the maximum we can reach). From 8.2.0 to 8.4.0 it had an issue, and most likely this one was the fix for that. However, since 8.4.0, every single version with the exact same configuration caps at around 100-150 events per second with one container.

Previously, with 15 containers, we could reach 100-150k events/sec throughput without changing the config, no matter which output we chose (again, we did not test the maximum capacity of the pipeline, but based on metrics that was the biggest sustained traffic we had so far). After upgrading to the newer versions, with 15 containers it caps at just below 2k/sec and the lag in Pub/Sub increases constantly. That enormous drop in throughput makes it impossible to use anything newer than 8.1.3 in any real-world application with a pubsub input.
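If you want a quick events-per-second readout while reproducing, without relying on Pub/Sub or Elasticsearch metrics, a temporary metrics filter can be spliced into the test pipeline. A sketch using the logstash-filter-metrics plugin (remember the test pipeline below intentionally has no filters, so remove this afterwards, and guard the elasticsearch outputs with if "metric" not in [tags] while it is in place):

filter {
  metrics {
    meter => "events"      # emits periodic events carrying [events][rate_1m], [events][count], ...
    add_tag => "metric"
    flush_interval => 10
  }
}
output {
  if "metric" in [tags] {
    stdout {
      codec => line { format => "1m avg rate: %{[events][rate_1m]}" }
    }
  }
}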
For reference, here are all the configurations I used, plus a sample Docker Compose file to start the service anywhere and reproduce the issue. (You might need an additional proxy address if your Docker cannot see the internet directly; I omitted that from the sample for the sake of clarity.)
docker-compose.yml
version: '3.9'
# needed env variables to start:
# PUBSUB_PROJECT
# PUBSUB_TOPIC
# PUBSUB_SUBSCRIPTION
# PUBSUB_KEY_FILENAME
# ES_HOST
# ES_USER
# ES_PASSWORD
# be sure to move the credentials json into the pipeline folder to test, or change it in the pipeline config to get it from elsewhere
services:
  gap-stage-test:
    # including a working and a capped image to speed up switching between them
    #image: docker.elastic.co/logstash/logstash-oss:8.17.1
    image: docker.elastic.co/logstash/logstash-oss:8.1.3
    # installing the plugin at runtime instead of rebuilding the image
    command: ["/bin/bash", "-c", "bin/logstash-plugin install logstash-input-google_pubsub && bin/logstash"]
    hostname: '{{.Service.Name}}-{{.Task.Slot}}'
    healthcheck:
      test: "/usr/bin/test \"$$(/usr/bin/curl -m 3 -s -o /dev/null -w '%{http_code}' http://localhost:9600)\" -eq 200"
      interval: 15s
      start_period: 120s
    environment:
      #LOG_LEVEL: "debug"
      # generic settings for the JVM and pipelines
      LS_JAVA_OPTS: "-Xms2g -Xmx2g -Dlog4j2.formatMsgNoLookups=true"
      PIPELINE_BATCH_DELAY: 2
      PIPELINE_BATCH_SIZE: 100
      PIPELINE_WORKERS: 4
      # pubsub input details
      PUBSUB_KEY_FILENAME: "${PUBSUB_KEY_FILENAME}"
      PUBSUB_PROJECT: "${PUBSUB_PROJECT}"
      PUBSUB_TOPIC: "${PUBSUB_TOPIC}"
      PUBSUB_SUBSCRIPTION: "${PUBSUB_SUBSCRIPTION}"
      # Elasticsearch connection details
      ES_HOST: "${ES_HOST}"
      ES_PASSWORD: "${ES_PASSWORD}"
      ES_USER: "${ES_USER}"
      # it's the default value in the config, but putting it here for clarity
      ES_INDEX: logstash-test
    deploy:
      mode: replicated
      replicas: 1
    volumes:
      - type: bind
        source: ./config/test_pipelines.yml
        target: /usr/share/logstash/config/pipelines.yml
      - type: bind
        source: ./pipeline
        target: /usr/share/logstash/pipeline
test_pipelines.yml
---
- pipeline.id: test
  path.config: /usr/share/logstash/pipeline/test.conf
  pipeline.ecs_compatibility: disabled
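(To rule out a silently changed default between versions, the batch and worker settings can also be pinned per pipeline right here, instead of relying only on the environment variables; same values as in the compose file above:)

- pipeline.id: test
  path.config: /usr/share/logstash/pipeline/test.conf
  pipeline.ecs_compatibility: disabled
  pipeline.workers: 4
  pipeline.batch.size: 100
  pipeline.batch.delay: 2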
test.conf
input {
  google_pubsub {
    project_id => "${PUBSUB_PROJECT}"
    topic => "${PUBSUB_TOPIC}"
    subscription => "${PUBSUB_SUBSCRIPTION}"
    json_key_file => "/usr/share/logstash/pipeline/${PUBSUB_KEY_FILENAME}"
    create_subscription => false
    max_messages => 50
  }
}
output {
  # pubsub has at-least-once delivery, so we are trying to deduplicate messages with this;
  # if you feel this causes issues, just keep the content of the else branch
  if [@metadata][insert_id] {
    elasticsearch {
      id => "elastic:insert_id"
      hosts => ["${ES_HOST}"]
      user => "${ES_USER:logstash}"
      password => "${ES_PASSWORD:password}"
      index => "${ES_INDEX:logstash-test}"
      action => "create"
      document_id => "%{[@metadata][insert_id]}"
    }
  } else {
    elasticsearch {
      id => "elastic:main"
      hosts => ["${ES_HOST}"]
      user => "${ES_USER:logstash}"
      password => "${ES_PASSWORD:password}"
      index => "${ES_INDEX:logstash-test}"
      action => "create"
    }
  }
}
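One note on the conditional above: since this stripped-down pipeline has no filter block, [@metadata][insert_id] is never set, so only the else branch fires during the test. In the full pipeline a filter fills it from the log entry, roughly like this hypothetical sketch (the exact field name depends on what your Log Router puts into the payload):

filter {
  # copy the Cloud Logging insertId, if present, into @metadata for deduplication
  if [insertId] {
    mutate {
      copy => { "insertId" => "[@metadata][insert_id]" }
    }
  }
}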
Has anyone had the same experience?
Is this a known issue? I could not find any mention of it so far. Or is it a documented default value change I'm not aware of?
Thanks in advance!