Performance Tips For Logstash And Monitoring Its Queue

(Nitzan Haimovich) #1


We have a logging infrastructure, built as follows:

  1. Filebeat agents consume logs and send them to a single Kafka topic, at a rate of ~50k messages per second. This part of the architecture works well.
  2. The topic has 50 partitions, so we run 50 Logstash instances, each with one consumer thread.
  3. Our Logstash configuration is fairly simple, with only a few grok filters. Both logstash.conf and logstash.yml are posted as replies in this thread.
    Also: we initially used the default in-memory queue, which caused massive consumer lag on Kafka (millions of messages). After switching to the disk-based (persisted) queue we see no lag on Kafka at all, but we fear the backlog has simply moved into the Logstash queue.
  4. We output everything to ES, splitting the logs across 4 indices.
  5. Our ES cluster: 2 client nodes (8 cores + 32GB mem) and 3 data/master nodes (8 cores + 32GB mem).
  6. ES indexes documents at a rate of ~30k per second, ~10k per data node. At that rate the client nodes collapse because heap usage reaches 85%+.
  7. We also ran some tests and found that Elasticsearch is our bottleneck.
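
The heap pressure described in item 6 can be watched through the Elasticsearch node stats API. Below is a minimal sketch, assuming the JSON returned by `GET _nodes/stats/jvm` has already been fetched and parsed into a dict. The helper name `nodes_over_heap` and the sample data are hypothetical; the 85% threshold is the figure quoted above.

```python
def nodes_over_heap(stats, threshold_percent=85):
    """Return (node_name, heap_used_percent) pairs at or above the threshold."""
    hot = []
    for node_id, node in stats.get("nodes", {}).items():
        heap = node.get("jvm", {}).get("mem", {}).get("heap_used_percent")
        if heap is not None and heap >= threshold_percent:
            hot.append((node.get("name", node_id), heap))
    # Highest heap usage first, so the worst node is at the top.
    return sorted(hot, key=lambda pair: pair[1], reverse=True)


# Hypothetical sample shaped like the node stats response (values invented).
sample = {
    "nodes": {
        "a1": {"name": "client-1", "jvm": {"mem": {"heap_used_percent": 91}}},
        "b2": {"name": "client-2", "jvm": {"mem": {"heap_used_percent": 87}}},
        "c3": {"name": "data-1", "jvm": {"mem": {"heap_used_percent": 62}}},
    }
}

for name, heap in nodes_over_heap(sample):
    print(f"{name}: heap {heap}%")
```

Polling this during an indexing test would show whether only the client nodes cross the threshold or the data nodes do as well.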

Now, I have a few questions I would like to ask:

  1. How can I monitor the disk-based queue in Logstash? I need to be sure we are not accumulating a backlog inside Logstash that could fill the disk (and probably cause various other failures).
  2. Why is output to ES so slow, and how can I make it faster? We tried changing the number of output workers, but it did not improve anything.
  3. For a load of approximately 50k+ messages per second, what ES cluster size would you suggest?

Thank You!
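
For question 1, one possible approach is the Logstash monitoring API, which listens on port 9600 by default and reports queue statistics under `GET /_node/stats/pipelines`. The sketch below is a rough illustration, not a definitive implementation: the exact field layout differs between Logstash versions, so it accepts the size fields either under `queue.capacity` or directly under `queue`, and both layouts should be checked against the version in use. The function names and the sample response are hypothetical.

```python
import json
from urllib.request import urlopen


def fetch_pipeline_stats(base_url="http://localhost:9600"):
    """Fetch pipeline stats from the Logstash monitoring API (default port 9600)."""
    with urlopen(f"{base_url}/_node/stats/pipelines", timeout=5) as resp:
        return json.load(resp)


def queue_usage(stats):
    """Map pipeline id -> (events, used_bytes, max_bytes, fill_ratio)."""
    usage = {}
    for pipeline_id, pipeline in stats.get("pipelines", {}).items():
        queue = pipeline.get("queue", {})
        if queue.get("type") != "persisted":
            continue  # only the persisted queue has an on-disk size to track
        # Assumption: size fields sit under "capacity" or directly under "queue".
        capacity = queue.get("capacity", queue)
        used = capacity.get("queue_size_in_bytes", 0)
        limit = capacity.get("max_queue_size_in_bytes", 0)
        events = queue.get("events_count", queue.get("events", 0))
        ratio = used / limit if limit else 0.0
        usage[pipeline_id] = (events, used, limit, ratio)
    return usage


# Hypothetical sample response (values invented for illustration).
sample = {
    "pipelines": {
        "main": {
            "queue": {
                "type": "persisted",
                "events": 1200000,
                "capacity": {
                    "queue_size_in_bytes": 2147483648,
                    "max_queue_size_in_bytes": 8589934592,
                },
            }
        }
    }
}

for pid, (events, used, limit, ratio) in queue_usage(sample).items():
    print(f"{pid}: {events} events queued, {ratio:.0%} of queue.max_bytes used")
```

Against a live instance, `queue_usage(fetch_pipeline_stats())` could be sampled periodically on each of the 50 instances; a fill ratio that keeps climbing would indicate the backlog has moved from Kafka into Logstash.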


(Nitzan Haimovich) #2

logstash.yml

path.data: /var/lib/logstash
pipeline.batch.size: 1000
path.config: /etc/logstash
config.reload.automatic: false
config.debug: false
queue.type: persisted
queue.max_bytes: 8gb
log.level: warn
path.logs: /var/log/logstash
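
To get a feel for how long `queue.max_bytes: 8gb` would last if the backlog really is building up inside Logstash, here is a rough back-of-the-envelope sketch. It uses the rates quoted in the first post (~50k messages/s in, ~30k documents/s indexed, 50 instances). The average event size of 1 KiB and the reading of `8gb` as 8 GiB are assumptions, and `hours_until_queue_full` is a hypothetical helper.

```python
def hours_until_queue_full(max_bytes, in_events_per_sec, out_events_per_sec, avg_event_bytes):
    """Hours until a queue of max_bytes fills, or None if it drains or holds steady."""
    net_events_per_sec = in_events_per_sec - out_events_per_sec
    if net_events_per_sec <= 0:
        return None
    return max_bytes / (net_events_per_sec * avg_event_bytes) / 3600


INSTANCES = 50                      # one Logstash per Kafka partition (from the thread)
QUEUE_MAX_BYTES = 8 * 1024 ** 3     # queue.max_bytes: 8gb, assumed to mean 8 GiB
AVG_EVENT_BYTES = 1024              # assumption: roughly 1 KiB per queued event

hours = hours_until_queue_full(
    QUEUE_MAX_BYTES,
    in_events_per_sec=50_000 / INSTANCES,   # ~50k msgs/s spread over 50 instances
    out_events_per_sec=30_000 / INSTANCES,  # ~30k docs/s indexed by Elasticsearch
    avg_event_bytes=AVG_EVENT_BYTES,
)
print(f"queue full after about {hours:.1f} hours")
```

Under these assumptions each instance would fill its queue in a little under six hours; larger events shorten that proportionally.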

(Nitzan Haimovich) #3


input {
  kafka {
    bootstrap_servers => "KAFKA_IP:9092"
    group_id => "test"
    topics => ["test"]
    auto_offset_reset => "latest"
    consumer_threads => 1
    decorate_events => true
    codec => "json"
  }
}

filter {
  # Extract agent, framework, service and run details from the log file path.
  if [type] == "stdout" or [type] == "stderr" {
    grok {
      match => { "source" => "/logs/%{WORD}/%{DATA:agent_id}/frameworks/%{DATA:framwork_id}/executors/%{GREEDYDATA:service}._%{GREEDYDATA:exec}/runs/%{DATA:run_id}/%{WORD:log_type}" }
    }
  }

  # Normalize the event time into action_timestamp.
  if ("action_time" in [message]) {
    json { source => "message" }
    mutate { rename => { "action_time" => "action_timestamp" } }
  } else if ("time:" in [message]) {
    json { source => "message" }
    mutate { rename => { "time" => "action_timestamp" } }
  } else {
    # DATE1 and DATE2 are custom patterns; literal brackets are escaped.
    grok {
      match => { "message" => ["%{DATE1:action_timestamp}","%{DATE2:action_timestamp}"," %{DATE1:action_timestamp}","%{WORD} %{DATE2:action_timestamp}","%{WORD} \[%{DATE2:action_timestamp}\]","%{WORD} %{TIMESTAMP_ISO8601:action_timestamp}"," ERRO: %{DATE1:action_timestamp}"," INFO: %{DATE1:action_timestamp}"]}
    }
    # Fall back to the ingest time when no timestamp could be parsed.
    if "_grokparsefailure" in [tags] {
      ruby {
        code => 'event.set("action_timestamp",event.get("@timestamp"))'
      }
    }
  }
}

output {
  elasticsearch {
    hosts => ["ES_IPs"]
    index => "%{type}-%{+xxxx.ww}"
  }
}
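
The index pattern `%{type}-%{+xxxx.ww}` produces one index per event type per week, which is how the logs end up split across a handful of indices. As an illustration only, the Python sketch below mimics that naming under the assumption that `xxxx` and `ww` are the ISO week-year and ISO week number of the event's UTC `@timestamp`. `weekly_index` is a hypothetical helper, not part of Logstash.

```python
from datetime import datetime, timezone


def weekly_index(event_type, timestamp):
    """Build '<type>-<ISO week-year>.<ISO week>' from an event's UTC timestamp."""
    iso_year, iso_week, _ = timestamp.astimezone(timezone.utc).isocalendar()
    return f"{event_type}-{iso_year:04d}.{iso_week:02d}"


print(weekly_index("stdout", datetime(2018, 1, 1, tzinfo=timezone.utc)))  # stdout-2018.01
print(weekly_index("stderr", datetime(2017, 1, 1, tzinfo=timezone.utc)))  # stderr-2016.52
```

The second call shows why the week-year matters: 1 January 2017 still belongs to the last ISO week of 2016, so a plain calendar year in the pattern would put that event in the wrong index.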
