Finding the bottleneck in a pipeline

Hello,

I'm looking for brave and clever people who can help out someone in a pinch.

I've been struggling to find out why one of my (very busy) indices is lagging behind. I've been reading different posts, suggestions, etc. I have enough resources and have tried to optimize everything as best I could. Unfortunately, I can't go around testing every lever one by one due to the sheer number of events, and replicating the environment would cost $$$$.
When I stop sending data to the message queue, it takes a long time until the last event is indexed, depending on how long the pipeline had been running. As time passes, the index falls further and further behind the current time.

The pipeline goes like this:

  1. A couple of Filebeat instances are forwarding JSON events from log files to a Kafka cluster.
  2. Logstash instances are pulling data from the message queue, doing some pre-processing, and sending batches to the Elasticsearch cluster.
  3. Elasticsearch nodes are indexing the events.

All VMs are in the same Google data center (so network latency is absolutely out of the question).
Each event is relatively short (HTTP log-like), around 1100-1300 characters long. However, depending on some factors, there are 20K-55K events every second (most of the time in the 40K-50K range).
I have multiple pipelines (30+) but those are a lot quieter. I also saw that the ES nodes can perform better (+20%) in the busiest periods.
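
For a sense of scale, the figures above can be turned into a rough ingest bandwidth. This is only a back-of-the-envelope sketch: it assumes one byte per character (mostly ASCII log lines) and ignores Kafka and bulk-request overhead, and the function name is made up for illustration.

```python
def ingest_mb_per_s(events_per_s: int, chars_per_event: int) -> float:
    """Approximate raw payload rate in MB/s, assuming 1 byte per character."""
    return events_per_s * chars_per_event / 1_000_000


# Typical busy range from the post: 40K-50K events/s at 1100-1300 characters each.
low = ingest_mb_per_s(40_000, 1_100)
high = ingest_mb_per_s(50_000, 1_300)
print(f"{low:.0f}-{high:.0f} MB/s of raw event payload")
```

That is the payload for the whole pipeline, before it is divided across the Logstash and Elasticsearch nodes.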

That's why I think that either Logstash can't pull data from Kafka at an optimal rate or it can't send enough batches to Elasticsearch. However, I can't figure out what I can change to make the nodes work harder. Obviously, I don't want to throw more hardware at the issue, since IMO there is a lot of wiggle room already.

I'll describe the most relevant information about the affected clusters; let me know if I left anything out.

Elasticsearch

version: 7.9.2
Multiple ES VMs in the cluster with an extra coordinating node. Currently, there aren't any search requests sent for the indices in question.
Data node resources: 32 vCPUs, 120 GB of memory, multiple SSDs

Metrics:

  • heap around 70% (as it's set)
  • CPU 40-50%
  • write throughput ~20%, write IOPS < 10%
  • read throughput ~ 4%, read IOPS < 10%

Index settings:

  • settings.index.codec: "best_compression"
  • settings.index.refresh_interval: "60s"
  • settings.index.number_of_shards: "30"
  • settings.index.number_of_replicas: "0"

Index mapping:

  • 80 fields
  • no dynamic types/regex/extra analyzer

Logstash:

version: 7.9.2
Multiple Logstash VMs are connecting to Kafka and the ES cluster.
Node resources: 16 vCPUs, 14.4 GB of memory.

Metrics:

  • heap around 70% (as it's set)
  • CPU 50-70%
  • almost nonexistent I/O operations

Kibana shows 0.3 ms/e for the current version of the Logstash configuration file.

Pipeline settings:
pipeline.workers: 6
pipeline.batch.size: 1000
pipeline.ordered: false
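
If the 0.3 ms/e figure is read as the average time one worker spends on an event (an assumption; Kibana may measure it differently), the worker count implies a rough per-node ceiling. A minimal sketch with a hypothetical helper name:

```python
def node_ceiling_events_per_s(workers: int, ms_per_event: float) -> float:
    """Upper bound on events/s if every worker were busy 100% of the time."""
    return workers * 1000 / ms_per_event


# pipeline.workers: 6 and 0.3 ms/e, as reported above.
ceiling = node_ceiling_events_per_s(workers=6, ms_per_event=0.3)
print(round(ceiling), "events/s per Logstash node, at most")
```

If a node sustains far less than this estimate, that would suggest the workers are waiting on something else, such as the input or the Elasticsearch output, rather than being short on CPU.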

Logstash queue setting:
queue.type: persisted
queue.drain: true
queue.max_bytes: 72mb

Kafka:

Metrics:

  • CPU 40-55%
  • almost nonexistent I/O operations

Topic setting:

  • partitions: Logstash nodes * 6
  • replication factor: 2

ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = latest
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
heartbeat.interval.ms = 3000
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = [ ]
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 50
reconnect.backoff.ms = 50
request.timeout.ms = 40000
retry.backoff.ms = 100
send.buffer.bytes = 131072
session.timeout.ms = 10000
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

I'm passionately waiting for any help that would elevate the indexing rate.

Are these Elasticsearch specs per node? What do you mean by multiple SSDs? Can you share the per-node specs and say how many nodes you have?

Also, how many indices and shards do you have in total in your cluster?

Can you share the full logstash pipeline of the index that has problems?

Yep, it's per node. I have plenty of resources, and I'm 99.9% sure that isn't the problem. IMO, that's why it's irrelevant how many nodes are in the cluster or what the other specs are. Every node has roughly the same workload.
There are multiple 2 TB SSDs mounted on each VM; that way I get the best I/O rate.
There are a couple of hundred indices (only 30+ are being written to; the rest are kept around for a couple of days) and ~1,500 shards. However, that doesn't really matter: as I said, ES shows that there is still at least 20% more "capacity" for indexing. Without that one daily index, I have roughly the same number of indices and shards. If I stop the pipeline in question, the indexing rate drops to ~50%.

Can you share the full logstash pipeline of the index that has problems?

Do you mean the LS configuration? If so:

input {
  kafka {
    id => "REDACTED"
    client_id => "REDACTED"
    group_id => "REDACTED"
    codec => "json"
    topics => ["REDACTED"]
    bootstrap_servers => "REDACTED"
    ## REDACTED security settings ##
  }
}

filter {
  json {
    source => "message"
  }
  date {
    match => ["[REDACTED]", "ISO8601"]
  }
  mutate {
    add_tag =>  [REDACTED]
    remove_field => [REDACTED]
    remove_tag => [REDACTED]
  }
  ruby {
      code => '
          event.to_hash.each { |k, v|
              if v.kind_of? String
                  if v == "" || v == "unknown"
                      event.remove(k)
                  end
              end
              if v.kind_of?(Array)
                  if v.empty?
                      event.remove(k)
                  end
              end
          }
      '
  }

  if [REDACTED] {
    if [REDACTED] == REDACTED {
      mutate { add_field => { "REDACTED" } }
    } else if [REDACTED] == REDACTED {
      mutate { add_field => { "REDACTED" } }
    } else {
      mutate { add_field => { "REDACTED" } }
      mutate { add_tag =>  ["REDACTED"] }
    }
  } else {
    mutate { add_field => { "REDACTED" } }
    mutate { add_tag =>  ["REDACTED"] }
  }

  if [REDACTED] {
    ruby {
      code => "
        ets=event.get('REDACTED').to_i;
        ets=(ets / 1000000).round;
        event.set('REDACTED', ets);
      "
    }
  }
  if "REDACTED" not in [REDACTED] {
    mutate { remove_field => ["REDACTED"] }
  }

}

output {
  elasticsearch {
    hosts => [REDACTED]
    index => "REDACTED"
    ## REDACTED security settings ##
  }
}

But again, the Logstash nodes have free resources, which makes me think that it's one of these:
1. a setting that prevents grabbing more data
2. a setting that prevents sending more data to the ES data nodes
3. Logstash itself can't process more events even if I had 10x more resources

Of course, I can be wrong about stuff at the start of this message and at the end too. :slight_smile:
If you can describe your concerns, I may be able to provide more data, but I've been working with this stack for some time and I feel that this isn't related to the available resources. Though I'm not saying that I can't be mistaken :man_shrugging: :smile:

Well, it is not easy to find the cause of a bottleneck, so until there is some hint of what the cause might be, I would say that nothing is totally irrelevant.

It is pretty common to think that Logstash is the cause of a bottleneck, only to find in the end that it is Elasticsearch for some particular reason. That's why it is important to know the specs of each node, the number of nodes, and the number of indices and shards, as these can affect the overall performance of your cluster, both indexing and searching.

For example, you said that your nodes have 32 vCPUs, 120 GB of memory and multiple SSDs.

If you are giving 70% of the memory on a 120 GB node to the Elasticsearch heap, that is about 84 GB of heap, which can be too much. The recommendation is to keep the heap below ~30 GB, because above roughly 32 GB the JVM can no longer use compressed object pointers.

Also, you said that the VMs have multiple SSDs. How are you configuring path.data in your elasticsearch.yml? Are you using multiple data paths? Are you using some kind of RAID? Can you share your elasticsearch.yml?

Knowing the number of nodes and their specs is something that helps.

What is the size of your index? There is also a recommendation to keep shards in the tens of GB, somewhere around 40-50 GB, so with 30 shards your index would be something like 1.5 TB? Depending on the number of nodes, you can end up with an oversharded cluster, which can also hurt overall performance.

From your Logstash pipeline I didn't see anything that could be slowing it down. I do not use ruby code in my own pipelines because it can sometimes slow things down, but I don't think that is the case here.

But with 16 vCPUs, you could try to give more workers to that pipeline to see if something changes, this would be the first test.

How many Logstash nodes are consuming from that Kafka topic, and how many partitions does the topic have? You could also try adjusting consumer_threads in the kafka input to see if that helps as well.

Do you have a separate monitoring cluster to collect the metrics for your production cluster, or are you using some other tool to monitor the index rate?

Thank you for elaborating. Yes, I know all these recommendations, I did my homework during the past years, that's why I'm here. :slight_smile:

For example, you said that your nodes have 32 vCPUs, 120 GB of memory and multiple SSDs.

In this ES cluster, there are 9 data nodes, all of them "n1-standard-32". There are also 8 Logstash nodes, which are "n1-highcpu-16" VMs with the exact same setup.
GCP machine specs: General-purpose machine family for Compute Engine | Compute Engine Documentation | Google Cloud
I wasn't specific about the heap memory. The total heap is 30 GB (per node), and usage is ~70% of that without any worrisome spikes.

Can you share your elasticsearch.yml?

Elasticsearch config:

cluster.name: REDACTED
node.name: REDACTED
path.data:
  - /elasticsearch/disk_sdb1/esdata
  - /elasticsearch/disk_sdc1/esdata
  - /elasticsearch/disk_sdd1/esdata
  - /elasticsearch/disk_sde1/esdata
  - /elasticsearch/disk_sdf1/esdata
  - /elasticsearch/disk_sdg1/esdata
  - /elasticsearch/disk_sdh1/esdata
path.logs: /var/log/elasticsearch
network.host: REDACTED
discovery.seed_hosts: REDACTED
cluster.initial_master_nodes: REDACTED

reindex.remote.whitelist: REDACTED

#custom memory limits
indices.queries.cache.size: "1gb"
indices.memory.max_index_buffer_size: "6gb"
indices.fielddata.cache.size: "2gb"

xpack.ml.enabled: false

## REDACTED xpack.security settings for transport and http 

What is the size of your index?

Yes, each daily index is about 1.5 TB with 30 shards, spread over 9 separate VMs. I'm confident that it's not an issue. In the past, I had an inflated index where I didn't realize that individual shards had grown well over 100 GB, and nothing changed (indexing, query response time, etc.).
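
The shard arithmetic behind these numbers can be written out as a small sketch. It assumes 1 TB = 1000 GB and that Elasticsearch spreads the primaries of this index as evenly as it can; `shard_layout` is a hypothetical helper, not part of any Elastic tooling.

```python
def shard_layout(index_tb: float, shards: int, data_nodes: int):
    """Return (GB per shard, shards on the lightest node, shards on the heaviest node)."""
    gb_per_shard = index_tb * 1000 / shards
    base, extra = divmod(shards, data_nodes)
    return gb_per_shard, base, base + (1 if extra else 0)


# 1.5 TB daily index, 30 primaries, 0 replicas, 9 data nodes.
gb, lightest, heaviest = shard_layout(index_tb=1.5, shards=30, data_nodes=9)
print(f"{gb:.0f} GB per shard, {lightest} to {heaviest} shards of this index per node")
```

With 30 primaries on 9 nodes, some nodes end up with 4 shards of the busy index and others with 3, so the write load for this index cannot be perfectly even across nodes.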

But with 16 vCPUs, you could try to give more workers to that pipeline to see if something changes, this would be the first test.

How many Logstash nodes are consuming from that Kafka topic, and how many partitions does the topic have? You could also try adjusting consumer_threads in the kafka input to see if that helps as well.

Yeah, more workers (CPU cores) are something to consider, but if that were the issue, the processors would be a lot more utilized. It's much the same with the threads, though I don't think I've used that setting. Naturally, I'll check whether changing those has any effect.

Do you have a separate monitoring cluster to collect the metrics for your production cluster, or are you using some other tool to monitor the index rate?

I use self-monitoring without any issue. But I extract resource info from GCP which should be more accurate.

I hope I answered all your questions :slight_smile:
I'll be able to check changes next week.
Have a great weekend!

You can also review per-plugin event timings by sending a request directly to the Logstash instance:

curl -XGET 'localhost:9600/_node/stats/pipelines?pretty'
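
The response is large, so it can help to rank plugins by time spent per event. The sketch below assumes the field layout of the 7.x node stats response (`pipelines.<name>.plugins.filters[]` and `outputs[]`, each with an `events` object containing `in` and `duration_in_millis`); check those names against your own output. The helper name and the sample numbers are invented.

```python
from __future__ import annotations


def ms_per_event(stats: dict, pipeline: str) -> list[tuple[str, float]]:
    """Rank filter and output plugins by average milliseconds spent per event."""
    plugins = stats["pipelines"][pipeline]["plugins"]
    ranked = []
    for kind in ("filters", "outputs"):
        for plugin in plugins.get(kind, []):
            events = plugin.get("events", {})
            seen = events.get("in", 0)
            if seen:  # skip plugins that have not processed anything yet
                label = f'{kind}/{plugin.get("name", "?")}/{plugin["id"]}'
                ranked.append((label, events.get("duration_in_millis", 0) / seen))
    return sorted(ranked, key=lambda item: item[1], reverse=True)


# Hand-written sample in the assumed shape of the API response.
sample = {"pipelines": {"main": {"plugins": {
    "filters": [
        {"id": "f1", "name": "json",
         "events": {"in": 1000, "out": 1000, "duration_in_millis": 50}},
        {"id": "f2", "name": "ruby",
         "events": {"in": 1000, "out": 1000, "duration_in_millis": 200}},
    ],
    "outputs": [
        {"id": "o1", "name": "elasticsearch",
         "events": {"in": 1000, "out": 1000, "duration_in_millis": 400}},
    ],
}}}}

ranked = ms_per_event(sample, "main")
for label, cost in ranked:
    print(f"{label}: {cost:.2f} ms/event")
```

To use it on real data, save the curl output to a file and pass the result of `json.load` as `stats`. A slow output relative to the filters would point toward Elasticsearch or the bulk requests, while slow filters would point toward the pipeline itself.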

Your data node specs are pretty good. With 120 GB of RAM and 30 GB of heap, you could even save money by using machines with 64 GB.

What is the SSD type? GCP has many disk types that are backed by SSD. But I agree with you, I don't think it is your specs.

One thing that you should change as soon as possible, because it can be a huge performance hit, is the self-monitoring. I had a similar issue a couple of months ago, and moving the monitoring to a separate cluster solved the problem.

Another thing that you should change in the future is the use of multiple data paths. I'm not sure whether it affects your performance in this case, but multiple data paths are deprecated and will be removed in a future version, though I think the removal got pushed to 9.0.

This issue makes clear that multiple data paths are not well-tested nor maintained.

At this moment I would test disabling the self-monitoring to see if the performance improves.

Thank you for your input!
I just returned from my time off, I'll dive into this again and update this thread.
Do you have any recommendations for a monitoring cluster VM-wise?

@leandrojmp

I'd appreciate some pointers (I'm good with links to proper docs/blogs/etc) regarding the separate monitoring cluster.

multiple data paths are deprecated and will be removed in a future version

Uhh... That's bad. Each path points to a mounted drive with the maximum size where GCP provides the best I/O rate. If those are removed, it'll badly impact my current setup. Naturally, if I had a hot-warm-cold setup it'd be a lot easier, but that's not that easy when you have tens of TBs of data :grimacing:
I had no problems with those in the past 3 years, but thanks for the info!

In the end, I didn't disable monitoring (but I'm keeping that in mind), because when I added consumer_threads to the configuration file in question (matching the number of pipeline.workers), I saw a huge boost. I don't really understand why that is, but it's enough for me right now. I tried tweaking some other options too, but nothing else I did seemed to make a real difference. TBH, I'd need to perform too many tests (and create a dev env with a ton of VMs) to see what else could be changed to get the most out of my setup, so I'll leave things as-is for now :slight_smile:
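
One possible explanation for the boost, offered as a guess rather than a diagnosis: the kafka input's consumer_threads defaults to 1, so each Logstash node was reading all of its assigned partitions through a single consumer thread. The sketch below only does the arithmetic; it assumes all 8 Logstash nodes share one consumer group and that partitions are assigned evenly, and the function name is hypothetical.

```python
def partitions_per_consumer(partitions: int, nodes: int, consumer_threads: int) -> float:
    """Average number of partitions each Kafka consumer thread has to read."""
    return partitions / (nodes * consumer_threads)


nodes = 8
partitions = nodes * 6  # "Logstash nodes*6" from the topic settings

before = partitions_per_consumer(partitions, nodes, consumer_threads=1)  # plugin default
after = partitions_per_consumer(partitions, nodes, consumer_threads=6)
print(f"partitions per consumer thread: {before:.0f} before, {after:.0f} after")
```

Going from six partitions per thread to one per thread means fetching and JSON decoding on the input side can run in parallel, which would fit the picture of idle CPU on the Logstash nodes before the change.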

Thank you for your tips and time!
