[Solved] Cluster Recommendations? Slow Indexing on Elastic Stack 5

I have a ten-node cluster running entirely on Elastic Stack 5.3 (except Filebeat, which is on 5.2.2). I'm looking for tuning recommendations to push my indexing rate above 3k events/sec on primary shards.

About my config:

Four of the servers run Logstash; on the Elasticsearch side there are three master nodes, five data nodes, two coordinating nodes, and one ingest node, with the following stats:

My full config can be seen at the bottom of this post.

The problem is that this cluster isn't indexing as fast as data is being pushed to it -- when I look at what days (from the log timestamps) are currently being indexed, I see data from over two weeks ago that was finally indexed in the last 15 minutes:

When I look in the Logstash logs, I see 429 errors coming back from Elasticsearch, and with this many servers I feel like I shouldn't be hitting them:

[2017-04-14T08:33:33,108][INFO ][logstash.outputs.elasticsearch] retrying failed action with response code: 429 ({"type"=>"es_rejected_execution_exception", "reason"=>"rejected execution of org.elasticsearch.transport.TransportService$7@607660d8 on EsThreadPoolExecutor[bulk, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@988bbe9[Running, pool size = 32, active threads = 32, queued tasks = 200, completed tasks = 12638848]]"})
[2017-04-14T08:33:33,108][INFO ][logstash.outputs.elasticsearch] retrying failed action with response code: 429 ({"type"=>"es_rejected_execution_exception", "reason"=>"rejected execution of org.elasticsearch.transport.TransportService$7@15d8c417 on EsThreadPoolExecutor[bulk, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@988bbe9[Running, pool size = 32, active threads = 32, queued tasks = 200, completed tasks = 12638849]]"})
[2017-04-14T08:33:33,108][ERROR][logstash.outputs.elasticsearch] Retrying individual actions
[2017-04-14T08:33:33,108][ERROR][logstash.outputs.elasticsearch] Action
[2017-04-14T08:33:33,108][ERROR][logstash.outputs.elasticsearch] Action
.
.
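For reference, those rejections are the bulk thread pool filling up on the data nodes. They can be watched directly with the stock cat thread pool API in 5.x (pointed here at one of my data nodes):

# Watch bulk thread pool activity and rejections per node
curl -s 'http://10.191.4.44:9200/_cat/thread_pool/bulk?v&h=node_name,active,queue,rejected,completed'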

If I drop the number of replicas to zero (a common tweak for bulk indexing, and it brings my monitoring closer to real time), I don't see any improvement in the primary-shard indexing rate. However, when I re-enable replicas, I can see that the cluster is capable of indexing at >12k events/sec in total:
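For clarity, this is the replica toggle I mean -- a minimal sketch using the update-settings API against the logstash-* indices (that pattern is just one of the index names from my outputs):

# Disable replicas while bulk indexing...
curl -XPUT 'http://10.191.4.44:9200/logstash-*/_settings' -d '{
  "index": { "number_of_replicas": 0 }
}'

# ...and turn them back on afterwards
curl -XPUT 'http://10.191.4.44:9200/logstash-*/_settings' -d '{
  "index": { "number_of_replicas": 1 }
}'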

My questions:

Is there a way to remove all the queued data in Logstash? I imagine the data from previous days is being stored in Logstash queues. This data is no longer valuable and I'd like to simply remove it. The fact that Logstash is using 22GB of memory on just one box makes me think these old events are being held somewhere. Am I able to delete them and just start fresh?

What can I do to increase my indexing rate? I'd like to tune what I already have as it's plenty of hardware for indexing >3k events/sec.

Any overall advice?

My Logstash Filters:

input {
  beats {
    port => 5044
    add_field => [ "LogstashServer", "elastic03" ]
  }
}
filter {
  if [type] == "nginx-access" {
    grok {
      patterns_dir => ["/opt/logstash/patterns"]
      match => { "message" => [ "%{NGINXACCESS}", "%{NGINXACCESSAriel}", "%{NGINXACCESSAriel2}","%{NGINXACCESSlayer}", "%{NGINXCrit}", "%{NGINXACCESSAriel3}", "%{NGINXHeader}" ] }
    }
    if [nginx_message] {
      grok {
        patterns_dir => ["/opt/logstash/patterns"]
        match => { "nginx_message" => [ "%{NGINXAerial0}", "%{NGINXAerial1}", "%{NGINXAerial2}" ] }
        tag_on_failure => "nginx_message_fail"
      }
    }
    if [referrer] {
      grok {
        patterns_dir => ["/opt/logstash/patterns"]
        match => { "referrer" => ["%{nginxReferrerP}", "%{nginxReferrerP2}", "%{nginxReferrerP2}"] }
        tag_on_failure => "referrer_fail"
      }
      if [clientTool] {
        mutate {
          add_field => {
            "bubble" => "%{clientTool}loc=%{LatLon}"
          }
        }
      }
    }
    if [referrersubstring] {
      grok {
        patterns_dir => ["/opt/logstash/patterns"]
        match => { "referrersubstring" => "%{nginxSubReferrerP}" }
        tag_on_failure => "referrer_substring_fail"
      }
    }
    if [request] {
      grok {
        patterns_dir => ["/opt/logstash/patterns"]
        match => { "request" => "%{imageRequestP}" }
        tag_on_failure => "request_fail"
      }
      if [dataset_parent] {
        grok {
          patterns_dir => ["/opt/logstash/patterns"]
          match => { "dataset_parent" => "%{datasetP}" }
          tag_on_failure => "dataset_fail"
        }
        if [request_tail] {
          grok {
            patterns_dir => ["/opt/logstash/patterns"]
            match => { "request_tail" => [ "%{request_tailP}", "%{request_tailP2}"] }
            tag_on_failure => "dataset_fail"
          }
        }
      }
    }
    if [latitude] and [longitude] {
      mutate {
        add_field => [ "[location][lat]",  "%{latitude}" ]
        add_field => [ "[location][lon]", "%{longitude}" ]
        add_field => [  "geoLocation", "%{longitude}" ]
        add_field => [  "geoLocation", "%{latitude}" ]
        add_tag => [ "geopoint", "lmtopslog01" ]
      }
      mutate {
        convert => [ "[location][lat]", "float" ]
        convert => [  "[location][lon]", "float" ]
        convert => [  "geoLocation", "float" ]
        add_tag => [ "geoLocationationAdded" ]
      }
    }
    if [host] {
      mutate {
        add_field => { "serveralias" => "unknownhost" }
      }
      mutate {
        convert => { "serveralias" => "string" }
      }
      if [host] == "hyd1-clickonce-prod-node01" { mutate { update => { "serveralias" => "clickonce" } } }
      else if [host] == "hyd1-mapscache-prod-node01" { mutate { update => { "serveralias" => "mapscache01" } } }
      else if [host] == "hyd1-mapscache-prod-node02" { mutate { update => { "serveralias" => "mapscache02" } } }
      else if [host] == "hyd1-mapscache-prod-node03" { mutate { update => { "serveralias" => "mapscache03" } } }
      else if [host] == "hyd1-mapscache-prod-node04" { mutate { update => { "serveralias" => "mapscache04" } } }
      else if [host] == "hyd1-mapscache-prod-node05" { mutate { update => { "serveralias" => "mapscache05" } } }
      else if [host] == "hyd1-mapscache-prod-node06" { mutate { update => { "serveralias" => "mapscache06" } } }
    }
    if "_grokparsefailure" not in [tags] {
      mutate { remove_field => [ "message", "tags"  ] }
    }
    if [NginxResponse] {
      grok {
        patterns_dir => ["/opt/logstash/patterns"]
        match => { "NginxResponse" => "%{nginxResponseP}" }
      }
    }
# heatlamp field creation based on the existence of go_http_agent (field only exists on heatlamp requests)
    if [go_http_client] {
      mutate {
        add_field => { "heatlamp_request" => "true" }
      }
    }
    else {
      mutate {
        add_field => { "heatlamp_request" => "false" }
      }
    }
  }
}
filter {
  if [type] == "haproxy-access" {
    grok {
      patterns_dir => ["/opt/logstash/patterns/"]
      match => { "message" => [ "%{HAPROXYHTTP}", "%{haproxyNginxp}",  "%{haproxyp}", "%{haproxyrepeatedp}" ] }
    }
    if [HAProxyMessage] {
      grok {
        patterns_dir => ["/opt/logstash/patterns/"]
        match => { "HAProxyMessage" => [ "%{HAPROXYHTTPBASE}", "%{HAPROXYHTTP}", "%{HAPROXYTCP}", "%{HAProxyMessagep}", "%{HAProxyMessagep}" ] }
      }
    }
# add server aliases for the four cache servers
    if [host] {
      mutate {
        add_field => { "serveralias" => "unknownhost" }
      }
      mutate {
        convert => { "serveralias" => "string" }
      }
      if [host] == "hyd1-clickonce-prod-node01" { mutate { update => { "serveralias" => "clickonce" } } }
      else if [host] == "hyd1-mapscache-prod-node01" { mutate { update => { "serveralias" => "mapscache01" } } }
      else if [host] == "hyd1-mapscache-prod-node02" { mutate { update => { "serveralias" => "mapscache02" } } }
      else if [host] == "hyd1-mapscache-prod-node03" { mutate { update => { "serveralias" => "mapscache03" } } }
      else if [host] == "hyd1-mapscache-prod-node04" { mutate { update => { "serveralias" => "mapscache04" } } }
      else if [host] == "hyd1-mapscache-prod-node05" { mutate { update => { "serveralias" => "mapscache05" } } }
      else if [host] == "hyd1-mapscache-prod-node06" { mutate { update => { "serveralias" => "mapscache06" } } }
    }
    if [http_request] {
      grok {
        patterns_dir => ["/opt/logstash/patterns/"]
        match => { "http_request" => [ "%{http_request_test}" ] }
      }
    }
    if "_grokparsefailure" not in [tags] {
      mutate { remove_field => [ "message", "HAProxyMessage", "tags" ] }
    }
  }
}
filter {
  if [type] == "dirstats" {
    drop {}
  }
}
output {
  if [type] in ["haproxy-access", "stunnel"] {
    elasticsearch {
      hosts => ["10.191.4.44:9200","10.191.4.126:9200","10.191.5.69:9200","10.191.5.54:9200", "10.191.4.38:9200", "10.191.5.42:9200", "10.191.5.7:9200"]
      index => "logstash-%{+YYYY.MM.dd}"
      document_type => "%{[@metadata][type]}"
      sniffing => "false"
    }
  }
  else if [type] in [ "nginx-access", "whatever" ] {

      hosts => ["10.191.4.44:9200","10.191.4.126:9200","10.191.5.69:9200","10.191.5.54:9200", "10.191.4.38:9200", "10.191.5.42:9200", "10.191.5.7:9200"]
      index => "nginx-%{+YYYY.MM.dd}"
      document_type => "%{[@metadata][type]}"
      sniffing => "false"
    }
  }
  else {
    elasticsearch {
      hosts => ["10.191.4.44:9200","10.191.4.126:9200","10.191.5.69:9200","10.191.5.54:9200", "10.191.4.38:9200", "10.191.5.42:9200", "10.191.5.7:9200"]
      index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
      document_type => "%{[@metadata][type]}"
      sniffing => "false"
    }
  }
}

My Elasticsearch nodes all have the heap locked at 26 GB, or otherwise at half of the node's total memory. One interesting thing I noticed while looking at the stats is that my indexing time is 68 seconds, which is certainly alarming:

Once ES gets the data, it gets indexed; there's nothing in ES that stores data without indexing it, so the backlog isn't sitting inside the cluster. Most likely Filebeat is holding events back and hasn't sent them through yet.

That would be why you are seeing the delay.

I'd start by reducing your shard count. It's not crazy as it stands, but for 85 GB of data you can easily get away with 1 shard per index (plus a replica).
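For new daily indices that's just a template change; a minimal sketch (the template name here is only an example, and it only affects indices created after it's added):

# Example template: new logstash-* indices get 1 primary shard + 1 replica
curl -XPUT 'http://10.191.4.44:9200/_template/logstash_one_shard' -d '{
  "template": "logstash-*",
  "order": 1,
  "settings": {
    "index.number_of_shards": 1,
    "index.number_of_replicas": 1
  }
}'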

LS has a queue of 40 events, so it's not that. If you want to clear that, just restart LS :)

[quote="seth.yes, post:1, topic:82436"]
The fact that Logstash is using 22GB of memory on just one box makes me think these old events are being held somewhere.
[/quote]

LS is a JVM process; it'll only use whatever you assign as heap. Anything else you are seeing is the OS doing caching.

What else does monitoring show -- high CPU, GC?

So if ES isn't able to keep up, the data stays on the Filebeat agent until it is published, correct?

I have updated the number of shards by following the instructions in the indices-templates and indices-shrink documentation.

Once I shrank the current indices and swapped in the shrunken versions, the indexing rate jumped significantly -- roughly 2x!
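For anyone following along, the shrink step looked roughly like this (the index and node names below are placeholders, not my real ones):

# 1. Make the source index read-only and pull a copy of every shard onto one node
curl -XPUT 'http://10.191.4.44:9200/nginx-2017.04.14/_settings' -d '{
  "settings": {
    "index.routing.allocation.require._name": "shrink-node-01",
    "index.blocks.write": true
  }
}'

# 2. Shrink it into a new single-shard index
curl -XPOST 'http://10.191.4.44:9200/nginx-2017.04.14/_shrink/nginx-2017.04.14-shrunk' -d '{
  "settings": {
    "index.number_of_shards": 1,
    "index.number_of_replicas": 1
  }
}'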

I'm still not convinced that's the max throughput this cluster is capable of. I'm thinking I'm still encountering some other unaddressed issues:

Garbage collection is still occurring on the data nodes; GC times from one of them are shown below:

Also, I feel I should be concerned about the indexing time seen on my data nodes:

Is there any more data I can provide to present a more clear picture of my cluster? Thanks for the valuable insight already given.

With the default settings and 3 separate Elasticsearch outputs, the bulk requests Logstash sends are likely to be quite small. You might get better performance if you increase the internal Logstash batch size (the -b parameter), e.g. to 1000 (the default is only 125). You should also be able to consolidate your 3 Elasticsearch outputs into one by setting an index prefix in a metadata field, e.g. [@metadata][index_prefix], during filtering and then referencing it when building the index name in the Elasticsearch output (index => "%{[@metadata][index_prefix]}-%{+YYYY.MM.dd}").
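For reference, -b maps to pipeline.batch.size, so you can set it persistently in logstash.yml instead of on the command line; for example:

# /etc/logstash/logstash.yml (Logstash 5.x)
pipeline.batch.size: 1000   # default is 125

# or the equivalent one-off on the command line (config path is just an example)
bin/logstash -b 1000 -f /etc/logstash/conf.d/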

Not bad! I increased the batch size to 1000 on all four LS hosts and immediately saw the indexing rate become more stable and climb by ~1k events/sec -- it's now steady at 5k events/sec, compared to the 3k I started with a couple of days ago.

Thanks @Christian_Dahlqvist & @warkolm

With regard to the index prefix: I understand how to set it up in the elasticsearch output, but can you provide an example or point me to documentation on how to set it during processing?

I'm working on setting up a hot/warm architecture to make use of a few SSD partitions I have on some of my ES boxes. I'll update this thread with rates once I get that going, and I'll start another thread for any issues I run into while setting it up.

I think something like this should work if placed at the end of the filter section:

if [type] in ["haproxy-access", "stunnel"] {
  mutate {
    add_field => { "[@metadata][index_prefix]" => "logstash" }
  } 
}
else if [type] in [ "nginx-access", "whatever" ] {
  mutate {
    add_field => { "[@metadata][index_prefix]" => "nginx" }
  }
}
else {
  mutate {
    add_field => { "[@metadata][index_prefix]" => "%{[@metadata][beat]}" }
  }
}
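The three elasticsearch outputs can then collapse into a single one that reads the prefix back out of the metadata, something like:

output {
  elasticsearch {
    hosts => ["10.191.4.44:9200","10.191.4.126:9200","10.191.5.69:9200","10.191.5.54:9200", "10.191.4.38:9200", "10.191.5.42:9200", "10.191.5.7:9200"]
    index => "%{[@metadata][index_prefix]}-%{+YYYY.MM.dd}"
    document_type => "%{[@metadata][type]}"
    sniffing => "false"
  }
}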

I wanted to follow up on this thread by saying that a number of adjustments brought my indexing rate up to a steady 5k events per second on primary shards.

The top three modifications for my particular stack were (in order):

  1. Implementing a hot/warm ES architecture (sketched below)
  2. Tuning Logstash batch sizes and related outputs
  3. Tuning Logstash hardware allocation
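For anyone who lands here later, the hot/warm piece boiled down to tagging the SSD-backed data nodes and routing new indices onto them. A rough sketch -- the box_type attribute, template name, and index names here are just examples, not anything special in ES:

# elasticsearch.yml on the SSD-backed data nodes:
#   node.attr.box_type: hot
# elasticsearch.yml on the spinning-disk data nodes:
#   node.attr.box_type: warm

# Route newly created nginx indices to the hot nodes
curl -XPUT 'http://10.191.4.44:9200/_template/nginx_hot_routing' -d '{
  "template": "nginx-*",
  "order": 2,
  "settings": {
    "index.routing.allocation.require.box_type": "hot"
  }
}'

# Later, migrate an aged index over to the warm nodes
curl -XPUT 'http://10.191.4.44:9200/nginx-2017.04.01/_settings' -d '{
  "index.routing.allocation.require.box_type": "warm"
}'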

Thanks again for your help Elastic Team.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.