Increasing Logstash Throughput When the Codec is the Bottleneck

Setup

I have multiple machines running a containerized Logstash 7.6. These machines have 32 core and 256GB RAM. I am using these nodes to process network flow data based on Elastiflow. These nodes are listening for data using the Sflow codec and pushing data to an ES cluster of 32 nodes, each with the same spec as above, but including 12TB SSD. I also have samplicator sitting between these logstash instances and the devices sending data to them. I know samplicator is not dropping data because I can see other downstream devices are receiving & processing all of this data.
The data flow looks like this:

node-level-dataflow:

<network devices> -> <LOGSTASH_SERVER_RUNS_SAMPLICATOR_&_LOGSTASH> -> <elasticsearch>

service-level-dataflow:

<network devices> -> <samplicator> -> <sflow-codec>  -> <logstash> -> <elasticsearch>
                                                ^
                                   (dropped packets here)

Problem

With this setup, I'm able to process Netflow data, however it's recently came to my attention that specifically the sflow codec is dropping ~75-90% of the packets it receives. To isolate where in the pipeline this issue is occurring.

Testing

I isolated the sflow codec to simply listen to data being pushed from samplicator and write to a file - even when just using the codec and not the rest of Logstash, I'm still seeing similar packet loss.

I've also modified the number of pipeline workers from 32 to 128, and see no real improvement.

Question

What can I do to improve the performance of Logstash without having to dig in to the codec? I'm not familiar enough with Ruby to even know if it could use optimization.
I was considering either setting up multiple logstash containers on every physical machine, or otherwise multiple pipelines that process the same data...?

Addtional config:

logstash.yml

$ cat logstash.yml | grep -v '#'
pipeline.id: elastiflow
pipeline.workers: ${PIPELINE_NUM_WORKERS:32}
pipeline.batch.size: 512
pipeline.batch.delay: 250

dead_letter_queue.enable: true
path.dead_letter_queue: "${LS_BAD_FLOW_DIR:/var/log}/dead_letter_queue"

xpack.monitoring.enabled: ${MONITORING_ENABLED:false}
xpack.monitoring.elasticsearch.hosts: ["http://${ES_1}", "http://${ES_2}", "http://${ES_3}" ,"http://${ES_4}"]
xpack.monitoring.elasticsearch.sniffing: ${MONITORING_ENABLED:false}

pipelines.yml

$ cat pipelines.yml | grep -v '#'
- pipeline.id: elastiflow
  path.config: "/path/to/config/conf.d/*.conf"
- pipeline.id: dead_letter_queue
  path.config: "/path/to/another/config/bad_flow.conf.d/*.conf"
  pipeline.workers: 1

Hi

I am not familiar with sflow but I am curious.

How many events per second and how does the cpu usage look like?

Regards
Kim

Great question!

I'll use one node as an example:

The busy instances are receiving ~17k packets per second.

CPU usage is relatively low:

Yet I'm seeing many UDP receive buffer errors:

interesting ..

Have you read this topic : https://discuss.elastic.co/t/netflow-codec-udp-receive-errors/127111/7

That is a very interesting read. I agree that I'd likely benefit from breaking my large, monolithic Logstash instances into smaller containers with a load balancer, however I can probably only realistically run 4 instances per physical host before seeing performance degradation.

I suppose that may however be easier than refactoring the sflow codec. Although I'm concerned that with the current lack of throughput with the current codec, I'd need closer to 10-15 containers to replace every instance and not lose data...

We have one use case where we have one pipeline to receive syslog from our fw. This is high load 20.000+ eps. We just output to Kafka in that pipeline. You can then have any number of additional logstash nodes pulling data from. Kafka and parse events and ingest to elastic. This architecture has worked well for us.

Wish you good luck

Ah interesting - so your message queue is basically your load balancer in this case.. That might work for the load balancing solution that I clearly need. It looks like I need to apply basically three fixes to get the desired output:

  1. Horizontal scaling of Logstash instances
    i. Question remains: should I spin up multiple containers, or multiple LS instances within my container...
  2. Tuning LS instances
    i. For # workers, batch size, batch delay, & GB Heap
    ii. If anyone notices I'm missing a knob to tune, let me know
  3. Implement a load-balancer such as Nginx, HAProxy, or a message queue
    i. If anyone knows the tradeoffs better than I, please let me know. I'm currently leaning towards haproxy since it's already installed on these machines and is probably the most-lightweight.

Thanks for your input Kim!

I have two things for you to try...

  1. Go to the 4.x-dev branch of ElastiFlow and get the file sysctl.d/87-elastiflow.conf. Put this in /etc/sysctl.d and restart the server. This increases kernel buffers that will help with receiving a high volume of UDP packets. (NOTE: you could change these setting manually, but should still use the file so the changes are applied after a reboot.)

  2. Edit /etc/systemd/system/logstash.service and change the "nice" value from the default of 19 (the lowest thread priority) to 0 (the normal thread priority). Restart Logstash.

These changes should improve throughput quite a bit.

Rob

GitHub YouTube LinkedIn
How to install Elasticsearch & Kibana on Ubuntu - incl. hardware recommendations
What is the best storage technology for Elasticsearch?

1 Like

You're the man Rob. I'll try that today and let you know the results.

Thanks for the recommendations again. I received about a 10-12% gain in throughput by updating the nice level for Logstash. The sysctl optimizations provided about another 5% of gain.

I've horizontally scaled my Logstash instances in my container using Nginx as a load balancer as seen here:

I'm curious on your recommendations for tuning this setup.
I know that much performance can be subjective to my setup. That being said, given the hardware and the fact I'm trying to primarily process sFlow, I'm curious if you have any recommendations on the knobs I have to tune this.

  • How many instances of Logstash would you deploy per physical machine?
    • I can deploy more RAM to this container running these instances if that'll help
  • How many workers per instance of Logstash?
  • Batch size per instance?
  • Other parameters I'm overlooking?

One thing to note is that I take about a 5ms penalty with the Kafka output for every document.

I also used Nginx as a load balancer after finding that Haproxy doesn't have UDP support. I may swap that over to Traefik built into the image at some point., but their UDP support doesn't seem fully-matured either.

I've ruled out Samplicator as a bottleneck, as another service it fans out data to is verifying it's receiving all the packets, also just writing from Samplicator to a file, I can verify I don't lose data as we do when Logstash is connected.

If Kafka is in the mix, I would split the ElastiFlow pipeline into two parts. A collector/decoder and a post-processor with Kafka in the middle. On my to-do list is a video where I explain why this is a good idea for any high volume UDP use-cases.

You will also want to use the recommendations for kafka plugin configuration which I posted here... How to slow down large amount of data coming from filebeat?

Connect with me on LinkedIn (below) if you want to discuss in more detail.

Rob

GitHub YouTube LinkedIn
How to install Elasticsearch & Kibana on Ubuntu - incl. hardware recommendations
What is the best storage technology for Elasticsearch?

Thanks for the quick reply - unfortunately, I'm just looking at Kafka as an endpoint for downstream consumers at this point. I could potentially use kafka as an intermittent message queue and will review your provided link to determine if that's feasible for my situation.

In the meantime, while I'm testing various combinations of # instances, workers/instance, heap & batch size per instance, I found a large benefit of my load balancers:
you can chain them together. Therefore, if one physical machine with N local logstash instances can't handle the load from all the switches, I can simply add an additional endpoint to my Nginx config with the weight of N e.g:

.
.
stream {
        upstream logstash_sflow {
        server extra-load-balancer-1 weight=N;
        server extra-load-balancer-2 weight=N;
        server 1;
        .
        .
        .
        server N;
        }
.
.

and the load which originally was overwhelming one server is now overwhelming three servers in a balanced fashion. A diagram is also below - note how nginx can simply point towards another nginx instance. This supports balancing all three protocols, or just one problematic protocol, as we are doing with sflow in the code snippet above.

Follow-up:

Since I couldn't get around the bottleneck that is the sflow codec, I ended up building a custom codec in go utilizing the gopacket library. However, since our logstash config is rather-bloated for this pipeline, I'm still implementing multiple load-balanced instances per physical machine as described above. Currently a single instance of the custom codec can process about 60k docs/s, yet each LS instance appears to be limited to about 4k docs/s.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.