Recommended number of workers

I'm using an Elasticsearch ingest node as the output.

I didn't set worker in filebeat.yml at first. The indexing rate was around 3k doc/sec.

Then I set worker to 4. The indexing rate doubled to 6k doc/sec.

What is the default number of workers in Filebeat with Elasticsearch as the output?

What is the recommended number of workers? Is it related to the number of cores your CPU has?

Thanks!


Short answer: it depends.

In filebeat, the spooler batch (the spool_size setting) is split up into multiple sub-batches of size output.elasticsearch.bulk_max_size (default 50 for the Elasticsearch output). By default (don't enable publish_async in filebeat, as it's known to be unstable), only after all sub-batches have been processed can the spooler push another batch to be published. That is, filebeat by default uses lock-step load-balancing.

With the defaults spool_size = 2048 and bulk_max_size = 50, you get up to N = spool_size / bulk_max_size ≈ 41 sub-batches. Having workers > N won't help with parallelizing anymore. Parallelizing too much might result in too many requests filling the queues (alternatively, increase bulk_max_size).
Elasticsearch internally also employs work queues. If these queues are full, errors for un-processed events are returned and filebeat has to send the same events again (killing throughput).
Depending on the Elasticsearch setup and the number of beat instances publishing, you might try to increase bulk_max_size as well.

That is, you have the spool_size, bulk_max_size, and worker settings to change/optimize throughput. Which combination of bulk_max_size and worker works best in your environment is something you have to test for yourself.

The defaults are very conservative, so as to deal with very small Elasticsearch setups as well:
worker: 1, bulk_max_size: 50 and spool_size: 2048.
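
For illustration, a sketch of what a less conservative configuration might look like; the numbers here are made up and would need to be tested against your own cluster:

filebeat.spool_size: 4096          # bigger spooler batch (made-up value)
output.elasticsearch:
  hosts: ["localhost:9200"]
  worker: 4                        # matches N = 4096 / 1024 = 4 sub-batches
  bulk_max_size: 1024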


Excellent explanation. Thanks a lot!

My bulk_max_size was 512. I raised worker from 4 to 8; the indexing rate didn't change. Now I see why: with the default spool_size of 2048 and bulk_max_size of 512 there are only N = 4 sub-batches, so the extra workers had nothing to do.

We need to index 100GB of compressed logs every day, and the expected indexing rate is ~12k doc/sec. I think I'll set spool_size=4096 to see if it helps lift the indexing rate.

By having filebeat write to the file or console output (piped to /dev/null) you can get a baseline for how many events filebeat can process, e.g.

./filebeat -e -v -c <config file> -E output.elasticsearch.enabled=false -E output.console.pretty=false | pv -Warl >/dev/null

Having the baseline, how much data can actually be pushed then depends on the output, Elasticsearch, and the network.

Instead of increasing workers, you can also try to set bulk_max_size = spool_size with worker=1. By increasing bulk_max_size = spool_size, you might hit a limit in the number of events you can push at once. Filebeat prints internal metrics to its logs every 30 seconds (or use -httpprof :6060). Check for metrics having elasticsearch in the name. If you have any not_acked metrics in there, Elasticsearch did not accept this number of events, likely due to a queue being full. Having this baseline number B, you get an idea of how many events you want to have in flight between beats and Elasticsearch (B + X >= worker * bulk_max_size). The X is some sandbagging, so you have some margin in case something is not performing at its best from time to time.
As load balancing still sends the sub-batches in parallel, you next want to increase N = spool_size / bulk_max_size. The bigger N, the less latency for pushing another batch to the workers. With workers in parallel, you don't want the spooler (or other workers) to wait on one worker only. Assuming all workers' turn-around times are about similar, having N be a multiple of worker might be somewhat beneficial (unfortunately the lock-step load-balancing somewhat kills throughput).
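
A purely illustrative calculation with made-up numbers, just to show how the pieces fit together:

B = 8192 events                     # largest in-flight amount acked without not_acked errors (hypothetical)
worker = 2, bulk_max_size = 2048    # worker * bulk_max_size = 4096, well below B, leaving the margin X
spool_size = 8192                   # N = spool_size / bulk_max_size = 4, a multiple of worker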

We're currently working on some publisher pipeline refactoring in filebeat, which will replace the spooler with an asynchronously working event buffer. This will remove the lock-step load-balancing in favour of fully load-balanced workers publishing events more independently. Still, event ACKs have to be processed in publish order, that is, one blocked worker still has the chance to block the publisher pipeline (we will likely address this issue in the future by cancelling requests and rebalancing events to other workers).

When sending, quite some back-pressure on performance might come from Elasticsearch directly. Having multiple dedicated ingest nodes and configuring filebeat to push to these nodes might help as well (never tried it myself, and I'm not fully sure about the ES architecture, though).
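
For example, a sketch of what pointing filebeat at several dedicated ingest nodes could look like; the host names are hypothetical, and (as noted in the config comments later in this thread) workers are configured per Elasticsearch host:

output.elasticsearch:
  hosts: ["ingest-node-1:9200", "ingest-node-2:9200"]   # hypothetical dedicated ingest nodes
  worker: 2                                             # per host, so 4 publishers in total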


Sorry for the late reply. There's a lot of useful information in your last post. I've been trying to figure it out for the past few days and have a few more questions.

(Q1)

I set bulk_max_size=4096 and spool_size=8192, but when I ran

./filebeat -e -v -c filebeat.yml -E output.elasticsearch.enabled=false -E output.console.pretty=false | pv -Warl >/dev/null

I got:
2017/07/11 13:29:48.035422 async.go:64: INFO Max Bulk Size set to: 2048
....
2017/07/11 13:29:48.036233 spooler.go:63: INFO Starting spooler: spool_size: 2048; idle_timeout: 5s
....
2017/07/11 13:29:48.036414 prospector.go:124: INFO Starting prospector of type: log; id: 14868134581313186727

bulk_max_size and spool_size remain 2048 no matter how I change bulk_max_size and spool_size in my config file... Also, the same id=14868134581313186727 shows up every time.

Is Filebeat perhaps not actually reading my config file? Is there any indentation format I need to follow in the config file?

So I'm still very confused about how to get the baseline for how many events Filebeat can process. Could you elaborate more on this, please?

(Q2)

Another question is about

Instead of increasing workers, you can also try to set bulk_max_size = spool_size with worker=1. By increasing bulk_max_size = spool_size, you might hit a limit in the number of events you can push at once. Filebeat prints internal metrics to its logs every 30 seconds (or use -httpprof :6060). Check for metrics having elasticsearch in the name. If you have any not_acked metrics in there, Elasticsearch did not accept this number of events, likely due to a queue being full.

Do you mean I can set worker=1 and keep increasing bulk_max_size = spool_size until I see not_acked in my Filebeat logs? Do I get the baseline number B from there, or from running the command in Question 1?

Thank you!

  1. Can you share your config file? Which version of filebeat are you running?
    The spool_size setting must be filebeat.spool_size. The bulk_max_size setting is part of the output (e.g. output.elasticsearch.bulk_max_size). I'm not sure right now if the console output is using the setting; I think it's using bulk_max_size 0 by default, bypassing additional buffering in the publisher pipeline (in the upcoming beta release, spool_size will be removed).

  2. That's the idea, yes. The baseline is for sending to this particular elasticsearch instance. Sending to another output or elasticsearch cluster might give you another baseline.

e.g. you can try

buffer_size: 2048

filebeat.prospectors:
...

filebeat.spool_size: ${buffer_size}

output.elasticsearch:
  ...
  bulk_max_size: ${buffer_size}
  worker: 1

Then you can overwrite the buffer_size setting from CLI via -E buffer_size=4096.
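
For example (the config file name is assumed):

./filebeat -e -c filebeat.yml -E buffer_size=4096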

For debugging purposes you can open the http profiling endpoint (do not open this port publicly in production) by adding -httpprof :6060 to the filebeat command line. Then you can get the metrics via curl or browser from http://localhost:6060/debug/vars.
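
For example, assuming filebeat was started with the profiling endpoint enabled as above:

./filebeat -e -c filebeat.yml -httpprof :6060
curl -s http://localhost:6060/debug/vars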

filebeat will print only updated metrics in its logs. If not_acked never changes (stays at 0), it will not appear in the logs. But you can still see the number of acked events.


I'm using Filebeat 5.4.1. My config file is as follows:

########################## Filebeat Configuration #############################

#=========================== Filebeat prospectors =============================

filebeat.prospectors:

# Each - is a prospector. Most options can be set at the prospector level, so
# you can use different prospectors for various configurations.
# Below are the prospector specific configurations.

- input_type: log
  document_type: my_type
  paths: /Users/hding/workspace/es_project2017/raw_logs/*/*/*.log


#========================= Filebeat global options ============================

# Event count spool threshold - forces network flush if exceeded
spool_size: 8192

#================================ Processors ===================================

# Processors are used to reduce the number of fields in the exported event or to
# enhance the event with external metadata. This section defines a list of
# processors that are applied one by one and the first one receives the initial
# event:
#
#   event -> filter1 -> event1 -> filter2 ->event2 ...
#
# The supported processors are drop_fields, drop_event, include_fields, and
# add_cloud_metadata.

processors:

- drop_fields:
    fields: [ "beat", "input_type" ]


#================================ Outputs =====================================

# Configure what outputs to use when sending the data collected by the beat.
# Multiple outputs may be used.

#-------------------------- Elasticsearch output ------------------------------
output.elasticsearch:
  # Array of hosts to connect to.
  hosts: ["localhost:9200"]

  # Optional protocol and basic auth credentials.
  protocol: "http"
  username: "elastic"
  password: "changeme"

  # Configure index templates (settings and mappings)
  template.enabled: false

  # Configure indices
  index: "my_index"

  # Configure pipelines
  pipeline: "my_pipeline"

  # Number of workers per Elasticsearch host.
  worker: 2

  # Bulk settings
  bulk_max_size: 4096

For the above configuration, it took 2 hours 41 mins to index 56 million lines. The indexing rate is 5881 doc/sec.

Each of our machines receives roughly 1 billion lines per day. So my goal is to achieve 11683 doc/sec.

Is my current configuration too conservative?

Fixed your config: it must say filebeat.spool_size. With your config you've been sending batches of only 2048 events (the default spool_size) to the output. As bulk_max_size > 2048, only one worker has been active.

filebeat.prospectors:
- input_type: log
  document_type: my_type
  paths: /Users/hding/workspace/es_project2017/raw_logs/*/*/*.log

filebeat.spool_size: 8192     # <- fixed

processors:
- drop_fields:
    fields: [ "beat", "input_type" ]

output.elasticsearch:
  hosts: ["http://localhost:9200"]
  username: "elastic"
  password: "changeme"
  template.enabled: false
  index: "my_index-%{+yyyy.MM.dd}"
  pipeline: "my_pipeline"
  worker: 2
  bulk_max_size: 4096

Thanks a lot, Steffen! I'll try the new config file.
