Logstash Indexer to Elasticsearch Tuning (must go faster!)

Hi,

I am trying to come up with tuning settings for my Logstash indexer.

It sits between Kafka and Elasticsearch, pulling from one topic called logstash with 25 partitions.

I have 12 nodes (2 client, 3 master, and the rest data nodes) and am going to add 3 more data nodes over the next week.

I am indexing 500M documents a day and will probably double that by the end of the year.

I had Marvel installed, and it reported an indexing latency of 0.7 ms (really fast).

However, I cannot seem to get rid of the backlog by playing around with the different Kafka input and Elasticsearch output settings (and the number of Logstash instances, 3 to 6 of them).

So,

I am looking for some calculation of what each setting should be, or a framework for working out what gives me the best bang for the buck.

Here is a metric graph of Logstash's indexing rates per server over a couple of days.
The big drops and spikes are me restarting with tuning changes.

Please help me, Obi-Wan, you're my only hope :smiley:

input {
  kafka {
    topic_id => "logstash"
    zk_connect => "{{ Kafka_Zookeeper}}:{{ Zookeeper_Port}}"
    rebalance_max_retries => 20
    consumer_threads => 1
    queue_size => 2000
    decorate_events => true
    consumer_id => "${HOSTNAME}"
  }
}

#### I have no filters

#### Output is deployed via an Ansible template, and values such as hosts get replaced
output {
  #stdout { codec => rubydebug }
  # template_filename is defined in the $ANSIBLE_HOME/roles/ls-indexers/vars/main.yml
  if [type] =~ "heartbeat" {
    elasticsearch {
      hosts => [ {% for host in groups['es-data'] %}"{{ host }}:9210"{% if not loop.last %},{% endif %}{% endfor %} ]
      document_id => "%{host}-%{type}"
      index => "heartbeat"
      workers => 1
      sniffing => true
    }
  } else if [dst_index] =~ /.+/ {
###################################################################################################
###  If the DST_INDEX IS SET
###################################################################################################
#### If the dst_index is dsg then the data must go to both Bay and Lat indexes
#### if it is not dsg handle normally
    elasticsearch {
      hosts => [ {% for host in groups['es-data'] %}"{{ host }}:9210"{% if not loop.last %},{% endif %}{% endfor %} ]
      index => "%{dst_index}-%{+YYYY.MM.dd}"
      template => "{{ template_filename }}"
      template_overwrite => "true"
      workers => 10
      flush_size => 100
      idle_flush_time => 5
      sniffing => true
    }
  } else {
###################################################################################################
###  All else fails send it to the unknown index
###################################################################################################
    elasticsearch {
      hosts => [ {% for host in groups['es-data'] %}"{{ host }}:9210"{% if not loop.last %},{% endif %}{% endfor %} ]
      index => "unknown-%{+YYYY.MM.dd}"
      template => "{{ template_filename }}"
      template_overwrite => "true"
      workers => 4
      flush_size => 1000
      idle_flush_time => 5
      sniffing => true
    }
  }
}

What version of Logstash? What version of Kafka are you using? Any idea of the CPU consumption on the Logstash instances? How many processors does each Logstash instance have?

Logstash 2.3
Kafka 0.9.0.1

Logstash is only using about 1.5 cores, i.e. 150% CPU, out of 12 to 48 cores (some systems have more than others).

I am running a data node on the same system, which does cause some contention for CPU, but the system is not 100% utilized (the Elasticsearch data node is only using 350% CPU, or 3.5 cores).

There is a minor amount of IO wait (0.6), and there is plenty of RAM (>48GB) on all systems.

I did try the -w option for Logstash, but I know that is only for filters, and besides the metrics filter I am not processing any of the data, just input and output.

Am I wrong that Logstash should be able to process more than 1000 messages a minute?

I can probably shut down one data node tomorrow and see if Logstash starts logging more.

--------Added info

Kafka shows me I am receiving an average of 4K a minute, which should mean that at 800 messages a second I need something like 5 or 6 Logstash instances just to keep even.
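A back-of-the-envelope check of that estimate, as a minimal sketch. It assumes the 4K figure is really a per-second rate (the 5 or 6 instance estimate only works out that way) and that each instance sustains roughly 800 messages a second. The function name is made up for illustration.

```python
import math

def instances_needed(incoming_per_sec: float, per_instance_per_sec: float) -> int:
    """Smallest number of Logstash instances whose combined rate covers the input."""
    return math.ceil(incoming_per_sec / per_instance_per_sec)

# Assumed figures: ~4,000 msg/s arriving, ~800 msg/s handled per instance.
break_even = instances_needed(4000, 800)
print(break_even)  # 5
```

Break-even only stops the backlog from growing. Draining an existing backlog needs capacity above that number.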

BTW, I am looking for ways of tuning the workers, threads, and fetch sizes so it works efficiently.

If I put 100000 as the Kafka input fetch size, data will just be sitting in the heap until the outputs can process it.

Conversely, if I put the Kafka fetch size at 1, the outputs will more likely sit idle and a lot of CPU will be spent just fetching new records.

I guess I need to know more about how to tune the Elasticsearch output: how do I determine what the number of workers and the batch size should be, given that I have 7 nodes and growing? I sort of need to know all the variables I need to look at to estimate performance. Then I can tune from there.

Right now I am just totally guessing the values

Sounds like you already have but if not check out https://www.elastic.co/guide/en/logstash/current/performance-troubleshooting.html and then https://www.elastic.co/guide/en/logstash/current/pipeline.html for more details about how the pipeline can be tuned and more about that -w option.

How big are these messages? You could be saturating your link. Those Bytes in and out look pretty big. You can try enabling compression on the topic to reduce network utilization.

As for tuning Kafka throughput on Logstash, the formula I use for figuring out the max number of consumer_threads is the number of partitions, P, divided by the number of consumers, C, that are members of the group.

So for your case with 6 max instances and 25 partitions you'd want consumer_threads set to 25 / 6 ~= 4 so four consumer threads per system.
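The P / C rule above can be sketched in a few lines. This is only an illustration of the arithmetic, with a hypothetical helper name; it rounds down, as the 25 / 6 ~= 4 example does.

```python
def threads_per_instance(partitions: int, instances: int) -> int:
    """P / C rounded down, but never below one thread."""
    return max(1, partitions // instances)

PARTITIONS = 25
INSTANCES = 6

threads = threads_per_instance(PARTITIONS, INSTANCES)
total_threads = threads * INSTANCES
# Partitions beyond one per thread; these end up doubled on some thread.
leftover = PARTITIONS - total_threads

print(threads, total_threads, leftover)  # 4 24 1
```

Rounding up to 5 instead would give 30 threads for 25 partitions, so 5 threads would sit idle, because a partition is read by at most one consumer thread in a group.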

25 partitions is quite a few as well. Keep in mind the anatomy of a topic http://kafka.apache.org/090/documentation.html#intro_topics where each partition is a sequential log of messages. If you are on SSD then 25 partitions on a single disk or one with good IOPS can deliver all those different partitions at the same time but if you are on spinning disks those arms are going to be jerking all over to satisfy all the consumer requests from all the different sequential logs.

Figure out your throughput needs by measuring message size and frequency. Ensure that the underlying hardware can meet these requirements, and then it will be time to tweak Logstash.

Hope some of this helps.

You can run the kafka-consumer-perf-test.sh script from the kafka distro to test the kafka throughput. Here is a link to the sources https://github.com/apache/kafka/tree/0.9.0/bin

I will read those links. Even if I have read them before, there are sometimes nuggets of information I overlook, so it will be a good review.

Honestly, I think my issue is with the Elasticsearch configuration, not Kafka. I am on a 10GB network and the machines are all within the same VLAN, actually all within the same HP chassis, so there is little network contention. 4MB/s is really nothing in volume. But I am open to suggestions, and I will check the network settings to make sure they are tuned correctly.

I have tried everything from 1 ES worker with a batch size of 1000 to 20 workers with a batch size of 10, and many variations in between, and I can only seem to get to about 800 messages per second.

I will read the docs you linked and we will see. Plus, I will try shutting down one data node and see how performance improves.

OK, yeah, this new term "pipelining": while I knew the process existed, I was not fully aware of the -b option.

I took one LS server and set -b 5000 to see what would happen.

It looks better at first but seems to fall back in line with the rest of the servers. I will have to look at other settings too, to make sure I am not breaking anything.

But finally, for the first time, I am seeing the "OUT" bytes per second in Kafka higher than the "IN" bytes, which I would expect to happen when there is a backlog of data.

From what I can see, though, there is little or no increase in CPU load. (Odd, but we will see how it looks overnight.)

Nope, moving the Logstash server onto a dedicated box does not seem to perform any better. I can still only process 800 to 1000 messages a second.

The odd part is that I have added more nodes (the above stats were with 5 Logstash instances; I deployed an additional 4 nodes) and they all process at the same rate.

Any clue as to what I can use to figure out what is bottlenecking between Logstash (ES output) and Elasticsearch?
There is no issue on the Logstash Kafka input side, as I have seen it pull up to 50 Mb/s when I restart.

1000 messages/second per instance of Logstash or just overall?

If the latter, I highly recommend you run the Kafka benchmarks.

Here is the performance test. As suspected, Kafka is not the issue, though it was worth the test, as it taught me a little more about Kafka :slight_smile:

The link you gave me was good but I found this one

The test

The benchmark below shows that with a single thread I can pull 300,000 messages a second.

Then I tried 2 threads, which gave not much more of an increase (~450K a second), and then went to 10 threads just to do it in "Mythbusters" style: break it till it goes boom.

[kafka@hd1melk20lx kafka]$ bin/kafka-consumer-perf-test.sh --zookeeper $HOSTNAME:2181 --messages 500000 --topic logstash --threads 1
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
2016-08-18 20:30:46:535, 2016-08-18 20:30:48:058, 465.1847, 305.4397, 500000, 328299.4091

[kafka@hd1melk20lx kafka]$ bin/kafka-consumer-perf-test.sh --zookeeper $HOSTNAME:2181 --messages 500000 --topic logstash --threads 2
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
2016-08-18 20:37:51:203, 2016-08-18 20:37:53:339, 913.0774, 427.4707, 987201, 462172.7528

[kafka@hd1melk20lx kafka]$ bin/kafka-consumer-perf-test.sh --zookeeper $HOSTNAME:2181 --messages 500000 --topic logstash --threads 10
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
2016-08-18 20:38:11:779, 2016-08-18 20:38:18:324, 3847.7683, 587.8943, 4196277, 641142.3988
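As a sanity check on the first run above, the reported rates can be recomputed from the timestamps, which also gives a rough average message size. This is a minimal sketch: it assumes the tool's "MB" means 1024 * 1024 bytes, and the variable names are illustrative only.

```python
from datetime import datetime

# Timestamp layout used in the perf-test output (milliseconds after the last colon).
FMT = "%Y-%m-%d %H:%M:%S:%f"

start = datetime.strptime("2016-08-18 20:30:46:535", FMT)
end = datetime.strptime("2016-08-18 20:30:48:058", FMT)
consumed_mb = 465.1847
consumed_msgs = 500000

elapsed = (end - start).total_seconds()          # wall-clock seconds for the run
msgs_per_sec = consumed_msgs / elapsed           # should match the nMsg.sec column
mb_per_sec = consumed_mb / elapsed               # should match the MB.sec column
avg_msg_bytes = consumed_mb * 1024 * 1024 / consumed_msgs  # assumes MB = 2**20 bytes

print(f"{elapsed:.3f} s, {msgs_per_sec:.0f} msg/s, {mb_per_sec:.1f} MB/s, {avg_msg_bytes:.0f} B/msg")
```

Under that assumption the messages average a little under 1 KB each, which is the message-size figure asked about earlier in the thread.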

Is there a performance benchmark program like this for Elasticsearch?

Ok, started to figure some stuff out. (WISH there was a tuning doc or formula on all this)

  • logstash-input-kafka autoscales with the number of partitions you have allocated. For example, I had 23 partitions, and even though the benchmark showed I could pull 305MB/s, Logstash was only pulling ~500KB/s per instance. When I added 150 partitions, I started pulling data off of Kafka at ~50MB/s.

Now I am still trying to figure out this new pipeline (execution model). I understand that filters and outputs are now tied together, but how does the Elasticsearch output's workers => # setting work with the -w option?

I understand that the "workers" will never be bigger than the -b #, but are they one and the same? If I allocate -w 10 and workers => 10, does that mean I have 100 workers or only 10? Also, if I have two outputs, do I need to set workers => 5 on each?

Very confused on this

No I wish... Glad you found Jay's gist. He is one of the Kafka authors so good stuff there :slight_smile:

So you have proven Kafka's performance, excellent! It's always nice to go through the exercise to see the sheer power.

The workers are separate from the pipeline of filter/output. Basically an async threadpool underneath is my understanding.

First question, have you checked your offsets for the consumer group? Can you verify that all partitions have consumers?

Next, the benchmark is always running from-beginning and resetting the group offsets, which basically makes it consume all messages off the pipeline. Logstash is not resetting the offsets each time, so eventually you will only be reading as fast as messages are entering the queue, and at 500M/day that is ~6k/second. You wouldn't see processing any faster than that because that would basically be "real time". If you want to fire off a node and see if it maxes out, you need to run with reset_beginning => true and auto_offset_reset => "smallest", and that'll cause it to re-ingest all the messages. Remember that reset_beginning is destructive and would apply every time Logstash starts up, so if you want to test a whole cluster, start one node with the destructive config, turn it back off, take the setting out, start it back up, and then start the remainder of the cluster members. That'll have them all consuming at full blast.
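The "real time" ceiling is just the daily volume spread over a day. A minimal sketch of that arithmetic, using the 500M documents a day stated at the top of the thread and the expected doubling; the helper name is made up for illustration.

```python
SECONDS_PER_DAY = 24 * 60 * 60

def per_second(docs_per_day: float) -> float:
    """Average arrival rate if the daily volume were spread evenly over the day."""
    return docs_per_day / SECONDS_PER_DAY

current = per_second(500_000_000)    # today's volume
doubled = per_second(1_000_000_000)  # after the expected doubling

print(round(current), round(doubled))  # 5787 11574
```

This is an average. Peaks during the day will be higher, so the steady-state rate is a floor for capacity planning rather than a target.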

I would continue to work down the chain. You've established that Kafka is able to pump the messages through. Now I would focus on Logstash throughput. Check out how I did some bench-marking here:

The idea here is to establish that Logstash can move the data, tweak settings if needed and ensure the delivery pipeline is robust.

Other things to consider:

  • I wonder if the regexes are really killing Logstash's performance. You may try running configs without the regular expressions.
  • It also could be that back pressure on the output is limiting the Kafka input.

Once you get the numbers you want there, I'd move to ES tweaking. Then you know the pipeline through and through and got a good product to deliver to stakeholders.

A better tweaking guide should be written. Right now I think it is a collection of my tips from the forum and github. One day....

FYI, I continue to look at this, but it is still such an obfuscated way of tuning.

But I am starting to sort of get it, I went back to the defaults of all the values and started to try again.

  1. I see that logstash-input-kafka scales to the number of partitions. I guess there is a negotiation with ZooKeeper over how many partitions and how many subscribers there are in the group.

I.e., if I have 30 partitions and 10 subscribers, they will each have 3 consumers on the topic;
if I had 100 partitions and 10 subscribers, they will each have 10.
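The split described above can be illustrated with a range-style assignment, where partitions are handed out in contiguous blocks and the first few subscribers take one extra when the division is uneven. This is a sketch of the idea only, not the consumer's actual rebalance code, and the function name is hypothetical.

```python
from typing import List

def range_assign(partitions: int, subscribers: int) -> List[List[int]]:
    """Hand out partition ids 0..partitions-1 in contiguous blocks."""
    base, extra = divmod(partitions, subscribers)
    assignment = []
    start = 0
    for i in range(subscribers):
        size = base + (1 if i < extra else 0)  # first `extra` subscribers get one more
        assignment.append(list(range(start, start + size)))
        start += size
    return assignment

print([len(p) for p in range_assign(30, 10)])   # ten subscribers with 3 partitions each
print([len(p) for p in range_assign(100, 10)])  # ten subscribers with 10 partitions each
print([len(p) for p in range_assign(25, 6)])    # uneven case: [5, 4, 4, 4, 4, 4]
```

The uneven case shows why a partition count that is a multiple of the subscriber count keeps the load balanced.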

Once I had added around 100 partitions, added -w 2, and set the input buffer to 100000, I started to fetch faster than the 100 publishers were pushing.

As you can see here, I actually got up to 70k m/s. I don't think that is the peak, but that is the speed at which it leveled out.

And as you can see here, each client is now working at ~3k m/s, totalling ~60,000 m/s.

So thanks for allowing me to bang my head against the wall and forcing me to understand the interactions between Kafka and Logstash.

I'll open a new topic as I come up with a clearer question.

THANKS!
