Filebeat sending data to Logstash seems too slow

Hello,
I'm trying out filebeat and Beat protocol with RPM Filebeat 1.0.0 and RPM Logstash 2.1.0 on two separated machine with 4 VCPU and 16 GB of RAM, Gigabit and SSD disk.

I've configured filebeat to send 250GB of plain text logs file to Logstash instance with input { beat { } } but I cannot send more that 50kbyte/s, is it normal?
Looking at Filebeat shipper instance CPU/RAM/Disk Read it look that very few resources are used, the same on the Logstash Indexer instance.

I see (through Kibana) about 12000/s records handled by elasticsearch, is then filebeat the bottleneck?

2 Likes

That doesn't seem normal at all. When you say you see 12k/s in Kibana, it's not in the same test, right? Can you try a direct filebeat -> ES connection to see what rate you get?

Hi,
as you suggested, I tried to send data directly to Elasticsearch and I could see 2 MByte/s of outgoing network traffic, sending to 3 Data Node and having 3 workers in filebeat. So I really wonder why when talking to Logstash the rate is so slow, can I do some other test in order try to understand this?

By the way, for the processed record speed, according to Kibana 5s refresh settings, I have to correct my previous calculus which was not divided by 5, so the correct event ingestion rate using the pipeline Filebeat->Logstash->Elasticsearch is about 3k/s, while for the Filebeat->Elasticsearch pipeline is about 7k/s

This is an isolated test, but I wonder also if I can use other better metrics to measure data ingestion rate.

Thanks

Adding to the Logstash used conf 3 workers for elasticsearch output plugin and also increasing flush_size I get about 100kbyte/s of outgoing network traffic now, which is better but always pretty far from 2MByte/s of the direct pipeline with Elasticsearch!

can you try to increase the bulk_max_size option in filebeat logstash output config?

output:
  logstash:
    bulk_max_size: ...

Just for reference, there's another report of this: Performance issues with input-beats plugin

Hi,
thanks for suggestions and the other thread link, which is very similar use case.
Anyway, making these changes, I don't see any improvements in network speed, but I'm figuring out that probably is just Logstash that "throttles" Filebeat, because I see in Logstash logs very lots of:

{:timestamp=>"2015-12-21T09:32:33.686000+0000", :message=>"retrying failed action with response code: 429", :level=>:warn}
thus it should be Elasticsearch in turn that throttles traffic. I tried some configuration in Logstash, but I cannot find a way to step over 10k/s which after sometimes are lowered by the amount of those errors. My Logstash configuration is:

elasticsearch { hosts => ["10.0.0.2:9200", "10.0.0.3:9200", "10.0.0.4:9200"] document_type => "log" index => "test-%{+YYYY.MM}" template_name => "test" workers => 4 flush_size => 1000 idle_flush_time => 100 retry_max_interval => 10 }
flush_size helped in speed, but it increased the number of error 429 from elasticsearch and it happened that traffic were stopped.

As I told you, my ES cluster is 3 Data Node (2VCPU+8GB RAM 100GB SSD) and 1 Master (4VCPU+16GB RAM+ 100 SSD), and from Marvel I don't see heap usage ever going over 70%, only CPU going sometimes higher.

I have 24 Indexes (they are YYYY.MM) and 200 shards, each node has 50 shards, and I left default settings for Elasticsearch, with replica active in order to simulate a real scenario of real time big data injestion.

Thus presuming that the speed is regulated by Logstash (it doesn't happens talking directly to ES, even if ingestion speed is not really much higher) how can I tune the whole Pipeline in order to have an acceptable speed for TB of data? I though that 4 nodes could be enough for a reasonable data ingestion speed, isn't it?

Thanks

1 Like

hello

I would like to import log file of 500Ko/s with filebeat to Logstash.

but it is too slow.

Logstash output

output {
  elasticsearch {
        hosts => ["XXXXXXXXXXXX"]
        index => "logstash-%{type}-%{+YYYY.MM.dd}"
        }
}

filebeat

filebeat:
  spool_size : 8192
  prospectors:
    -
      paths:
        - D:\LogApplicatives\XXXX.*
      document_type: XXXXX
  
output:
  logstash:
      enabled: true
      hosts: ["logstash:5043"]      
      bulk_max_size : 8192
      workers: 4

logging:
  to_syslog: true
  to_files: true
  level: info
  files:
      path: D:\filebeat
      name: filebeat.log
      rotateeverybytes: 10485760
      keepfiles: 7

which filebeat/logstash versions are you using? Haven't tested with logstash 2.3 yet, but logstash 2.2.1 did improve performance a little.

Any log output from logstash? Elasticsearch + logstash create some back-pressure also affecting filebeat. If elasticsearch can not index fast enough, logstash will be slowed down by elasticsearch. If logstash is slowed down or not fast enough to process data in time, it will block/slow down filebeat (as we don't want to drop any log lines).

Sometimes if logstash is receiving data from too many workers and filter/output pipeline takes too long, logstash might kill the filebeat->logstash connection. This can potentially slow down further processing (check logstash logs). To prevent this from happening (default 5s), set congestion_threshold in logstash beats input plugin to some very high value: https://www.elastic.co/guide/en/logstash/current/plugins-inputs-beats.html#plugins-inputs-beats-congestion_threshold.

filebeat config with full load-balancing + 4 workers sending to logstash:

filebeat:
  spool_size : 8192
  publish_async: true
  prospectors:
    -
      paths:
        - D:\LogApplicatives\XXXX.*
      document_type: XXXXX
  
output:
  logstash:
      hosts: ["logstash:5043"]      
      bulk_max_size : 8192
      loadbalance: true
      workers: 4

Please note: full async load balancing increases CPU and memory usage in filebeat and logstash.

2 Likes

Hi Steffens

Thanks for your reply.

I use logstash-2.2.3-1.noarch.

I tried to put congestion_threshold, this is my input :

input {
beats {
port => "5043"
congestion_threshold=> "1000"
}
}

I've got a lot of these error on logstash log

{:timestamp=>"2016-04-04T17:30:33.261000+0200", :message=>"Beats input: unhandled exception", :exception=>#<Zlib::BufError: buffer error>, :backtrace=>["org/jruby/ext/zlib/ZStream.java:134:in finish'", "org/jruby/ext/zlib/JZlibInflate.java:72:ininflate'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-beats-2.2.7/lib/lumberjack/beats/server.rb:380:in compressed_payload'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-beats-2.2.7/lib/lumberjack/beats/server.rb:251:infeed'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-beats-2.2.7/lib/lumberjack/beats/server.rb:384:in compressed_payload'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-beats-2.2.7/lib/lumberjack/beats/server.rb:251:infeed'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-beats-2.2.7/lib/lumberjack/beats/server.rb:462:in read_socket'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-beats-2.2.7/lib/lumberjack/beats/server.rb:442:inrun'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-beats-2.2.7/lib/logstash/inputs/beats_support/connection_handler.rb:33:in accept'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-beats-2.2.7/lib/logstash/inputs/beats.rb:211:inhandle_new_connection'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-beats-2.2.7/lib/logstash/inputs/beats_support/circuit_breaker.rb:42:in execute'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-beats-2.2.7/lib/logstash/inputs/beats.rb:211:inhandle_new_connection'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-beats-2.2.7/lib/logstash/inputs/beats.rb:167:in `run'"], :level=>:error}
{:timestamp=>"2016-04-04T17:31:10.818000+0200", :message=>"SIGTERM received. Shutting down the pipeline.", :level=>:warn}

{:timestamp=>"2016-04-04T17:32:44.302000+0200", :message=>"Beats input: unhandled exception", :exception=>#<Zlib::BufError: buffer error>, :backtrace=>["org/jruby/ext/zlib/ZStream.java:134:in finish'", "org/jruby/ext/zlib/JZlibInflate.java:72:ininflate'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-beats-2.2.7/lib/lumberjack/beats/server.rb:380:in compressed_payload'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-beats-2.2.7/lib/lumberjack/beats/server.rb:251:infeed'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-beats-2.2.7/lib/lumberjack/beats/server.rb:462:in read_socket'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-beats-2.2.7/lib/lumberjack/beats/server.rb:442:inrun'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-beats-2.2.7/lib/logstash/inputs/beats_support/connection_handler.rb:33:in accept'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-beats-2.2.7/lib/logstash/inputs/beats.rb:211:inhandle_new_connection'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-beats-2.2.7/lib/logstash/inputs/beats_support/circuit_breaker.rb:42:in execute'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-beats-2.2.7/lib/logstash/inputs/beats.rb:211:inhandle_new_connection'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-beats-2.2.7/lib/logstash/inputs/beats.rb:167:in `run'"], :level=>:error}

looks like a bug in jruby zlib library. Seems to occur if compressed buffers are to big. Try to reduce the bulk_max_size to 4096 or 2048 events.

Hi

i reduce bulk_max_size to 1024, but my tru problem is that i can not keep on time my log on elasticsearch.

Filebeat can not send 10Ko/s to logstash.

why not set bulk_max_size=2048? I found 4k and 2k much better for throughput in comparison to. with this many workers on physical machine with 16 cores I measured like 45k lines/s.

some ways to measure performance and look for bottlenecks:

  1. have filebeat publish to console and redirect to /dev/null. Alternatively configure file output in filebeat
  2. have filebeat->logstash, but configure logstash stdout plugin only with 'dots' codec. Use pv to measure throughput.
  3. run filebeat->logstash->elasticsearch and measure throughput

If throughput in 2 is bigger than in 3, there is a problem with indexing performance in elasticsearch (increase workers in elasticsearch output?). If throughput in 1 is bigger than in 2 problem is either with network or logstash performance (any complicated filters taking long?).

Have you check resource usage on logstash machine?

You can measure throughput in filebeat by enabling profiling support in filebeat with -httpprof :6060 or -httpprof localhost:6060 (use later if you want to have HTTP port being available from localhost only) and use this python script to collect/measure throughput:

expvar_rates.py

import requests
import argparse
import time
import curses


def main():
    parser = argparse.ArgumentParser(
        description="Print per second stats from expvars")
    parser.add_argument("url",
                        help="The URL from where to read the values")
    args = parser.parse_args()

    stdscr = curses.initscr()
    curses.noecho()
    curses.cbreak()

    last_vals = {}

    # running average for last 30 measurements
    N = 30
    avg_vals = {}
    now = time.time()

    while True:
        try:
            time.sleep(1.0)
            stdscr.erase()

            r = requests.get(args.url)
            json = r.json()

            last = now
            now = time.time()
            dt = now - last

            for key, total in json.items():
                if isinstance(total, (int, long, float)):
                    if key in last_vals:
                        per_sec = (total - last_vals[key])/dt
                        if key not in avg_vals:
                            avg_vals[key] = []
                        avg_vals[key].append(per_sec)
                        if len(avg_vals[key]) > N:
                            avg_vals[key] = avg_vals[key][1:]
                        avg_sec = sum(avg_vals[key])/len(avg_vals[key])
                    else:
                        per_sec = "na"
                        avg_sec = "na"
                    last_vals[key] = total
                    stdscr.addstr("{}: {}/s (avg: {}/s) (total: {})\n"
                                  .format(key, per_sec, avg_sec, total))
            stdscr.refresh()
        except requests.ConnectionError:
            stdscr.addstr("Waiting for connection...\n")
            stdscr.refresh()
            last_vals = {}
            avg_vals = {}

if __name__ == "__main__":
    main()

run script via python expvar_rates.py http://localhost:6060 to get some output.

2 Likes

Steffens thanks for your help.

I put my logstash worker from 1 to 12 and filebeat send quickly the log now, but i had to delete multiline filter on logstash :frowning:

Multiline processing should ideally be done as close to the source as possible, which is why Filebeat now supports this. Moving away from the multiline filter in Logstash is good as it is being deprecated.

multiline should be done in filebeat. 12 workers sounds crazy big. Without multiline in logstash can you reduce workers again? Any other filters in logstash config you can try to optimize (see logstash blog post)

Steffens my logstash server keep data from 60 filebeat client.

i put this config on logstash
LS_HEAP_SIZE="2G"
LS_OPTS="-w 100"

and it works good

I used multiline on my filebeat application and it works like a charm.

Hi

I reach to up my filebeat to send an average of 6Ko/sec

Hi Steffens,

I am facing 4hours delay in logs , please see my below filebeat.yaml.

logstash:
# The Logstash hosts
hosts: ["172.16.100.46:5044"]
bulk_max_size: 10048

# Number of workers per Logstash host.
worker: 6

# The maximum number of events to bulk into a single batch window. The
# default is 2048.
#bulk_max_size: 2048

# Set gzip compression level.
compression_level: 9

# Optional load balance the events between the Logstash hosts
#loadbalance: true

# Optional index name. The default index name depends on the each beat.
# For Packetbeat, the default is set to packetbeat, for Topbeat
# top topbeat and for Filebeat to filebeat.
index: filebeat

# Optional TLS. By default is off.

tls:
# List of root certificates for HTTPS server verifications
certificate_authorities: ["/etc/pki/tls/certs/logstash-forwarder.crt"]

  # Certificate for TLS client authentication
  #certificate: "/etc/pki/client/cert.pem"

  # Client Certificate Key
  #certificate_key: "/etc/pki/client/cert.key"

  # Controls whether the client verifies server certificates and host name.
  # If insecure is set to true, all server host names and certificates will be
  # accepted. In this mode TLS based connections are susceptible to
  # man-in-the-middle attacks. Use only for testing.
  #insecure: true

  # Configure cipher suites to be used for TLS connections
  #cipher_suites: []

  # Configure curve types for ECDHE based cipher suites
  #curve_types: []

This thread is one year old, can you open a new topic, please?