Can Filebeat handle the load to push around 200 million events per hour to Kafka?

Hello,

Has anyone optimized Filebeat to send more than 200 million events per hour to Kafka across various topics?

What I am seeing in my setup is that it is far too slow to cater to that rate: it is only able to send about 1/4 of the events to Kafka.

max_procs: 20
queue.mem:
  events: 600000
  flush.min_events: 600000
  flush.timeout: 50ms
http.enabled: true
output.kafka:
  required_acks: 1
  worker: 40
#  compression: gzip
  bulk_max_size: 1000000
  max_message_bytes: 1000000

Published events peak at around 300,000-400,000 from what I have seen in the logs:

    • 2019-03-13T07:42:03.071+0530 INFO [monitoring] log/log.go:144 Non-zero metrics in the last 30s {"monitoring": {"metrics": {"beat":{"cpu":{"system":{"ticks":277770,"time":{"ms":14390}},"total":{"ticks":2908220,"time":{"ms":153340},"value":2908220},"user":{"ticks":2630450,"time":{"ms":138950}}},"handles":{"limit":{"hard":4096,"soft":1024},"open":87},"info":{"ephemeral_id":"05c930bd-4d6a-4200-adfe-7352055e7682","uptime":{"ms":660049}},"memstats":{"gc_next":667089056,"memory_alloc":571288568,"memory_total":1708086450584,"rss":122880}},"filebeat":{"events":{"active":-3541,"added":196761,"done":200302},"harvester":{"open_files":22,"running":22}},"libbeat":{"config":{"module":{"running":0}},"output":{"events":{"acked":199957,"active":-1530,"batches":287,"total":198427}},"outputs":{"kafka":{"bytes_read":5063141,"bytes_write":110404317}},"pipeline":{"clients":21,"events":{"active":2891,"filtered":346,"published":196412,"total":196761},"queue":{"acked":199957}}},"registrar":{"states":{"current":24,"update":200302},"writes":{"success":279,"total":279}},"system":{"load":{"1":22.27,"15":19.05,"5":21.11,"norm":{"1":0.5568,"15":0.4763,"5":0.5277}}}}}}

    • 2019-03-13T07:42:33.038+0530 INFO [monitoring] log/log.go:144 Non-zero metrics in the last 30s {"monitoring": {"metrics": {"beat":{"cpu":{"system":{"ticks":292460,"time":{"ms":14688}},"total":{"ticks":3065440,"time":{"ms":157195},"value":3065440},"user":{"ticks":2772980,"time":{"ms":142507}}},"handles":{"limit":{"hard":4096,"soft":1024},"open":87},"info":{"ephemeral_id":"05c930bd-4d6a-4200-adfe-7352055e7682","uptime":{"ms":690048}},"memstats":{"gc_next":246715056,"memory_alloc":137692184,"memory_total":1806615640928,"rss":19525632}},"filebeat":{"events":{"active":-870,"added":198928,"done":199798},"harvester":{"open_files":22,"running":22}},"libbeat":{"config":{"module":{"running":0}},"output":{"events":{"acked":199468,"active":557,"batches":293,"total":200025}},"outputs":{"kafka":{"bytes_read":5167621,"bytes_write":111660892}},"pipeline":{"clients":21,"events":{"active":2014,"filtered":337,"published":198594,"total":198928},"queue":{"acked":199468}}},"registrar":{"states":{"current":24,"update":199798},"writes":{"success":291,"total":291}},"system":{"load":{"1":23.67,"15":19.33,"5":21.72,"norm":{"1":0.5918,"15":0.4833,"5":0.543}}}}}}

File rotation is hourly, so test.log gets rotated every hour.

- type: log
  enabled: true
  close_inactive: 3h
  scan_frequency: 2s
  paths:
    - /var/log/test.log
  fields:
    log_topic: mytopic

The event rate you want to achieve is about 56k events per second (200,000,000 / 3600 ≈ 55.6k eps). Your current rate is ~6.7k eps (roughly 200,000 acked events per 30s window in the metrics above).

First of all, I'd test with the console output and a test corpus (so tests can be re-run) to check whether Filebeat can even read this many events. E.g. send the events to pv: filebeat -c test.yml | pv -Warl >/dev/null. This will count the throughput per second live.
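
A minimal test.yml for such a baseline run could look like the following (just a sketch: the input path is taken from your config above, everything else is plain console output):

# sketch: console output only, for measuring the baseline read/publish rate
filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /var/log/test.log

output.console:
  pretty: false

With this config all events go to stdout, so pv measures the raw rate Filebeat can read and publish without any Kafka or network involvement.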

If Filebeat can easily process this many events, then the cause is most likely backpressure from the network or Kafka.

Your settings are actually quite inefficient. You have 40 workers configured, which means 40 Kafka clients, each trying to draw a batch of 1,000,000 events from a queue that only holds 600,000 events. Only after all events in a batch have been ACKed is that space in the queue freed again.
Also note: the total number of workers is worker * #topics * #partitions per topic. Assuming one topic with 10 partitions, you actually have 400 workers trying to publish events, and how many of them actually get served is questionable, also due to the very short flush timeout. There is a chance you are playing ping-pong between many very small batches and the occasional huge one: the small batches add publishing latency, while the huge batch causes long waits during which Filebeat cannot make progress. Looking at your metrics, the actual batch size is about ~700 events (e.g. 198,427 output events over 287 batches ≈ 690 per batch), and each batch is further split among the partitions.

A high max_procs setting can be bad as well. Do not set max_procs to a high value if only a few go-routines can be active at a time; otherwise the Go runtime spends a lot of CPU looking for runnable go-routines, wasting CPU and potentially even hurting throughput. (Tip: use htop to get an idea of how many OS threads are actually active.)
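
For example, a quick way to check the thread count (a sketch, assuming a single filebeat process on the host):

# prints the number of OS threads (NLWP) of the filebeat process
ps -o nlwp -p $(pgrep -x filebeat)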

Let's start small:

max_procs: 4

queue.mem:
  events: 65536
  flush.min_events: 4096
  flush.timeout: 1s

output.kafka:
  ...
  bulk_max_size: 4096

With these settings the queue (assuming no flush timeout kicks in) can provide up to 16 batches (65536 / 4096 = 16). The publisher is asynchronous, so this should help keep a constant stream of events available while we wait for ACKs from Kafka.

All in all, many factors can influence throughput:

  • disk: local hard drive vs. SSD vs. network share, file cache, how many files are read concurrently, registry flushes (see the sketch after this list)
  • network: load balancer, Kafka behind a common firewall
  • kafka: number of topics and partitions
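
On the registry-flush point, one knob worth knowing about (a sketch, and an assumption about your Filebeat version: in 7.x the key is filebeat.registry.flush, while 6.x releases call it filebeat.registry_flush):

# assumption: 7.x key; 6.x uses filebeat.registry_flush instead
filebeat.registry.flush: 1s

This batches registry writes instead of writing the registry file after every ACKed batch (your metrics show roughly one registrar write per batch), which reduces disk I/O at high event rates.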

But really: start by collecting a base number via the console output. This is the maximum rate your machine/disk allows Filebeat to consume. Then we can try to approach this base number with the Kafka output.

Thanks for your detailed input.

I have around 16 topics with varying partition counts. I know about the workers, but I assume my difficulty is in pushing the large batches to Kafka.

You are right that I have to start with base numbers, but somehow I have to push a large number of events to Kafka (I have 66 brokers): 15 topics with 5 partitions and 2 replicas each, and 1 topic with 40 partitions and 2 replicas.

I started a new instance of Filebeat on the same server, so now I have 2 instances of Filebeat running: one instance handles all the other topics (15 topics), and the other instance handles only the single log file that carries the huge load.
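
(Roughly how the second instance is launched, with its own config file and data/registry path so the two instances don't collide; the paths here are just illustrative:)

# illustrative paths; each instance needs its own data/registry directory
filebeat -c /etc/filebeat/filebeat-heavy.yml --path.data /var/lib/filebeat-heavy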

The configuration for the second instance is roughly:

> - type: log
> 
>   enabled: true
> 
>   close_inactive: 3h
>   scan_frequency: 100ms
>   paths:
>     - /var/log/file.log
> 
>   fields:
>     log_topic: 109154002
> 
> max_procs: 30
> queue.mem:
>   events: 2000000
>   flush.min_events: 2000000
>   flush.timeout: 50ms
> http.enabled: false
> output.kafka:
>   # initial brokers for reading cluster metadata
>   hosts: ["kafkabrokers"]
> 
>   # message topic selection + partitioning
>   topic: '%{[fields.log_topic]}'
>   partition.round_robin:
>     reachable_only: true
> 
>   required_acks: 1
>   worker: 40
> #  compression: gzip
>   bulk_max_size: 2000000
>   max_message_bytes: 1000000

**But after adding the new instance I am able to see improved performance.**

**The logs for the new instance, which handles only the one heavy log file, are as follows:**

2019-03-14T12:43:13.778+0530 INFO [monitoring] log/log.go:144 Non-zero metrics in the last 30s {"monitoring": {"metrics": {"beat":{"cpu":{"system":{"ticks":65828510,"time":{"ms":10652}},"total":{"ticks":656865230,"time":{"ms":99155},"value":656865230},"user":{"ticks":591036720,"time":{"ms":88503}}},"handles":{"limit":{"hard":4096,"soft":1024},"open":46},"info":{"ephemeral_id":"48423bad-0ff9-4929-a316-3580953aa28f","uptime":{"ms":89100064}},"memstats":{"gc_next":1305428208,"memory_alloc":663167712,"memory_total":345657458030664,"rss":71311360}},"filebeat":{"events":{"active":1126,"added":62352,"done":61226},"harvester":{"open_files":4,"running":4}},"libbeat":{"config":{"module":{"running":0}},"output":{"events":{"acked":61226,"batches":49,"total":61226}},"outputs":{"kafka":{"bytes_read":2872816,"bytes_write":54173422}},"pipeline":{"clients":1,"events":{"active":995,"published":62221,"total":62221},"queue":{"acked":61226}}},"registrar":{"states":{"current":4,"update":61226},"writes":{"success":49,"total":49}},"system":{"load":{"1":22.27,"15":18.72,"5":19.47,"norm":{"1":0.5568,"15":0.468,"5":0.4868}}}}}}
2019-03-14T12:43:43.782+0530 INFO [monitoring] log/log.go:144 Non-zero metrics in the last 30s {"monitoring": {"metrics": {"beat":{"cpu":{"system":{"ticks":65839950,"time":{"ms":11439}},"total":{"ticks":656966610,"time":{"ms":101373},"value":656966610},"user":{"ticks":591126660,"time":{"ms":89934}}},"handles":{"limit":{"hard":4096,"soft":1024},"open":46},"info":{"ephemeral_id":"48423bad-0ff9-4929-a316-3580953aa28f","uptime":{"ms":89130064}},"memstats":{"gc_next":985132336,"memory_alloc":515555904,"memory_total":345713813718576,"rss":-235307008}},"filebeat":{"events":{"active":-588,"added":64101,"done":64689},"harvester":{"open_files":4,"running":4}},"libbeat":{"config":{"module":{"running":0}},"output":{"events":{"acked":64689,"batches":50,"total":64689}},"outputs":{"kafka":{"bytes_read":2990569,"bytes_write":56883450}},"pipeline":{"clients":1,"events":{"active":54,"published":63748,"total":63748},"queue":{"acked":64689}}},"registrar":{"states":{"current":4,"update":64689},"writes":{"success":50,"total":50}},"system":{"load":{"1":23.84,"15":18.93,"5":20.06,"norm":{"1":0.596,"15":0.4733,"5":0.5015}}}}}}

The logs for the other instance, which sends logs for all the other 15 topics, are as follows:

2019-03-14T12:44:06.790+0530 INFO [monitoring] log/log.go:144 Non-zero metrics in the last 30s {"monitoring": {"metrics": {"beat":{"cpu":{"system":{"ticks":7816360,"time":{"ms":2864}},"total":{"ticks":88429430,"time":{"ms":38575},"value":88429430},"user":{"ticks":80613070,"time":{"ms":35711}}},"handles":{"limit":{"hard":4096,"soft":1024},"open":130},"info":{"ephemeral_id":"6ec4cdc3-ae72-470a-a5d9-b60f019aae4d","uptime":{"ms":88800044}},"memstats":{"gc_next":280959328,"memory_alloc":216458960,"memory_total":54420119051912}},"filebeat":{"events":{"active":220,"added":68641,"done":68421},"harvester":{"open_files":64,"running":64}},"libbeat":{"config":{"module":{"running":0}},"output":{"events":{"acked":67820,"active":424,"batches":261,"total":68244}},"outputs":{"kafka":{"bytes_read":606389,"bytes_write":20841371}},"pipeline":{"clients":20,"events":{"active":445,"filtered":177,"published":68264,"total":68442},"queue":{"acked":67820}}},"registrar":{"states":{"current":67,"update":68421},"writes":{"success":263,"total":263}},"system":{"load":{"1":27.56,"15":19.32,"5":21.16,"norm":{"1":0.689,"15":0.483,"5":0.529}}}}}}
2019-03-14T12:44:06.833+0530 INFO kafka/log.go:53 client/metadata fetching metadata for all topics from

2019-03-14T12:44:36.771+0530 INFO [monitoring] log/log.go:144 Non-zero metrics in the last 30s {"monitoring": {"metrics": {"beat":{"cpu":{"system":{"ticks":7819090,"time":{"ms":2739}},"total":{"ticks":88467390,"time":{"ms":37975},"value":88467390},"user":{"ticks":80648300,"time":{"ms":35236}}},"handles":{"limit":{"hard":4096,"soft":1024},"open":130},"info":{"ephemeral_id":"6ec4cdc3-ae72-470a-a5d9-b60f019aae4d","uptime":{"ms":88830040}},"memstats":{"gc_next":247090800,"memory_alloc":200991592,"memory_total":54442067279696,"rss":-3309568}},"filebeat":{"events":{"active":-186,"added":68267,"done":68453},"harvester":{"open_files":64,"running":64}},"libbeat":{"config":{"module":{"running":0}},"output":{"events":{"acked":68692,"active":-424,"batches":256,"total":68268}},"outputs":{"kafka":{"bytes_read":589044,"bytes_write":20467182}},"pipeline":{"clients":20,"events":{"active":34,"filtered":185,"published":68282,"total":68466},"queue":{"acked":68692}}},"registrar":{"states":{"current":67,"update":68453},"writes":{"success":258,"total":258}},"system":{"load":{"1":20.05,"15":18.95,"5":19.9,"norm":{"1":0.5013,"15":0.4738,"5":0.4975}}}}}}
