Scale Logstash indexers for >200,000 messages per minute?

First, that isn't a typo. In fact, I want to be able to scale LS to handle >500,000 (maybe a million) messages per minute once we get out of our POC.

Our basic setup is as follows:

On-prem LS server + Redis > x1 AWS LS > x3 AWS ES servers < x1 AWS Kibana server

On-prem LS server is a 4 x 2.9GHz/8GB box
AWS LS is a 4 x 2.6GHz/16GB box
AWS ES are 4 x 2.5GHz/16GB boxes with 100GB SSD data drives
AWS Kibana is a 2 x 2.5GHz/8GB box

We are shipping Cisco ASA syslogs, aggregated by our firewall team, to the on-prem LS server via a UDP input, then shipping them via lumberjack to localhost, where we forward them to the Redis queue.

**UDP in / Lumberjack out**

input {
  udp {
    port => 6001
    type => "syslog-cisco-asa"
    tags => [ "cisco-asa" ]
  }
} #end input block

output {
  # Output to send input data to logstash-to-redis.conf input
  lumberjack {
    hosts => "localhost"
    port => "5173"
    ssl_certificate => "/etc/ssl/certs/logstash-forwarder.crt"
    codec => json # added because of https://github.com/elastic/logstash-forwarder/issues/381
  }
} #end output block

**Lumberjack in / Redis out**

input {
  lumberjack {
    port => 5173
    #type => "logs" #removed to see if type from source is passed - DLH 10-27-15
    ssl_certificate => "/etc/pki/tls/certs/logstash-forwarder.crt"
    ssl_key => "/etc/pki/tls/certs/logstash-forwarder.key"
    codec => json
  }
}

output {
  redis {
    host => "localhost"
    data_type => "list"
    key => "logstash"
    congestion_interval => 1
    congestion_threshold => 20000000
    workers => 16
    # Batch processing requires redis >= 2.4.0
    batch => true
    batch_events => 500
    batch_timeout => 5
  }
}

What we've noticed is that, with the ASA log input enabled, the Redis queue quickly started to fill up. We tweaked our AWS LS input as follows:

input {
  redis {
    host => "AWSLS_servername"
    port => 6379
    threads => 800
    key => "logstash"
    data_type => "list"
    codec => json
  }
}

However, as soon as we added the filtering/grokking for the ASA format, we noticed that we could no longer keep up. Logstash just couldn't pull records from the Redis queue fast enough. (Note: the filtering and grokking are not our own invention; they're the result of some Google-fu and judicious copy/paste.)

*(Moved to the first reply because I exceeded my 5,000-character-per-message limit.)*

We've tweaked AWS LS to run with 2 workers for the output, as shown below:

output {
  elasticsearch {
    host => ["AWSES01","AWSES02","AWSES03"]
    cluster => "itoaelk"
    workers => 2
  }
}

Is there any way to increase the performance of the filtering/grokking?

What about scaling the AWS LS environment? If we stand up a couple of extra servers and point them at the Redis queue, is that a sufficient (proper?!?) way to expand the environment?

Once we enable both ASA aggregation points to send to our ELK environment, we'll be handling about 100GB per day of just ASA messages. We'll then need to build out capacity for our application and event logs.

Lots of data + inexperienced ELK admin = lots of crazy questions!

Thanks - Josh

Here is our filter/grok config:

filter {
  grok {
    match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
    add_field => [ "received_at", "%{@timestamp}" ]
    add_field => [ "received_from", "%{host}" ]
  }
}
filter {
  if [type] == "syslog-cisco-asa" {
    # Split the syslog part and Cisco tag out of the message
    grok {
      match => ["message", "%{CISCO_TAGGED_SYSLOG} %{GREEDYDATA:cisco_message}"]
    }
    syslog_pri { }
    date {
      match => ["timestamp",
        "MMM dd HH:mm:ss",
        "MMM d HH:mm:ss",
        "MMM dd yyyy HH:mm:ss",
        "MMM d yyyy HH:mm:ss"
      ]
      timezone => "America/New_York"
    }
    if "_grokparsefailure" not in [tags] {
      mutate {
        rename => ["cisco_message", "message"]
        remove_field => ["timestamp"]
      }
    }
    grok {
      match => [
        "message", "%{CISCOFW106001}",
        "message", "%{CISCOFW106006_106007_106010}",
        "message", "%{CISCOFW106014}",
        "message", "%{CISCOFW106015}",
        "message", "%{CISCOFW106021}",
        "message", "%{CISCOFW106023}",
        "message", "%{CISCOFW106100}",
        "message", "%{CISCOFW110002}",
        "message", "%{CISCOFW302010}",
        "message", "%{CISCOFW302013_302014_302015_302016}",
        "message", "%{CISCOFW302020_302021}",
        "message", "%{CISCOFW305011}",
        "message", "%{CISCOFW313001_313004_313008}",
        "message", "%{CISCOFW313005}",
        "message", "%{CISCOFW402117}",
        "message", "%{CISCOFW402119}",
        "message", "%{CISCOFW419001}",
        "message", "%{CISCOFW419002}",
        "message", "%{CISCOFW500004}",
        "message", "%{CISCOFW602303_602304}",
        "message", "%{CISCOFW710001_710002_710003_710005_710006}",
        "message", "%{CISCOFW713172}",
        "message", "%{CISCOFW733100}"
      ]
    }
    #added geoip
    geoip {
      add_tag => [ "GeoIP" ]
      database => "/opt/logstash/config/geoip/GeoLiteCity.dat"
      source => "src_ip"
    }
    if [geoip][city_name] == "" { mutate { remove_field => "[geoip][city_name]" } }
    if [geoip][continent_code] == "" { mutate { remove_field => "[geoip][continent_code]" } }
    if [geoip][country_code2] == "" { mutate { remove_field => "[geoip][country_code2]" } }
    if [geoip][country_code3] == "" { mutate { remove_field => "[geoip][country_code3]" } }
    if [geoip][country_name] == "" { mutate { remove_field => "[geoip][country_name]" } }
    if [geoip][latitude] == "" { mutate { remove_field => "[geoip][latitude]" } }
    if [geoip][longitude] == "" { mutate { remove_field => "[geoip][longitude]" } }
    if [geoip][postal_code] == "" { mutate { remove_field => "[geoip][postal_code]" } }
    if [geoip][region_name] == "" { mutate { remove_field => "[geoip][region_name]" } }
    if [geoip][time_zone] == "" { mutate { remove_field => "[geoip][time_zone]" } }
    # Gets the source IP whois information from the GeoIPASNum.dat flat file database
    geoip {
      add_tag => [ "Whois" ]
      database => "/opt/logstash/config/geoip/GeoIPASNum.dat"
      source => "src_ip"
    }
  } # syslog-cisco-asa
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
  } #end syslog if
  if [type] == "IHS Server" {
    #find a pattern thing for IHS
  }
} #end filter block

However, as soon as we added the filtering/grokking for the ASA format, we noticed that we could no longer keep up. Logstash just couldn't pull records from the Redis queue fast enough.

Have you tried increasing batch_count for the Redis input?
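For example, something along these lines on top of the input you posted (the batch_count value is illustrative, not a tuned recommendation):

  input {
    redis {
      host => "AWSLS_servername"
      port => 6379
      key => "logstash"
      data_type => "list"
      codec => json
      batch_count => 250   # fetch events from Redis in batches instead of one at a time
    }
  }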

Is there any way to increase the performance of the filtering/grokking?

Have you increased the number of filter workers with the -w startup option? IIRC it defaults to one.

What about scaling the AWS LS environment? If we stand up a couple of extra servers and point them at the Redis queue, is that a sufficient (proper?!?) way to expand the environment?

Yes, horizontal scaling is a good way of increasing Logstash processing throughput. Of course, your ES cluster still needs to be able to handle the ingress traffic.
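Concretely, each additional indexer just runs the same Redis input pointed at the same list (hostname below is a placeholder). Each event is popped from the list by exactly one consumer, so no extra coordination is needed:

  input {
    redis {
      host => "your_redis_host"   # same Redis instance for every indexer
      port => 6379
      key => "logstash"           # same list; each event goes to exactly one indexer
      data_type => "list"
      codec => json
    }
  }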


Is expanding our AWS LS environment as easy as standing up another LS box with the same Redis input? It doesn't look like there is any cluster awareness like with ES.

Worker processes are set to 2 right now. We have 4 CPUs.

I haven't played with batch_count yet; it defaults to 5000, if I remember the config documentation correctly. I'll play with that now and report back.

Is expanding our AWS LS environment as easy as standing up another LS box with the same Redis input?

Yes, as long as your events are independent it's that simple. If, for example, you use filters like multiline or elapsed, everything needs to be funneled through a single instance.

It doesn't look like there is any cluster awareness like with ES.

Indeed not, but as long as your events are independent I don't know how cluster awareness would be useful.

Worker processes are set to 2 right now. We have 4 CPUs.

And are you saturating the CPUs?

I haven't played with batch_count yet; it defaults to 5000, if I remember the config documentation correctly. I'll play with that now and report back.

batch_count defaults to 1 (one).

That's a massive amount of EPS!
What are your retention requirements?

@jbiggley There are two notions of workers: one is filter workers and the other is output workers. Your bottleneck seems to be the grok filter, so you should set -w when starting Logstash to half your cores.

Also, as @magnusbaeck suggested, make sure to saturate your CPUs. If you are not, you can increase -w.

See https://www.elastic.co/guide/en/logstash/current/command-line-flags.html for details.
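On your 4-core indexers that would be something along these lines (path taken from the command you posted earlier):

  /opt/logstash/bin/logstash -w 2 -f /opt/logstash/config/redis.conf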


Also, I would add protocol => "http" to the elasticsearch output, which means it will use HTTP to communicate with ES. The default is the Java node protocol.
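For example, a sketch based on the output you posted earlier:

  output {
    elasticsearch {
      host => ["AWSES01","AWSES02","AWSES03"]
      protocol => "http"   # talk to ES over HTTP instead of joining the cluster as a node
      workers => 2         # the cluster setting isn't needed when using HTTP
    }
  }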


Thanks @suyograo and @magnusbaeck for the help. We've slain the beast! Definitely couldn't have done it without your suggestions.

@suyograo I haven't added protocol => "http" to the elasticsearch output yet, but I will.

@magnusbaeck thanks for the suggestion on saturating the CPUs. We're now running at between 50% and 80% utilization on those 4-core boxes.

Here's what we did.

First, we expanded our ES cluster to three nodes and pinned the Java heap at 8 GB.

Second, we added a second LS box. Tweaked or not, we were going to need the additional capacity to grab from the redis queue.

Third, we tweaked the Redis input as follows, increasing both threads and batch_count:

input {
  redis {
    host => "our_LS_host"
    port => 6379
    threads => 800
    batch_count => 20
    key => "logstash"
    data_type => "list"
    codec => json
  }
}

Fourth, we changed the output to increase the number of workers sending to Elasticsearch:

output {
  elasticsearch {
    host => ["ES01","ES02","ES03"]
    cluster => "itoaelk"
    workers => 4
    flush_size => 5000
  }
}

Finally, since the issue is really about us being unable to grok the data fast enough, we increased the number of filter/grokking workers when launching the Logstash instance with the following command:

/opt/logstash/bin/logstash -w 8 -f /opt/logstash/config/redis.conf

Eight hundred threads reading from Redis? I doubt that's the best use of CPU and RAM. I'd recommend increasing the batch size and scaling down the threads.

So that I understand correctly: batch_count is the number of records pulled from the Redis queue, and threads is the number of discrete processes pulling the number of records specified by batch_count, is that correct?

I'll have to get back to you when I have a chance to play with our environment a little more, as I ran out of disk space. I ate through 100 GB in less than 24 hours. At about 2 KB per message, that's a lot of messages!

So that I understand correctly: batch_count is the number of records pulled from the Redis queue

Yes, batch_count is the number of messages fetched per Redis request. With the default batch_count of one (1), there will be an awful lot of round-trip overhead if the message rate is high.

and threads is the number of discrete processes pulling the number of records specified by batch_count, is that correct?

Yes, processes in the general sense rather than operating system processes. Having significantly more threads than you have CPUs is typically not useful; they'll just compete for CPU and use more memory. If you have high-latency communication links, increasing parallelism might still make sense, but normally the broker is close by.
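For a 4-core box that might look more like this (the numbers are illustrative, not tuned recommendations):

  input {
    redis {
      host => "our_LS_host"
      port => 6379
      threads => 4          # roughly one thread per core instead of 800
      batch_count => 1000   # fewer, larger Redis requests
      key => "logstash"
      data_type => "list"
      codec => json
    }
  }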


Hey @magnusbaeck, thanks for all the help. We've done a couple of interesting things based on your recommendations.

We still have a single on-prem LS/Redis server, but we now have two Redis queues in that instance, and we are shipping all of the ASA logs to that second Redis queue.

In our cloud-based LS instances (we have two LS servers), we've created two inputs. One input is for the non-ASA data (and it filters and groks it accordingly). The second input is for the ASA data; it has the more complex filtering and grokking.
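(For anyone following along: one way to do the split on the on-prem side is a conditional around the Redis output, keyed off the tag set at the UDP input. This is a simplified sketch rather than our exact config.)

  output {
    if "cisco-asa" in [tags] {
      redis {
        host => "localhost"
        data_type => "list"
        key => "logstash-asa"   # second queue, consumed by redis-asa.conf
      }
    } else {
      redis {
        host => "localhost"
        data_type => "list"
        key => "logstash"
      }
    }
  }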

The non-ASA input is running with:
  threads => 2
  batch_count => 200
  key => "logstash"

The non-ASA output to ES is running with:
  workers => 2

We launch it with two filter workers:

  /opt/logstash/bin/logstash -w 2 -f /opt/logstash/config/redis.conf

For the ASA input:
  threads => 20
  batch_count => 200
  key => "logstash-asa"

For the ASA ES output:
  workers => 4
  index => "logstash-asa%{+YYYY.MM.dd}"

We launch it with eight filter workers:

  /opt/logstash/bin/logstash -w 8 -f /opt/logstash/config/redis-asa.conf

We're crunching through about 3,200 records per second across all data sources, with ~1,200 of those coming from the ASAs with some pretty intense grokking (as indicated above).

Again, thanks for pointing us in the right direction.


For what it's worth, 500,000 events per minute is very attainable. 200k/min is roughly 3,300 events/sec, and 500k/min is around 8,300 events/sec.

There are users doing 200,000 messages per second with the Elastic Stack. Hopefully we can get you pointed in the right direction with this thread 🙂

Based on your configuration above, I would expect the major bottleneck to be your filters. I recommend you run Logstash with multiple filter workers so you can put more CPU cores into that effort.

I also recommend taking some sample logs and testing each filter, in isolation, to see what kind of performance hit each filter has for your pipeline. If you find a particularly slow one, especially grok, then you might be able to tweak what the filter is doing to improve its performance.
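A quick-and-dirty way to do that is to replay a canned message through just one filter and watch the event rate. This is a sketch using the generator input and the dots codec that ship with Logstash; the message is a placeholder you'd replace with a real ASA line:

  input {
    generator {
      message => "PASTE A REPRESENTATIVE ASA LOG LINE HERE"
      count => 100000
      type => "syslog-cisco-asa"
    }
  }
  filter {
    grok {
      match => ["message", "%{CISCO_TAGGED_SYSLOG} %{GREEDYDATA:cisco_message}"]
    }
  }
  output {
    stdout { codec => dots }   # one dot per event; time the run to estimate events/sec
  }

Time the run with and without each filter block to see roughly what each one costs you.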

I would try to avoid using those if statements if possible because they severely hit the throughput of Logstash. In one of my tests on hosts with 24-core CPUs and 192 GB of RAM, a few if statements dropped Logstash throughput from 50K per minute to 6-10K per minute.

With how many nodes?