Very low index rate when using Logstash and/or Kafka/Redis

Hi all,
Building a new cluster on 7.5.1. We are using the Filebeat Cisco module to collect Cisco ASA logs. First we tried to store the logs in a Kafka node and pull them into Elasticsearch via Logstash. We quickly saw that the index rate was not high enough to keep up; we have to index several thousand messages per second. The Kafka disk ran full because more data came in than Logstash could consume.
We tried many different combinations with Logstash (1 Logstash host, 2 Logstash hosts), also in different locations (standalone, together with the hot Elasticsearch nodes), etc. We also tried different CPUs and RAM, and changed workers, batch size and so on. We tested a lot over the whole last week.
All in all, we never got an index rate higher than around ~7000/s.

Then we tested pushing the logs directly into Elasticsearch with the same Filebeat that receives the logs. After tweaking Filebeat a little bit (more workers, queue.mem, etc.) we suddenly got an index rate of up to 34,000/s. The average rate was around 12,000/s for a single Beat, and we no longer had any lag in getting the logs. Before that, we could see the time gap growing bigger and bigger because the events were indexed too slowly.
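For illustration, the kind of tweaks meant here looks roughly like this (the values are just examples, not our exact settings, and the hostnames are placeholders):

  # Sketch of the kind of tuning meant by "more workers, queue.mem etc."
  # (illustrative values, placeholder hosts)
  queue.mem.events: 65536        # buffer enough events to keep all workers busy
  output.elasticsearch:
    hosts: ["https://hot-1:9200", "https://hot-2:9200"]
    worker: 4                    # parallel bulk requests per host
    bulk_max_size: 2048          # events per bulk request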
In summary, we tried these combinations (where || is a firewall):
Elasticsearch<-Logstash(input kafka output elastic)->||Kafka<-Filebeat (output kafka)
Elasticsearch<-Filebeat(input kafka output elastic)->||Kafka<-Filebeat (output kafka)
Elasticsearch<-Logstash(input redis output elastic)->||Redis<-Filebeat (output redis)
Elasticsearch<-Filebeat(input redis output elastic)->||Redis<-Filebeat (output redis)
Elasticsearch||<-Filebeat (output elastic)

The big question now is, where does the huge difference come from?

Regards,
Marcus

Hi @MarcusCaepio,

Do you mean that the only configuration that didn't have any lag was Elasticsearch||<-Filebeat (output elastic)? Did you identify what the bottleneck was in the other scenarios?

It is actually recommended to send events directly from Filebeat to Elasticsearch when possible; would that be OK in your scenario? If you cannot expose your ES cluster in the same network as the nodes where Filebeat is running, you could try to place some ingest nodes in that network (something like Elasticsearch||Elasticsearch (ingest node) <-Filebeat (output elastic)).

Hi @jsoriano thanks for your reply!

Exactly. Unfortunately, I did not find the bottleneck. It doesn't matter whether I use Kafka or Redis as the buffer, and it also doesn't matter whether I use Filebeat or Logstash to pull the data. The top was always around 7000/s.

Unfortunately, this is not possible in our environment, as the Filebeat for Cisco is in a DMZ and the Elastic cluster is in the internal network. It is OK to go from the internal network into the DMZ, but not the other way around. So an ingest node in the DMZ would not work either, as it cannot connect to the internal cluster. That's why I tried the different combinations with a buffer, from which I can "pull" the data out of the DMZ instead of pushing it from the DMZ into the internal network. We only tried the pushing as a comparison, but it seems that only the pushing method gives us the necessary index rate of above 10,000/s.

But I still don't understand why, and where the bottleneck is. E.g. setting the output of Logstash to /dev/null gave us a huge event rate in the Logstash monitoring in Kibana, so it is not Kafka, and it is not Logstash reading the data. There are also no filters in Logstash, just input and output. As I am using the ingest pipelines on my hot nodes, I have only one idea left: are there different methods of ingesting data when you are "pulling" data from somewhere, compared to letting the Beat push it itself?
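For illustration, that Logstash test pipeline was essentially as minimal as this (a sketch; broker and topic names are placeholders, the dots codec just keeps the output cheap while the event rate is read from the Logstash monitoring in Kibana):

  input {
    kafka {
      bootstrap_servers => "kafka-1:9092"   # placeholder
      topics => ["cisco-asa"]               # placeholder
      group_id => "ls-throughput-test"
      consumer_threads => 4
    }
  }
  # no filters
  output {
    stdout { codec => dots }   # run as: bin/logstash -f test.conf > /dev/null
  }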

Oh ok, I understand, then you will need some queue, yes.

It is interesting that in all scenarios the highest rate is similar. I wonder if there is some throttling in the firewall that doesn't affect the scenario where Filebeat sends events directly to ES.

For the sake of testing, could you compare the performance of these two scenarios?

  • Elasticsearch||<-Filebeat (output elastic)
  • Elasticsearch||<-Filebeat (input kafka output elastic)->Kafka<-Filebeat (output kafka)

Oh, and regarding this question: in principle the output is the same. It could happen that the bottleneck is in the input if it is slower, but then starting multiple consumer instances should increase the overall throughput.

We tested Elasticsearch||<-Filebeat (input kafka output elastic)->Kafka<-Filebeat (output kafka) again today and could see that the Kafka lag increased steadily, which means that the Cisco Filebeat pushes faster than the Filebeat on the hot nodes can pull out again.
Back to Elasticsearch||<-Filebeat (output elastic): it runs faster and there are no fail rates in the Filebeat monitoring.

Hi,

It is a little difficult to remotely debug performance in multi-step pipelines. The bottleneck can be in different places, or it can be the sum of multiple smaller bottlenecks.

Let's take a step back and select an architecture first. Once we have settled on the architecture, let's separate the pipeline into its stages, define some tests, tune each stage in isolation, and finally put everything back together.

Redis vs. Kafka:
When using Redis, you can collect events with Logstash only. When using Kafka, you can collect events with Filebeat or Logstash.
Sizing requirements for Redis (memory) and Kafka (disk) will be somewhat different. With Redis, events are stored in a list in main memory by default. Filebeat can also load-balance across multiple Redis instances.
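A Filebeat Redis output load-balancing over two instances would look roughly like this (a sketch; hosts and key name are placeholders):

  output.redis:
    hosts: ["redis-1:6379", "redis-2:6379"]   # placeholder hosts
    key: "filebeat-cisco"                     # the list events are appended to
    loadbalance: true                         # distribute events over both instances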
With Kafka, events are stored on disk, but when events are evicted depends on your retention policy. Kafka keeps events on disk even if all of them have been consumed; cleaning up disk space is subject to the configured retention policy. If your disks unexpectedly run full, this is not due to slow consumers, but due to the sizing and/or retention policy not matching your storage requirements. If consumers are always behind and never get a chance to catch up, events will be lost due to the retention policy.
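For example, retention for a topic is controlled with the standard Kafka tooling; a sketch with illustrative values (topic name is a placeholder, and depending on your Kafka version this is done via --bootstrap-server or --zookeeper):

  kafka-configs.sh --bootstrap-server kafka-1:9092 --alter \
    --entity-type topics --entity-name cisco-asa \
    --add-config retention.ms=86400000,retention.bytes=107374182400
  # retention.ms = 24h; retention.bytes is enforced per partition (values are examples)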

When using Kafka, the number of partitions (set at topic creation time) determines how much horizontal scaling you can have. When increasing the number of partitions, you also want multiple Kafka brokers and replication enabled (it is disabled by default). Without replication the whole system can become blocked if a broker goes down.
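For example (topic name and counts are placeholders; older Kafka versions use --zookeeper instead of --bootstrap-server):

  # 16 partitions spread over the brokers, each partition replicated 3 times
  kafka-topics.sh --bootstrap-server kafka-1:9092 --create \
    --topic cisco-asa --partitions 16 --replication-factor 3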

Based on the discussion so far, I'm assuming we are settling on Kafka. What is nice about Kafka is that it somewhat decouples the original Filebeat from the consuming Filebeat/Logstash. When testing/tuning for throughput, you can use a pre-filled topic and just change the name of the consumer group (by default Kafka removes a consumer group's state after 7 days) or reset the offset to 'oldest' in Filebeat/Logstash.
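With the Filebeat kafka input that is just a matter of the group_id and initial_offset settings (a sketch; broker and topic names are placeholders):

  filebeat.inputs:
    - type: kafka
      hosts: ["kafka-1:9092"]          # placeholder broker
      topics: ["cisco-asa-test"]       # placeholder test topic
      group_id: "throughput-test-01"   # use a fresh group name per test run ...
      initial_offset: oldest           # ... so reading starts at the beginning of the topic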

Next I would run some tests to get an idea of the performance of the individual components:

  1. filebeat -> stdout: get a baseline of what the input system is capable of
  2. filebeat -> kafka: does the network + Kafka still allow high enough rates?
  3. kafka -> filebeat -> stdout: how fast can we consume and re-encode events to JSON?
  4. kafka -> logstash -> stdout with JSON codec: same, but let's see if LS allows for higher rates (it uses a different Kafka client)
  5. (optional) FB/Logstash -> ES: see if the machines the collecting FB/LS instances run on impact performance as well
  6. kafka -> FB/LS -> ES

It seems like you can already push fast enough to Kafka, so there is no need to run the first two tests. But it is still nice to have them in case we want to modify the Kafka server configuration (e.g. increase partitions, add brokers, configure replication, require a minimum number of ACKs within the cluster, ...).

For tests 1 and 2 it is best to use an already written sample log file. Do not test with a live log file, as writes to the active file can bias the outcome. Also remove the data directory (especially the registry) between runs, so Filebeat starts collecting from the beginning.

For test 1 run ./filebeat -c test-config.yml | pv -Warl > /dev/null. Filebeat will create JSON events, one per line and the pv tool will show you current and average rate of lines being written by Filebeat.
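The test-config.yml can be as small as this (the sample file path is a placeholder; this measures the raw reading/encoding rate, since the Cisco ASA parsing happens in the ES ingest pipeline anyway):

  filebeat.inputs:
    - type: log
      paths:
        - /path/to/sample-cisco-asa.log   # pre-recorded sample, not the live log

  output.console:
    codec.json:
      pretty: false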

For test 2 configure the kafka output and collect metrics from the HTTP API or via monitoring in Filebeat.
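A sketch of such a test config, with the stats endpoint enabled (broker and topic names are placeholders):

  output.kafka:
    hosts: ["kafka-1:9092"]
    topic: "cisco-asa-test"
    required_acks: 1
    compression: gzip

  # expose metrics for the test, e.g. curl -s localhost:5066/stats
  http.enabled: true
  http.port: 5066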

For the upcoming tests use the Filebeat from test 2 to prefill a Kafka topic. We don't need to run the other tests with logs being actively published. This also lets us try to tune for even higher event rates than we are currently seeing from the logs written by your Cisco setup. If you change the number of partitions on your test topic, you need to prefill it again.

Test 3 (kafka -> FB -> stdout): Filebeat uses end-to-end ACKs. Only after the output has ACKed an event will Filebeat update the read offset in Kafka (a special API call). This test checks kafka -> FB kafka input -> JSON parsing -> batching -> JSON encoding, plus the latency induced by the end-to-end ACK. Use the same command as for the first test, but with the kafka input configured. Tuning potential: the number of partitions (e.g. up to one per physical disk * brokers), the number of consumers within a consumer group (e.g. multiple Beats or multiple kafka inputs), and the fetch.min, fetch.max, and fetch.default settings of the kafka input.
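In Filebeat configuration terms these knobs look roughly like this (a sketch with illustrative values; combine it with the console output from test 1):

  filebeat.inputs:
    - type: kafka
      hosts: ["kafka-1:9092", "kafka-2:9092"]   # placeholder brokers
      topics: ["cisco-asa-test"]                # placeholder test topic
      group_id: "fb-collector"   # a second Filebeat with the same group_id adds another consumer
      fetch.min: 1               # fetch sizes are in bytes; larger fetches can help
      fetch.default: 1048576     # with many small events (check the docs for the defaults)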

Test 4 is what you already did. I'm just not sure which codec you used when pushing to /dev/null (for complex events JSON encoding can add quite some CPU overhead).
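To make it comparable with test 3, the pipeline could explicitly use a JSON codec like this (a sketch; names are placeholders) and be piped through pv as in test 1:

  input {
    kafka {
      bootstrap_servers => "kafka-1:9092"
      topics => ["cisco-asa-test"]
      group_id => "ls-collector"
      codec => "json"                # decode the JSON events written by Filebeat
      consumer_threads => 4
    }
  }
  output {
    stdout { codec => json_lines }   # bin/logstash -f test4.conf | pv -Warl > /dev/null
  }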

Choose Filebeat or Logstash based on the outcome of tests 3 and 4. If both are fast enough, we will select FB or Logstash depending on further testing. When comparing performance, keep in mind that FB has full end-to-end ACKs, which LS does not have.

Test 5 is somewhat interesting. You mentioned that directly pushing from the edge machine to ES gets you high throughput, but pushing from the 'collector' Filebeat/Logstash (reading from Kafka) kills throughput. This makes me wonder whether the machine type, the network setup, or just the FB/Logstash output configuration on the 'collector' machine plays a role here.
E.g. prepare a log file to collect from and push directly to ES from these machines. Check if you can tune the output. For Filebeat we can tune queue.mem.events, output.elasticsearch.bulk_max_size, and output.elasticsearch.workers. In order to keep the outputs 'saturated', select queue.mem.events >= output.elasticsearch.bulk_max_size * output.elasticsearch.workers. Quotas or rate limits in your network setup can also impact throughput. Check whether output.elasticsearch.compression_level has an impact as well (compression is disabled by default).
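As a sketch of that sizing rule (illustrative values, placeholder hosts):

  # 2 hosts * 4 workers * bulk_max_size 2048 = 16384 events in flight,
  # so the queue should hold at least that many events
  queue.mem:
    events: 32768
    flush.min_events: 2048
    flush.timeout: 1s

  output.elasticsearch:
    hosts: ["https://hot-1:9200", "https://hot-2:9200"]
    worker: 4
    bulk_max_size: 2048
    compression_level: 1   # default is 0 (off); worth testing over the firewall link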

Test 6: set up and check the complete 'collector' pipeline, using the sample Kafka topic you have prepared.

If your throughput looks good enough after test 6, put everything back together and run the complete pipeline against the live logs.

If you change the number of Kafka partitions, add more brokers, or add replication (or change the ACK policy), also re-run test 2, as these changes can impact the initial Filebeat -> Kafka publishing as well.

When testing, please share configurations and metrics with us. The more information we have, the better a picture we can build of your setup.

Hi @steffens
Thank you very much for your detailed answer. I will give feedback as soon as I can, but it will take a few days.
But maybe one interesting piece of information already: we already tried to increase the number of partitions on Kafka, up to 32. We followed the Elastic docs and set consumer_threads equal to the number of partitions.
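For reference, the consumer side of that test looked roughly like this (a sketch; broker and topic names are placeholders):

  input {
    kafka {
      bootstrap_servers => "kafka-1:9092"
      topics => ["cisco-asa"]
      group_id => "collectors"     # both consumer hosts join the same group
      consumer_threads => 32       # one thread per partition
    }
  }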
We have also found what may be a bug in the Filebeat kafka input:
First, we connected with one Filebeat, installed on hot-1. In the Kafka consumer group overview you can see that it is connected. As soon as we connect a second Filebeat, installed on hot-2, the consumer group changes to "rebalancing" and stays there indefinitely. This does not happen with two Logstash instances; there the rebalancing just takes a second. So Filebeat is currently out; I will create an issue on GitHub for this.
I will run and document all the other tests as soon as I have time.

Regards,
Marcus

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.