Logstash taking too long to process data

Hi,

I have set up the following architecture: filebeat -> kafka -> logstash -> elasticsearch. Filebeat is sending data to Kafka without any issue; I can validate that using the Kafka command line tools. Logstash is consuming that data through three topics.
The issue I'm facing is that most of the time it takes too long for Logstash to consume data from a given topic.
To be fair, I am harvesting a LOT of data with Filebeat (millions of logs).

Is there a way to make Logstash consume that data faster?

P.S. my Logstash config uses two filters.

Will increasing the number of Logstash workers give me better performance?

BTW, what ratio of workers to memory usage is efficient for Logstash performance?

Thanks.

Logstash performance depends on a few factors, e.g. number and complexity of filters used, number of filter and output workers and available system resources. The internal pipeline and how you tune it has changed over recent versions, so providing your config and the version of Logstash used will help if you are looking for feedback on how to best tune it. Having said that, Logstash processing will be limited by the throughput of the slowest output, so it is also important to ensure that your Elasticsearch cluster, or other outputs, are not the bottleneck.
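For reference, the relevant knobs in 5.x live in logstash.yml (or the equivalent -w / -b command line flags). The values below are only illustrative starting points, not a recommendation for your setup:

# logstash.yml -- illustrative values only, tune against your own hardware and measure
pipeline.workers: 4        # filter/output worker threads, defaults to the number of CPU cores
pipeline.batch.size: 250   # events each worker collects per batch, default is 125
pipeline.batch.delay: 5    # ms to wait for a full batch before flushing a smaller one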

Hi @Christian_Dahlqvist thanks for the reply.
I'm using filebeat 5.1.1, kafka 2.11-0.10.1.0 (Scala 2.11, Kafka 0.10.1.0), logstash 5.1.1, and elasticsearch 5.1.1.

Here is my logstash config file

input {
  beats { 
    port => 5044
  }
  kafka { 
    bootstrap_servers => ["kafkaip1:9092,kafkaip2:9092,kafkaip3:9092"]
    topics => ["topic1","topic2","topic3"]
    codec => "json"
    group_id => "app8"
  }
}

filter {
  mutate {
    # Renames the 'fieldName' field to 'fieldname'
    rename => { "fieldName" => "fieldname" }
  }
}


filter {
  #For app1 logs
  if [type] == "app1logs" {
    mutate {
      lowercase => [ "source" ]
      gsub => [
        # replace all forward slashes with nothing
        "source", "/", "",
        # replace varlogapp1apps with nothing
        "source", "varlogapp1apps", ""
      ]
    }
  }
}

output {
  #For services logs 
  if [type] == "app2logs" {
    elasticsearch {
      hosts => ["http://elasticsearchip1:9200","http://elasticsearchip2:9200","http://elasticsearchip3:9200"]
      user => "username"
      password => "password"
      manage_template => true
      index => "%{type}-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "app1logs" {
    elasticsearch {
      hosts => ["http://elasticsearchip1:9200","http://elasticsearchip2:9200","http://elasticsearchip3:9200"]
      user => "username"
      password => "password"
      manage_template => true
      index => "%{type}-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "app1logs" {
    elasticsearch {
      hosts => ["http://elasticsearchip1:9200","http://elasticsearchip2:9200","http://elasticsearchip3:9200"]
      user => "username"
      password => "password"
      manage_template => true
      index => "%{source}"
    }
  }
  if [type] == "app3logs" {
    elasticsearch {
      hosts => ["http://elasticsearchip1:9200","http://elasticsearchip2:9200","http://elasticsearchip3:9200"]
      user => "username"
      password => "password"
      manage_template => true
      index => "%{type}-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "app4logs" {
    elasticsearch {
      hosts => ["http://elasticsearchip1:9200","http://elasticsearchip2:9200","http://elasticsearchip3:9200"]
      user => "username"
      password => "password"
      manage_template => true
      index => "%{type}-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "app5logs" {
    elasticsearch {
      hosts => ["http://elasticsearchip1:9200","http://elasticsearchip2:9200","http://elasticsearchip3:9200"]
      user => "username"
      password => "password"
      manage_template => true
      index => "%{type}-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "app6logs" {
    elasticsearch {
      hosts => ["http://elasticsearchip1:9200","http://elasticsearchip2:9200","http://elasticsearchip3:9200"]
      user => "username"
      password => "password"
      manage_template => true
      index => "%{type}-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "app7logs" {
    elasticsearch {
      hosts => ["http://elasticsearchip1:9200","http://elasticsearchip2:9200","http://elasticsearchip3:9200"]
      user => "username"
      password => "password"
      manage_template => true
      index => "%{type}-%{+YYYY.MM.dd}"
    }
  }

  #For app8
  if [type] == "app8logs" {
    elasticsearch{
      hosts => ["http://elasticsearchip1:9200","http://elasticsearchip2:9200","http://elasticsearchip3:9200"]
      user => "username"
      password => "password"
      manage_template => true
      index => "%{type}"
    }
  }
  if [type] == "app8logs" {
    elasticsearch{
      hosts => ["http://elasticsearchip1:9200","http://elasticsearchip2:9200","http://elasticsearchip3:9200"]
      user => "username"
      password => "password"
      manage_template => true
      index => "%{[fields][logsfield]}-instance.log"
    }
  }
  if [metricfield] {
    elasticsearch{
      hosts => ["http://elasticsearchip1:9200","http://elasticsearchip2:9200","http://elasticsearchip3:9200"]
      user => "username"
      password => "password"
      manage_template => true
      index => "%{metricfield}-%{+YYYY.MM.dd}"
    }
  }
  #For app8 metrics 
  if [fieldname] {
    elasticsearch{
      hosts => ["http://elasticsearchip1:9200","http://elasticsearchip2:9200","http://elasticsearchip3:9200"]
      user => "username"
      password => "password"
      manage_template => true
      index => "%{fieldname}-%{+YYYY.MM.dd}"
    }
  }
  #For metrics app1, app2 and app3 metrics
  if [type] == "metricsets" {
     elasticsearch{
      hosts => ["http://elasticsearchip1:9200","http://elasticsearchip2:9200","http://elasticsearchip3:9200"]
      user => "username"
      password => "password"
      manage_template => true
      index => "%{[metricset][module]}-metrics-%{+YYYY.MM.dd}"
    }
  }

   #For app9 
   if [app9][innerfield1][innerfield2] {
     elasticsearch{
        hosts => ["http://elasticsearchip1:9200","http://elasticsearchip2:9200","http://elasticsearchip3:9200"]
        user => "username"
        password => "password"
        manage_template => true
        index => "%{[app9][innerfield1][innerfield2]}-%{+YYYY.MM.dd}"
     }
   }
}

Thanks.

I suspect having a large number of Elasticsearch outputs writing to the same index pattern will result in very small bulk requests to Elasticsearch, which can hurt performance. I would recommend creating a single output writing to the shared %{type}-%{+YYYY.MM.dd} index pattern and changing the conditional to compare the type field against a list of values. Given the number of outputs, you may also want to try increasing the internal batch size to see if this improves performance. Start with 1000 and slowly increase it until no further improvement in throughput is seen.
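As a rough sketch (the type names and index pattern are taken from the config you posted, so adjust as needed), the outputs that share the %{type}-%{+YYYY.MM.dd} pattern could be collapsed into a single conditional, with the batch size set via pipeline.batch.size in logstash.yml:

output {
  if [type] in ["app1logs", "app2logs", "app3logs", "app4logs", "app5logs", "app6logs", "app7logs"] {
    elasticsearch {
      hosts => ["http://elasticsearchip1:9200","http://elasticsearchip2:9200","http://elasticsearchip3:9200"]
      user => "username"
      password => "password"
      manage_template => true
      index => "%{type}-%{+YYYY.MM.dd}"
    }
  }
  # keep the remaining special-case outputs (e.g. the %{source} and metrics ones) as they are
}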

To further what Christian said and as an alternative - you should move all the output conditional logic to the filter section and add a metadata field, say, index_prefix that you later use in a single ES output.

This has the benefit of making one network call to ES for the whole batch.

In Logstash, the output section is handled by collecting, for each output, the subset of events (from the batch) that satisfies that output's conditional; it then iterates over these outputs, making a separate network connection to ES for each one and submitting a bulk request for that smaller subset of events.
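A minimal sketch of the metadata approach, using types from the config above (the index_prefix name is arbitrary):

filter {
  if [type] == "app1logs" {
    mutate { add_field => { "[@metadata][index_prefix]" => "app1logs" } }
  } else if [type] == "app2logs" {
    mutate { add_field => { "[@metadata][index_prefix]" => "app2logs" } }
  }
  # ...one branch per type, or derive the prefix from an existing field
}

output {
  elasticsearch {
    hosts => ["http://elasticsearchip1:9200","http://elasticsearchip2:9200","http://elasticsearchip3:9200"]
    user => "username"
    password => "password"
    manage_template => true
    index => "%{[@metadata][index_prefix]}-%{+YYYY.MM.dd}"
  }
}

Fields under [@metadata] are not sent to Elasticsearch, so the routing prefix never ends up in the indexed documents.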

@guyboertje and @Christian_Dahlqvist thanks for the replies. Sorry for the delay, I was on something else more urgent. I fine-tuned Filebeat so that I no longer needed some of those configurations. As per your recommendations, I also increased the internal batch size, but even after doing that my Logstash wasn't behaving any better.
Here is the new configuration:

input {
  kafka {
    bootstrap_servers => ["kafkaip1:9092,kafkaip2:9092,kafkaip3:9092"]
    topics => ["applogstopic"]
    codec => "json"
    session_timeout_ms => "30000"
    group_id => "logstash"
  }
}


filter {
  #For app logs
  if [type] == "applogs" {
    mutate {
      lowercase => [ "source" ]
      gsub => [
        # replace all forward slashes with nothing
        "source", "/", "",
        # replace varlogappapps with nothing
        "source", "varlogappapps", ""
      ]
    }
  }
}

output {
  if [type] == "applogs" {
    elasticsearch {
      hosts => ["http://elasticsearchip1:9200","http://elasticsearchip2:9200","http://elasticsearchip3:9200"]
      user => "user"
      password => "password"
      manage_template => true
      index => "%{source}"
    }
  }
}

Monitoring the stats of the pipeline with

curl -XGET 'logstaship:9600/_node/stats/pipeline?pretty'

I get

{
  "host" : "london201",
  "version" : "5.2.0",
  "http_address" : "logstaship:9600",
  "id" : "789d5b0b-bd4c-4df1-aa09-c07b852e34bb",
  "name" : "london201",
  "pipeline" : {
    "events" : {
      "duration_in_millis" : 17881576,
      "in" : 109129,
      "filtered" : 109129,
      "out" : 105129
    },
    "plugins" : {
      "inputs" : [ ],
      "filters" : [ {
        "id" : "59a9d6f7428808348047e6491ba66de57c78cd24-2",
        "events" : {
          "duration_in_millis" : 1830,
          "in" : 109129,
          "out" : 109129
        },
        "name" : "mutate"
      } ],
      "outputs" : [ {
        "id" : "59a9d6f7428808348047e6491ba66de57c78cd24-3",
        "events" : {
          "duration_in_millis" : 17877790,
          "in" : 109129,
          "out" : 105129
        },
        "name" : "elasticsearch"
      } ]
    },
    "reloads" : {
      "last_error" : null,
      "successes" : 0,
      "last_success_timestamp" : null,
      "last_failure_timestamp" : null,
      "failures" : 0
    },
    "queue" : {
      "type" : "memory"
    }
  }
}

Unless I'm misreading this, my guess is that Logstash is taking 17881576 milliseconds (~298 minutes) to process 109129 events. That can't be possible.

Where am I going wrong? Any clues?
Thanks.

It looks like Elasticsearch may be the limiting factor. What indexing rate are you seeing? What is the specification of the hosts Elasticsearch is deployed on?
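(If you don't have monitoring in place, one rough way to estimate the indexing rate is to sample the cluster-wide index_total counter twice and divide the difference by the interval; the command below is just an example.)

curl -u user:password -XGET 'elasticsearchip1:9200/_stats/indexing?pretty' | grep '"index_total"' | head -1
# wait e.g. 60 seconds, run it again, then (second value - first value) / 60 = documents per second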

@Christian_Dahlqvist thanks for the reply, that was a fast one.
With the command

curl -u user:password -XGET  'elasticsearchip1:9200/_cluster/stats?human&pretty'

I get

{
  "_nodes" : {
    "total" : 3,
    "successful" : 3,
    "failed" : 0
  },
  "cluster_name" : "myelasticsearch_cluster",
  "timestamp" : 1485985995220,
  "status" : "green",
  "indices" : {
    "count" : 78,
    "shards" : {
      "total" : 653,
      "primaries" : 311,
      "replication" : 1.0996784565916398,
      "index" : {
        "shards" : {
          "min" : 2,
          "max" : 30,
          "avg" : 8.371794871794872
        },
        "primaries" : {
          "min" : 1,
          "max" : 10,
          "avg" : 3.9871794871794872
        },
        "replication" : {
          "min" : 1.0,
          "max" : 2.0,
          "avg" : 1.0897435897435896
        }
      }
    },
    "docs" : {
      "count" : 17805214,
      "deleted" : 1
    },
    "store" : {
      "size" : "10.8gb",
      "size_in_bytes" : 11616832667,
      "throttle_time" : "0s",
      "throttle_time_in_millis" : 0
    },
    "fielddata" : {
      "memory_size" : "0b",
      "memory_size_in_bytes" : 0,
      "evictions" : 0
    },
    "query_cache" : {
      "memory_size" : "206.8kb",
      "memory_size_in_bytes" : 211788,
      "total_count" : 97,
      "hit_count" : 1,
      "miss_count" : 96,
      "cache_size" : 0,
      "cache_count" : 7,
      "evictions" : 7
    },
    "completion" : {
      "size" : "0b",
      "size_in_bytes" : 0
    },
    "segments" : {
      "count" : 2759,
     "memory" : "51.1mb",
      "memory_in_bytes" : 53675552,
      "terms_memory" : "37.2mb",
      "terms_memory_in_bytes" : 39075148,
      "stored_fields_memory" : "4.1mb",
      "stored_fields_memory_in_bytes" : 4362016,
      "term_vectors_memory" : "0b",
      "term_vectors_memory_in_bytes" : 0,
      "norms_memory" : "1.5mb",
      "norms_memory_in_bytes" : 1586496,
      "points_memory" : "3.5mb",
      "points_memory_in_bytes" : 3763936,
      "doc_values_memory" : "4.6mb",
      "doc_values_memory_in_bytes" : 4887956,
      "index_writer_memory" : "11.6mb",
      "index_writer_memory_in_bytes" : 12228124,
      "version_map_memory" : "6.2kb",
      "version_map_memory_in_bytes" : 6355,
      "fixed_bit_set" : "0b",
      "fixed_bit_set_memory_in_bytes" : 0,
      "max_unsafe_auto_id_timestamp" : 1485976972919,
      "file_sizes" : { }
    }
  },
  "nodes" : {
    "count" : {
      "total" : 3,
      "data" : 3,
      "coordinating_only" : 0,
      "master" : 3,
     "ingest" : 3
    },
    "versions" : [
      "5.1.1"
    ],
    "os" : {
      "available_processors" : 12,
      "allocated_processors" : 12,
      "names" : [
        {
          "name" : "Linux",
          "count" : 3
        }
      ],
      "mem" : {
        "total" : "188.4gb",
        "total_in_bytes" : 202374275072,
        "free" : "20.8gb",
        "free_in_bytes" : 22421368832,
        "used" : "167.5gb",
        "used_in_bytes" : 179952906240,
        "free_percent" : 11,
        "used_percent" : 89
      }
    },
    "process" : {
      cpu" : {
        "percent" : 28
      },
      "open_file_descriptors" : {
        "min" : 769,
        "max" : 786,
        "avg" : 778
     }
    },
    "jvm" : {
      "max_uptime" : "23.1h",
      "max_uptime_in_millis" : 83268151,
      "versions" : [
        {
          "version" : "1.8.0_121",
          "vm_name" : "Java HotSpot(TM) 64-Bit Server VM",
          "vm_version" : "25.121-b13",
          "vm_vendor" : "Oracle Corporation",
          "count" : 3
        }
      ],
      "mem" : {
        "heap_used" : "23.8gb",
        "heap_used_in_bytes" : 25616487992,
        "heap_max" : "47.9gb",
        "heap_max_in_bytes" : 51435012096
      },
      "threads" : 209
    },
    "fs" : {
      "total" : "2.2tb",
      "total_in_bytes" : 2455203201024,
      "free" : "2.1tb",
      free_in_bytes" : 2411214864384,
      "available" : "2tb",
      "available_in_bytes" : 2286426980352,
      "spins" : "true"
    },
    "plugins" : [
      {
        "name" : "search-guard-5",
        "version" : "5.1.1-9",
        "description" : "Provide access control related features for Elasticsearch 5",
        "classname" : "com.floragunn.searchguard.SearchGuardPlugin"
      }
    ],
    "network_types" : {
      "transport_types" : {
        "com.floragunn.searchguard.ssl.http.netty.SearchGuardSSLNettyTransport" : 3
      },
      "http_types" : {
        "netty4" : 3
      }
    }
  }
}

All my Elasticsearch nodes have the following config, and I have given a heap size of 16g to each of them.

cluster.name: myelasticsearch-cluster
network.host: elasticsearchip1
node.name: ${HOSTNAME}
discovery.zen.ping.unicast.hosts: ["elasticsearchip1","elasticsearchip2","elasticsearchip3"]
searchguard.ssl.transport.enabled: true
searchguard.ssl.transport.keystore_filepath: node-keystore.jks
searchguard.ssl.transport.keystore_password: keystorepassword
searchguard.ssl.transport.truststore_filepath: truststore.jks
searchguard.ssl.transport.truststore_password: truststorepassword
searchguard.ssl.transport.enforce_hostname_verification: false
searchguard.authcz.admin_dn:
  - CN=sgadmin

Thanks.

What indexing rate are you seeing? What type of hardware (CPU cores, type of storage, RAM) is Elasticsearch deployed on?

Thanks for the quick reply @Christian_Dahlqvist.

I'm seeing an index count of

[quote="jstar, post:8, topic:72096"]
"indices" : {
"count" : 78,
[/quote]

Elasticsearch is using 3 cores per node, its RAM is 16g, and I'm working with an HDD drive.

Given the size of your data set, I would say you have far too many indices and shards. I would recommend revisiting your indexing and sharding strategy. How many of these indices are you actively indexing into?

As you are complaining about performance, how many documents are being indexed into the cluster per second (the indexing rate)?

What type of data are you indexing? What is the average document size?

@Christian_Dahlqvist thanks for the reply.
My indexing rate is ~4959 documents per second. I'm indexing logs; a typical example of such a document is

{
  "_index": "myindexLog",
  "_type": "applogs",
  "_id": "AVn9dBqSuCQnz8OHTnYX",
  "_score": null,
  "_source": {
    "@timestamp": "2017-02-02T06:12:34.588Z",
    "offset": 8705,
    "beat": {
      "hostname": "london208",
      "name": "london208",
      "version": "5.1.1"
    },
    "input_type": "log",
    "@version": "1",
    "source": "myindexLog",
    "message": "  myindexLog 2017-02-02 01:12:30 INFO  TaskSetManager:54 - Finished task 3.0 in stage 20086.0 (TID 1352263) in 15 ms on 192.168.0.201 (executor 0) (4/200)",
    "type": "applogs"
  },
  "fields": {
    "@timestamp": [
      1486015954588
    ]
  },
  "sort": [
    1486015954588
  ]
}

The average size of a document is ~ 372.61 bytes.

Now, regarding the sharding and indexing policy: please can you tell me how I can reduce the number of shards and indices? My indices are created automatically based on the source field of the event.

By doing so, they are created with the default of 5 shards and 1 replica. I tried setting index.number_of_shards and index.number_of_replicas in elasticsearch.yml, but this does not work with Elasticsearch 5.x, as per this link: ES 5 wont start with config setting index.number_of_shards · Issue #18073 · elastic/elasticsearch · GitHub.
But with

curl -u user:password -XPUT 'elasticsearchip1:9200/_all/_settings?preserve_existing=true' -d '{
   "index.number_of_shards" : "1"
}'

I get a

{"error":{"root_cause":[{"type":"remote_transport_exception","reason":"[london205][elasticsearchip2:9300][indices:admin/settings/update]"}],"type":"illegal_argument_exception","reason":"can't change the number of shards for an index"},"status":400}

Any clues, please?

You can control mappings as well as the number of shards through index templates.
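For example (the template name and pattern here are just placeholders, and a template only affects indices created after it is in place):

curl -u user:password -XPUT 'elasticsearchip1:9200/_template/logs_defaults' -d '{
  "template": "*logs*",
  "settings": {
    "index.number_of_shards": 1,
    "index.number_of_replicas": 1
  }
}'

Existing indices keep their shard count, which is why you got the error above; only newly created indices pick up the template.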

Indexing in Elasticsearch is CPU and disk I/O intensive. You only have 3 cores per node and spinning disks, which will limit your indexing throughput. You are, however, most likely not able to utilise this hardware efficiently, as you are indexing into a potentially large number of shards for each bulk request. This results in a large number of small writes across the disk, which is not the best way to get the most out of a spinning disk.

If you were able to store all data from all sources in a single index with a small number of shards, I suspect you might see better throughput. If this is still too slow or not an option, you may need to upgrade your hardware.

You are correct. This number 17881576 is a total of all thread durations. To get a more usable number divide it by the number of worker threads that were running at the time.

@guyboertje thanks for the reply.
I have only 1 worker thread, so there is no change to this value.

Are you explicitly setting the workers to 1?

No, I just rely on the default settings of Logstash 5.x.

That means your workers = the number of cores on the machine or VM.

How many cores does your machine or VM have?
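If in doubt, the effective worker count and batch size can be read back from the monitoring API (same port 9600 you used for the stats call earlier):

curl -XGET 'logstaship:9600/_node/pipeline?pretty'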

My machine has 3 cores, so I guess the worker thread count should be 3, which gives 17881576 / 3 ≈ 5960525 ms ≈ 99.3 minutes. Which is still a lot.

As the vast majority of time is spent in the output plugin, I don't think Logstash is the bottleneck. As I pointed out earlier, I think you should look at your Elasticsearch cluster.