Unable to consume messages from Kafka using Logstash at high throughput

Hi, I am currently load testing my Logstash setup to consume events from Kafka and index them into OpenSearch. In our production environment we get around 300k events per second, so I was trying to replicate the same. I was able to produce 300k events per second with each record sized at 2 KB, but I am having problems consuming those events from Kafka. Below is my Logstash deployment file along with the ConfigMap for the pipeline; each Logstash pod runs on a separate instance. My Kafka topic has 10 partitions, so I am using 10 instances of Logstash with consumer_threads => 1 so that each consumes from one partition. Below are the infrastructure details for Kafka and Logstash:

Kafka: m6g.2xlarge (8 vCPUs and 32 GB RAM)
Logstash: m6g.xlarge (4 vCPUs and 16 GB RAM)
Persistence for Kafka: EBS volume of type gp3 (3000 IOPS and 500 MB/s throughput)
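
To put the target load in perspective (rough numbers, assuming ~2 KB per record): 300,000 events/s × 2 KB ≈ 600 MB/s (about 4.8 Gbps) of raw event data across the topic, which works out to roughly 30,000 events/s (~60 MB/s) per partition.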

However, with the above setup I was only able to consume 76-78k messages per second, even though I am using a separate Logstash pod for each partition. Since this is a load test I only added some basic filtering and discarded the events with the null output, but in production we will have multiple filters and multiple pipelines sending to different outputs (OpenSearch, AWS, etc.).

First I tried every Kafka consumer config I could (fetch.min.bytes, fetch.max.bytes, max.partition.fetch.bytes, etc.) to achieve higher throughput, but even when I increase those values the throughput stays the same. I also checked the CPU and RAM utilization of my Kafka brokers and didn't see any issues or abnormal spikes. I monitored the RAM and CPU usage of the Logstash pods as well, and it is quite low, around 14-16% CPU utilization (1.5-2 CPUs and 1.8-2 GB of RAM). I also don't see network bandwidth as the bottleneck, since the instance provides up to 10 Gbps.
I monitored the IOPS and throughput of the EBS volumes too, and observed that the disk read rate is much lower than the disk write rate.


I suspect it has something to do with the Logstash configuration, which is why it cannot consume events at the rate at which they are being produced. Can anyone help with fine-tuning the Logstash/Kafka properties to achieve higher throughput?

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: logstash
  namespace: kafka
  labels:
    app: logstash
spec:
  replicas: 10
  selector:
    matchLabels:
      app: logstash
  template:
    metadata:
      labels:
        app: logstash
    spec:
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
          - labelSelector:
              matchExpressions:
              - key: app
                operator: In
                values:
                - logstash
            topologyKey: "kubernetes.io/hostname"
      tolerations:
        - key: dedicated-for
          value: logstash
          effect: "NoExecute"
      containers:
      - name: logstash
        image: docker.elastic.co/logstash/logstash:8.3.3
        resources:
          requests:
            cpu: 1000m
            memory: 2Gi
          limits:
            memory: 14Gi
            cpu: 4
        env:
        - name: KAFKA_TOPIC
          value: "kafka-test-topic"
        - name: KAFKA_BROKER
          value: "kafka-cluster.kafka.svc.cluster.local:9092"
        - name: XPACK_MONITORING_ENABLED
          value: "false"
        - name: config.reload.automatic
          value: "true"
        - name: pipeline.workers
          value: "8"
        - name: pipeline.batch.size
          value: "5000"
        - name: LS_JAVA_OPTS
          value: "-Xms12g -Xmx12g"
        volumeMounts:
        - name: logstash-pipeline
          mountPath: /usr/share/logstash/pipeline
      volumes:
      - name: logstash-pipeline
        configMap:
          name: logstash-configmap
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: logstash-configmap
  namespace: kafka
  labels:
    app: logstash
data:
  pipeline.conf: |
    input {
      kafka {
        bootstrap_servers => "${KAFKA_BROKER}"
        topics => ["${KAFKA_TOPIC}"]
        group_id => "consumer-logstash-group"
        codec => "plain"
        max_poll_records => "60000"
        consumer_threads => "1"
        fetch_max_bytes => "314572800"
        fetch_min_bytes => "52428800"
        max_partition_fetch_bytes => "10485760"
        auto_offset_reset => "earliest"
      }
    }
    filter {
      mutate {
        remove_field => ["[event][original]"]
      }
    }
    output {
      null {}
    }
---

FYI, the throughput remained the same when using fewer Logstash pods; for example, with 5 replicas and consumer_threads => 2 per pod, the throughput was unchanged.
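
For reference, a rough sketch of how the kafka input looked in that 5-replica test (only the thread count changed; the other options were as in the ConfigMap above):

input {
  kafka {
    bootstrap_servers => "${KAFKA_BROKER}"
    topics => ["${KAFKA_TOPIC}"]
    group_id => "consumer-logstash-group"
    # 5 replicas x 2 threads = 10 consumers, one per partition
    consumer_threads => 2
    auto_offset_reset => "earliest"
  }
}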

Since you're using OpenSearch, it'd be better to open a topic at https://forum.opensearch.org/.

I'm not sure, but I would not rule out the network. Those instance types are listed as "Up to 10 Gbps", but they have a smaller baseline bandwidth of 1.25 Gbps (m6g.xlarge) and 2.5 Gbps (m6g.2xlarge), as you can check here; the 10 Gbps is burst bandwidth that can be reached depending on other factors and is not sustained for long, as described here.

Instances can use burst bandwidth for a limited time, typically from 5 to 60 minutes, depending on the instance size.

How many hosts of each type do you have?

If my numbers are right, 78k events per second with 2k bytes per event will result in something close to 150 MB/s, which would be pretty close to the baseline bandwidth of the Logstash instance.
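
Rough math, assuming 2,000 bytes per event: 78,000 events/s × 2,000 bytes ≈ 156 MB/s, which is about 1.25 Gbps.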

@leandrojmp thanks for the response. Currently I have 3 Kafka brokers running on m6g.2xlarge instances, and 10 Logstash pods, each running on a separate EC2 instance of type m6g.xlarge.

As you pointed out, 78k EPS with each record of size 2 KB (2,048 bytes) uses close to 152 MB/s (~1.2 Gbps), which is close to the 1.25 Gbps baseline network bandwidth. But here is my question: even if Logstash were hitting its baseline network bandwidth, and assuming no network I/O credits are available for bursting, each Logstash pod runs on a separate m6g.xlarge instance, so ideally each one should be able to read more events, right? With 10 EC2 instances for Logstash, the collective available bandwidth would be 10 × 1.25 Gbps = 12.5 Gbps, yet my collective throughput with 10 Logstash pods is only 78k, which is very low.

Do you suspect the Kafka brokers might also be hitting their baseline network bandwidth while serving consumers? And just to confirm, do you see anything wrong in my Logstash/Kafka consumer configs, to help narrow down whether the issue is really network bandwidth?
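
To put rough numbers on that (back-of-envelope, using the figures above): 78,000 events/s spread across 10 pods is about 7,800 events/s per pod, i.e. roughly 16 MB/s (~0.13 Gbps) per instance, far below the 1.25 Gbps baseline of a single m6g.xlarge.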