RabbitMQ output resume after reaching queue limit

I'm trying to generate backpressure from RabbitMQ to Logstash. The behaviour I'm after is for Logstash to stop publishing when the queue is full and resume publishing once some slots have been freed up.

I've configured the RabbitMQ queue with a maximum depth of 1000 and set the "x-overflow": "reject-publish" argument, which ensures that once the maximum queue depth is reached, new publishes are rejected with a NACK from RabbitMQ. While testing, I observed that queue depth did not exceed 1000; however, after manually removing some messages from the queue, I did not see Logstash resume publishing and refill the queue.
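To make the expected behaviour concrete, here is a minimal sketch in plain Python. It is a toy simulation, not RabbitMQ or Logstash code: `BoundedQueue` and `publish_until_rejected` are hypothetical names, and the boolean return value stands in for the ack/nack that RabbitMQ sends to a publisher that has publisher confirms enabled.

```python
from collections import deque


class BoundedQueue:
    """Toy stand-in for a queue with x-max-length and x-overflow=reject-publish."""

    def __init__(self, max_length):
        self.max_length = max_length
        self.messages = deque()

    def publish(self, message):
        """Return True (ack) if the message is accepted, False (nack) if the queue is full."""
        if len(self.messages) >= self.max_length:
            return False
        self.messages.append(message)
        return True

    def consume(self, count):
        """Remove and return up to `count` messages from the head of the queue."""
        removed = []
        while self.messages and len(removed) < count:
            removed.append(self.messages.popleft())
        return removed


def publish_until_rejected(queue, pending):
    """Publish from `pending` until the queue nacks; a rejected message stays pending."""
    published = 0
    while pending:
        if not queue.publish(pending[0]):
            break
        pending.popleft()
        published += 1
    return published


queue = BoundedQueue(max_length=1000)
pending = deque(f"event-{i}" for i in range(3000))

first = publish_until_rejected(queue, pending)   # fills the queue to its limit
queue.consume(200)                               # "manually removing some messages"
second = publish_until_rejected(queue, pending)  # the refill I expected to see
```

In this model the publisher keeps the rejected message and tries again later, so the queue is topped up as soon as space appears. That retry step is the part I am not observing from Logstash.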

According to some older threads (which I cannot post links to), this behaviour is supposed to be supported in the latest Logstash. Any ideas what I'm missing?

Here is the relevant Logstash and RabbitMQ config:
input {
  file {
    path => "/var/log/myproduct.log"
    start_position => "beginning"
  }
}

output {
  rabbitmq {
    exchange => "myproduct"
    exchange_type => "fanout"
    host => "rabbitmq"
    port => 5672
    persistent => true
    user => "guest"
    password => "guest"
    vhost => "/"
    codec => "json"
  }
}

    "queues": [
        {
            "arguments": {
                "x-queue-type": "classic",
                "x-max-length": 1000,
                "x-overflow": "reject-publish"
            },
            "auto_delete": false,
            "durable": true,
            "name": "myproduct",
            "type": "classic",
            "vhost": "/"
        }
    ],
    "exchanges": [
        {
            "arguments": {},
            "auto_delete": false,
            "durable": true,
            "name": "myproduct",
            "type": "fanout",
            "vhost": "/"
        }
    ],
    "bindings": [
        {
            "arguments": {},
            "destination": "myproduct",
            "destination_type": "queue",
            "routing_key": "logstash",
            "source": "myproduct",
            "vhost": "/"
        }
    ]

RabbitMQ version: 3.12
Logstash version: 8.12.2

I've never used RabbitMQ, but if the queue is never filling then I would not expect backpressure to be engaged. When the output tries to publish a message it calls the gated executor. That is controlled here. The executor is here. If backpressure is engaged or released there should be an info level message.
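As a rough illustration of the gated-executor idea, here is a minimal Python sketch. It is not the plugin's code: the class name, method names, and log messages are hypothetical, and which RabbitMQ signal closes the gate in the real plugin is something to verify in its source (a per-message NACK may not be one of those signals).

```python
import threading


class GatedExecutor:
    """Runs tasks only while the gate is open; callers block while it is closed."""

    def __init__(self, log):
        self._open = threading.Event()
        self._open.set()          # gate starts open
        self._log = log           # callable that receives a log message string

    def engage_back_pressure(self):
        """Close the gate and log once on the open -> closed transition."""
        if self._open.is_set():
            self._open.clear()
            self._log("back-pressure engaged")

    def release_back_pressure(self):
        """Open the gate and log once on the closed -> open transition."""
        if not self._open.is_set():
            self._open.set()
            self._log("back-pressure released")

    def execute(self, task, timeout=None):
        """Wait for the gate to open, then run the task and return its result."""
        if not self._open.wait(timeout):
            raise TimeoutError("gate still closed")
        return task()


log_lines = []
sent = []
executor = GatedExecutor(log_lines.append)

executor.execute(lambda: sent.append("a"))      # gate open: runs immediately
executor.engage_back_pressure()                 # broker signals "stop"

worker = threading.Thread(target=executor.execute,
                          args=(lambda: sent.append("b"),))
worker.start()
worker.join(timeout=0.2)
blocked = worker.is_alive()                     # publisher is waiting at the gate

executor.release_back_pressure()                # broker signals "resume"
worker.join()
```

If the real output behaves like this, a publish that never reaches a closed gate would produce no engaged/released log lines at all, which would match logs that show neither message.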

Thanks for the pointers. I've double-checked that INFO logging is enabled in the Logstash container:

    environment:
      - xpack.monitoring.enabled=false
      - LOG_LEVEL=info

yet I don't see any log messages about backpressure being activated. These are the logs I get at the info level:

INFO logs
logstash-1  | 2024/03/20 06:51:51 Setting 'xpack.monitoring.enabled' from environment.
logstash-1  | 2024/03/20 06:51:51 Setting 'log.level' from environment.
logstash-1  | Using bundled JDK: /usr/share/logstash/jdk
logstash-1  | /usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/concurrent-ruby-1.1.9/lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb:13: warning: method redefined; discarding old to_int
logstash-1  | /usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/concurrent-ruby-1.1.9/lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb:13: warning: method redefined; discarding old to_f
logstash-1  | Sending Logstash logs to /usr/share/logstash/logs which is now configured via log4j2.properties
logstash-1  | [2024-03-20T06:52:03,576][WARN ][deprecation.logstash.runner] NOTICE: Running Logstash as superuser is not recommended and won't be allowed in the future. Set 'allow_superuser' to 'false' to avoid startup errors in future releases.
logstash-1  | [2024-03-20T06:52:03,583][INFO ][logstash.runner          ] Log4j configuration path used is: /usr/share/logstash/config/log4j2.properties
logstash-1  | [2024-03-20T06:52:03,584][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"8.12.2", "jruby.version"=>"jruby 9.4.5.0 (3.1.4) 2023-11-02 1abae2700f OpenJDK 64-Bit Server VM 17.0.10+7 on 17.0.10+7 +indy +jit [x86_64-linux]"}
logstash-1  | [2024-03-20T06:52:03,585][INFO ][logstash.runner          ] JVM bootstrap flags: [-XX:+HeapDumpOnOutOfMemoryError, -Dlogstash.jackson.stream-read-constraints.max-number-length=10000, --add-opens=java.base/java.nio.channels=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED, -Djruby.regexp.interruptible=true, --add-opens=java.base/java.security=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED, --add-opens=java.management/sun.management=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED, -Dio.netty.allocator.maxOrder=11, -Dlog4j2.isThreadContextMapInheritable=true, -Xms1g, -Dlogstash.jackson.stream-read-constraints.max-string-length=200000000, -Djdk.io.File.enableADS=true, -Dfile.encoding=UTF-8, --add-opens=java.base/java.io=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED, -Djruby.compile.invokedynamic=true, -Xmx1g, -Djava.security.egd=file:/dev/urandom, -Djava.awt.headless=true, -Dls.cgroup.cpuacct.path.override=/, -Dls.cgroup.cpu.path.override=/, --add-opens=java.base/sun.nio.ch=ALL-UNNAMED]
logstash-1  | [2024-03-20T06:52:03,587][INFO ][logstash.runner          ] Jackson default value override `logstash.jackson.stream-read-constraints.max-string-length` configured to `200000000`
logstash-1  | [2024-03-20T06:52:03,587][INFO ][logstash.runner          ] Jackson default value override `logstash.jackson.stream-read-constraints.max-number-length` configured to `10000`
logstash-1  | [2024-03-20T06:52:03,593][INFO ][logstash.settings        ] Creating directory {:setting=>"path.queue", :path=>"/usr/share/logstash/data/queue"}
logstash-1  | [2024-03-20T06:52:03,595][INFO ][logstash.settings        ] Creating directory {:setting=>"path.dead_letter_queue", :path=>"/usr/share/logstash/data/dead_letter_queue"}
logstash-1  | [2024-03-20T06:52:03,769][INFO ][logstash.agent           ] No persistent UUID file found. Generating new UUID {:uuid=>"0dbd11ae-c44d-4c81-b50c-796472497f17", :path=>"/usr/share/logstash/data/uuid"}
logstash-1  | [2024-03-20T06:52:04,282][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600, :ssl_enabled=>false}
rabbitmq-1  | 2024-03-20 06:52:04.394571+00:00 [info] <0.886.0> Waiting for Mnesia tables for 30000 ms, 9 retries left
rabbitmq-1  | 2024-03-20 06:52:04.394839+00:00 [info] <0.886.0> Successfully synced tables from a peer
logstash-1  | [2024-03-20T06:52:04,620][INFO ][org.reflections.Reflections] Reflections took 91 ms to scan 1 urls, producing 132 keys and 468 values
logstash-1  | [2024-03-20T06:52:04,807][INFO ][logstash.codecs.json     ] ECS compatibility is enabled but `target` option was not specified. This may cause fields to be set at the top-level of the event where they are likely to clash with the Elastic Common Schema. It is recommended to set the `target` option to avoid potential schema conflicts (if your data is ECS compliant or non-conflicting, feel free to ignore this message)
logstash-1  | [2024-03-20T06:52:04,831][INFO ][logstash.javapipeline    ] Pipeline `main` is configured with `pipeline.ecs_compatibility: v8` setting. All plugins in this pipeline will default to `ecs_compatibility => v8` unless explicitly configured otherwise.
rabbitmq-1  | 2024-03-20 06:52:04.877477+00:00 [info] <0.894.0> accepting AMQP connection <0.894.0> (10.22.99.3:52328 -> 10.22.99.2:5672)
rabbitmq-1  | 2024-03-20 06:52:04.898726+00:00 [info] <0.894.0> connection <0.894.0> (10.22.99.3:52328 -> 10.22.99.2:5672): user 'guest' authenticated and granted access to vhost '/'
logstash-1  | [2024-03-20T06:52:04,913][INFO ][logstash.outputs.rabbitmq][main] Connected to RabbitMQ {:url=>"amqp://guest:XXXXXX@localhost:5672/"}
logstash-1  | [2024-03-20T06:52:04,945][INFO ][logstash.javapipeline    ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>16, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>2000, "pipeline.sources"=>["/usr/share/logstash/pipeline/logstash.conf"], :thread=>"#<Thread:0x61765f71 /usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:134 run>"}
logstash-1  | [2024-03-20T06:52:05,472][INFO ][logstash.javapipeline    ][main] Pipeline Java execution initialization time {"seconds"=>0.53}
logstash-1  | [2024-03-20T06:52:05,480][INFO ][logstash.inputs.file     ][main] No sincedb_path set, generating one based on the "path" setting {:sincedb_path=>"/usr/share/logstash/data/plugins/inputs/file/.sincedb_08cfe5e821a4884a8b77971020dcc599", :path=>["/var/log/myproduct.log"]}
logstash-1  | [2024-03-20T06:52:05,481][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
logstash-1  | [2024-03-20T06:52:05,486][INFO ][filewatch.observingtail  ][main][7ad47ad9b8977afed9528ba0b335f1a77be695b9c7380d30afa97c0b7c37656b] START, creating Discoverer, Watch with file and sincedb collections
logstash-1  | [2024-03-20T06:52:05,489][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}

> if the queue is never filling then I would not expect backpressure to be engaged

Sorry, this was a poor choice of words on my side. When I said:

> I observed that queue depth did not exceed 1000

I actually meant that the queue filled up to the limit of 1000 and didn't go past it, which suggests that the "x-max-length": 1000 setting was working, since I'm feeding Logstash a file with 300K entries.

So where do I go from here?

I've created a GitHub issue, but it doesn't look like anyone has seen or acknowledged it. What's the best way to engage with the developers/maintainers?
