Filebeat-Redis-Logstash: Filebeat fast and Logstash slow, Logstash threading?

Hi all,

I'm facing a latency issue with logstash.

I have an ELK stack built like this:

  • I have several web front-end servers on AWS EC2 in an AWS Auto Scaling group
  • I have Filebeat installed on each front end
  • Filebeat reads log files and sends messages to a Redis cluster (AWS ElastiCache Redis, one master and one slave node, cluster mode disabled)
  • I have Logstash installed on an EC2 c4.large, which reads logs from Redis (pop) and indexes them into an Elasticsearch cluster
  • My Elasticsearch cluster consists of three EC2 c4.xlarge instances
  • Logstash also reads ELB logs from S3 and indexes them into the Elasticsearch cluster

The problem:

  • CPU usage is not particularly high, either on my Logstash instance or on my Elasticsearch cluster
  • Filebeat is reading and sending logs correctly
  • At the beginning everything was working fine, but as the logs grew, Filebeat kept sending logs very fast while Logstash became very slow.

Result:

  • In Kibana I see Filebeat logs with a delay that grows over time (logs are now more than 2 hours late)
  • I haven't seen S3 ELB logs since December 2016. I've checked: Logstash pulls them from S3 every 60 seconds but does not seem to index them, and there is no error.

To sum up, my Logstash is working very slowly and I can't see all my logs on time. I've even moved Logstash to a bigger instance (c4.2xlarge), but it did not change anything.
I've configured the Logstash redis input with 8 threads, but that made no difference at all.

So I would like to know how to properly configure threading for my Logstash service. How would you deal with this issue?

Thanks

What does your Logstash configuration look like? What is the indexing rate you are seeing in Elasticsearch? What kind of storage do you have for your Elasticsearch nodes?

Hi,

Sorry for the delay; below are my architecture and configuration:

  • As you can see above, Logstash is installed on the same EC2 instance as Kibana and an Elasticsearch client node.
  • The legend for the signs in brackets: r means reading, w means writing, i means indexing
  • My Logstash configuration:
# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
# !!!!!!!!!  This file is managed by SALT  !!!!!!!!!
# !!!!!!!!!    All changes will be lost    !!!!!!!!!
# !!!!!!!!!     DO NOT EDIT MANUALLY !     !!!!!!!!!
# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

#--[ INPUT ]----------------------------------------------------------------
input
{
 # Logs ELB API
  s3
  {
    bucket => "s3.prod.elb.logs.eu-west-1.mydomain"
    prefix => "rtb/smaato/AWSLogs/653589711111/elasticloadbalancing/"
    interval => 60
    region => "eu-west-1"
    type => "elb_access_log"
  }

  redis
  {
    data_type => "list"
    #batch_count => 100
    threads => 8
    key => "filebeat"
    host => "redis.prod.eu-west-1.mydomain.com"
  }
}



#--[ FILTER ]---------------------------------------------------------------
filter
{

  # Handle PHP errors
  if [type] == "php_error_log" {
      grok {
        match => [ "message", "\[%{MONTHDAY:day}-%{MONTH:month}-%{YEAR:year} %{TIME:time} %{WORD:zone}/%{WORD:country}\] PHP %{DATA:errorType}\:  %{GREEDYDATA:errorMessage}" ]
        add_field    => { "timestamp" => "%{day}-%{month}-%{year} %{time} %{zone}/%{country}" }
        remove_field => [ "day", "month", "year", "time", "zone", "country" ]
      }
      multiline {
        pattern => "(Stack trace:)|(^#.+)|(^\"\")|(  thrown+)|(^\s)"
        negate => true
        what => "previous"
      }
      date {
        match        => [ "timestamp" , "yyyy-MM-dd HH:mm:ss", "dd-MMM-yyyy HH:mm:ss ZZZ" ]
        target       => "@timestamp"
        remove_field => "timestamp"
      }
  }

  if [type] == "appli_mydomain_log" {
    # Type the RtbApi logs
    if [source] =~ "rtbapi" {
      mutate {
        replace => { "type" => "rtbapi_log" }
      }
    }
    if [source] =~ "rtbapi_error" {
      mutate {
        replace => { "type" => "rtbapi_error_log" }
      }
    }

    # Type the RtbDeploy logs
    if [source] =~ "rtbdeploy" {
      mutate {
        replace => { "type" => "rtbdeploy_log" }
      }
    }
    if [source] =~ "rtbdeploy_error" {
      mutate {
        replace => { "type" => "rtbdeploy_error_log" }
      }
    }

    # Type the Rtbo logs
    if [source] =~ "rtbo" {
      mutate {
        replace => { "type" => "rtbo_log" }
      }
    }
    if [source] =~ "rtbo_error" {
      mutate {
        replace => { "type" => "rtbo_error_log" }
      }
    }

    # Type the RtbReader logs
    if [source] =~ "rtbreader" {
      mutate {
        replace => { "type" => "rtbreader_log" }
      }
    }
    if [source] =~ "rtbreader_error" {
      mutate {
        replace => { "type" => "rtbreader_error_log" }
      }
    }

    # Type the RtbWriter logs
    if [source] =~ "rtbwriter" {
      mutate {
        replace => { "type" => "rtbwriter_log" }
      }
    }
    if [source] =~ "rtbwriter_error" {
      mutate {
        replace => { "type" => "rtbwriter_error_log" }
      }
    }

    # Parse the JSON data
    json {
      source => "message"
      remove_field => [ "message" ]
    }

    # Set the HTTP request time to @timestamp field
    date {
      match => [ "timestamp", "ISO8601" ]
      remove_field => [ "timestamp" ]
    }

  }

  # Parse the CloudFront access logs
  #if [type] == "cloudfront_access_log" {
  #  grok {
  #    match => { "message"=> "%{DATE_EU:date}\t%{TIME:time}\t%{WORD:x-edge-location}\t(?:%{NUMBER:sc-bytes}|-)\t%{IPORHOST:c-ip}\t%{WORD:cs-method}\t%{HOSTNAME:cs-host}\t%{NOTSPACE:cs-uri-stem}\t%{NUMBER:sc-status}\t%{GREEDYDATA:referrer}\t%{GREEDYDATA:User-Agent}\t%{GREEDYDATA:cs-uri-stem}\t%{GREEDYDATA:cookies}\t%{WORD:x-edge-result-type}\t%{NOTSPACE:x-edge-request-id}\t%{HOSTNAME:x-host-header}\t%{URIPROTO:cs-protocol}\t%{INT:cs-bytes}" }
   # }

   # mutate {
   #   add_field => [ "listener_timestamp", "%{date} %{time}" ]
   # }

   # date {
   #   match => [ "listener_timestamp", "yy-MM-dd HH:mm:ss" ]
   # }
 # }

  # Parse the ELB access logs
  if [type] == "elb_access_log" {
    grok {
      match => [ "message", "%{TIMESTAMP_ISO8601:timestamp:date} %{HOSTNAME:loadbalancer} %{IP:client_ip}:%{POSINT:client_port:int} (?:%{IP:backend_ip}:%{POSINT:backend_port:int}|-) %{NUMBER:request_processing_time:float} %{NUMBER:backend_processing_time:float} %{NUMBER:response_processing_time:float} %{INT:backend_status_code:int} %{INT:received_bytes:int} %{INT:sent_bytes:int} %{INT:sent_bytes_ack:int} \"%{WORD:http_method} %{URI:url_asked} HTTP/%{NUMBER:http_version}\" \"%{GREEDYDATA:user_agent}\" %{NOTSPACE:ssl_cipher} %{NOTSPACE:ssl_protocol}" ]
      remove_field => [ "message" ]
    }

    kv {
      field_split => "&?"
      source => "url_asked"
    }

    date {
      match => [ "timestamp", "ISO8601" ]
      remove_field => [ "timestamp" ]
    }
  }

  # Remove the filebeat input tag
  mutate {
    remove_tag => [ "beats_input_codec_plain_applied" ]
  }

  # Remove field tags if empty
  if [tags] == [] {
    mutate {
      remove_field => [ "tags" ]
    }
  }

  # Remove some unnecessary fields to make Kibana cleaner
  mutate {
    remove_field => [ "@version", "count", "fields", "input_type", "offset", "[beat][hostname]", "[beat][name]", "[beat]" ]
  }

}

#--[ OUTPUT ]---------------------------------------------------------------
output
{
  elasticsearch {
    hosts => ["10.3.16.80:9200"]
  }
}
  • each Elasticsearch cluster node has separate data and log partitions on gp2 storage

Thanks for your advice

Which versions of Logstash and Elasticsearch are you using?

logstash : 2.2.4

elasticsearch : 2.3.4

Thanks.

This is a good blog post that describes how output and filter workers have evolved in Logstash. As you are on Logstash 2.2, I would recommend setting the Elasticsearch output workers parameter to the same value as the number of filter workers you are using (I think this defaults to the number of cores on the host in Logstash 2.2.x). At the moment you probably only have a single connection to Elasticsearch, which will limit your throughput. You may also want to increase the redis batch size to the value you have commented out.
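For reference, a minimal sketch of those two settings applied to the input/output sections posted above (the values shown are illustrative, not tested recommendations):

input
{
  redis
  {
    data_type   => "list"
    key         => "filebeat"
    host        => "redis.prod.eu-west-1.mydomain.com"
    threads     => 8
    batch_count => 100   # the redis batch size that is currently commented out
  }
}

output
{
  elasticsearch {
    hosts   => ["10.3.16.80:9200"]
    workers => 2         # match the number of filter workers (2 cores on a c4.large)
  }
}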

Thanks Christian for the link.

But concerning the batch count, I've read that the default of 125 set by Logstash is a good value, and that increasing it could hurt Logstash performance.

Do you agree with that? Which value should I put there? The Logstash instance is a c4.large (2 vCPUs and 3.75 GB of RAM).

Regards

Looks like the spell checker made a change. I was referring to the redis batch size. Also note that the micro-batch size of 125 does not apply to the version of Logstash that you are using.

There are some good performance tuning guides for the more recent versions of Logstash, so it may be worthwhile upgrading, as even the latest version of Logstash is compatible with the Elasticsearch version you are using.

Ok Christian.

So if I understand correctly, I should first upgrade Logstash to 2.3.4 (not 5.x), and then follow the advice you just gave me. Right?

Regards.

You should be able to upgrade to Logstash 5.1.1 as this works with your Elasticsearch version. If you prefer to stick with the version you are on, start by increasing the number of workers in the elasticsearch output plugin to 2 and see if that makes any difference. Note that c4.large instances have fairly low network performance, so it may be worthwhile going up one instance size in order to also get more CPU, especially if you are also running a client node on the same host.

Setting the workers to 2 and restarting led to this in the Logstash log file:

{:timestamp=>"2017-01-11T12:14:52.728000+0100", :message=>"Defaulting pipeline worker threads to 1 because there are some filters that might not work with multiple worker threads", :count_was=>2, :filters=>["multiline"], :level=>:warn}

What does this mean concretely, and what should I do, please?

Regards,

Don't use the multiline filter, as it has been deprecated due to this limitation, which hurts performance. Instead use the multiline codec plugin on the input it applies to. It is generally recommended to perform multiline processing as close to the source as possible, which is why e.g. Filebeat now has support for multiline processing.
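For example, a multiline section in Filebeat could look something like this (a sketch assuming Filebeat 5.x prospector syntax; the path and pattern are illustrative, joining every line that does not start with a "[" timestamp onto the previous event):

filebeat.prospectors:
- input_type: log
  paths:
    - /var/log/php/error.log      # illustrative path
  document_type: php_error_log
  multiline.pattern: '^\['        # a new event starts with a "[date]" prefix
  multiline.negate: true
  multiline.match: after          # non-matching lines are appended to the previous line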

Ok Christian,

I'm trying all those options.

But I would like to understand something: is it really necessary to scale up the Logstash instance in terms of CPU when there is no high CPU usage on the server? The machine stays at around 40% CPU.

How can we explain the fact that the Logstash service is slow while CPU usage is not high?

Actually the delay for my API logs is decreasing (it is now less than two hours), but I still don't have any S3 log output.
By inspecting network packets I can see that Logstash is pulling logs from S3, but nothing is being output to Elasticsearch.

How can I troubleshoot this issue? What could the problem be, from your point of view? The latest logs I've got from S3 are from December 17, 2016.

Thanks for your help.

Regards.

Logstash is often limited by CPU, so if that is not nearly fully utilised there is no need to scale up yet. The fact that you are using a single processing thread due to the multiline filter plugin, and also a single Elasticsearch connection, will reduce throughput. If you have not processed any logs from S3 in such a long time, there might be something wrong with that input.

I do however not have any experience with the S3 input plugin, so I am not sure what the most efficient way to troubleshoot this would be.

Thank you Christian.

I'm tuning my settings.

While looking for where to set my pipeline settings, I noticed that the Logstash init file has an environment variable LS_HOME (I didn't see a LOGSTASH_HOME environment variable) with the value /var/lib/logstash.
This is the content of this directory:

admin@ec2-prod-log-rtb-elk-10-3-16-91:~$ ls -lR /var/lib/logstash
/var/lib/logstash:
total 4
drwxr-xr-x 2 logstash logstash 4096 Jan 11 18:18 logstash

/var/lib/logstash/logstash:
total 21484
-rw-r--r-- 1 logstash logstash 21997525 Jan 11 18:18 653589716289_elasticloadbalancing_eu-west-1_prod-mydomain_20161210T1705Z_52.208.51.100_4qnbeumz.log
admin@ec2-prod-log-rtb-elk-10-3-16-91:~$ 

As you can see, there is a folder named logstash under the LS_HOME folder, and it contains a log file which, when I checked it, holds log messages from today.

This confirms that Logstash is pulling logs from S3. But can someone please explain the purpose of this folder and file? Why is the file sitting on disk and not being indexed?

Thanks.

Regards.

Hi,

I've removed the multiline from my filter configuration and moved it to the Filebeat source side. Now I no longer get the error:

{:timestamp=>"2017-01-11T12:14:52.728000+0100", :message=>"Defaulting pipeline worker threads to 1 because there are some filters that might not work with multiple worker threads", :count_was=>2, :filters=>["multiline"], :level=>:warn}

I have added this file :

admin@ec2-prod-log-rtb-elk-10-3-16-91:~$ cat /var/lib/logstash/config/logstash.yml
pipeline:
batch:
size: 125
delay: 5
workers: 4
batch.size: 250
config.debug

My logs again have a two-hour delay in Kibana, but I can see that my Logstash instance now has higher CPU usage, around 65%. My questions are:

  • Did I put the logstash.yml file in the right path? The Logstash documentation just says LOGSTASH_HOME/config, and I used LS_HOME/config because I didn't see a LOGSTASH_HOME environment variable
  • How can I check the pipeline configuration? The configtest option doesn't check this file (or checks another path)
  • Is there a way to check and debug each part of the Logstash pipeline? I mean seeing input processing, filter processing, and output processing separately.

Thank you for your attention.

Regards,

My configuration file looks like this :

# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
# !!!!!!!!!  This file is managed by SALT  !!!!!!!!!
# !!!!!!!!!    All changes will be lost    !!!!!!!!!
# !!!!!!!!!     DO NOT EDIT MANUALLY !     !!!!!!!!!
# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

#--[ INPUT ]----------------------------------------------------------------
input
{
 # Logs ELB API
  s3
  {
    bucket => "s3.prod.elb.logs.eu-west-1.mydomain"
    prefix => "rtb/smaato/AWSLogs/653589711111/elasticloadbalancing/"
    interval => 60
    region => "eu-west-1"
    type => "elb_access_log"
  }

  redis
  {
    data_type => "list"
    #batch_count => 100
    threads => 8
    key => "filebeat"
    host => "redis.prod.eu-west-1.mydomain.com"
  }
}



#--[ FILTER ]---------------------------------------------------------------
filter
{

  # Handle PHP errors
  if [type] == "php_error_log" {
      grok {
        match => [ "message", "\[%{MONTHDAY:day}-%{MONTH:month}-%{YEAR:year} %{TIME:time} %{WORD:zone}/%{WORD:country}\] PHP %{DATA:errorType}\:  %{GREEDYDATA:errorMessage}" ]
        add_field    => { "timestamp" => "%{day}-%{month}-%{year} %{time} %{zone}/%{country}" }
        remove_field => [ "day", "month", "year", "time", "zone", "country" ]
      }
      date {
        match        => [ "timestamp" , "yyyy-MM-dd HH:mm:ss", "dd-MMM-yyyy HH:mm:ss ZZZ" ]
        target       => "@timestamp"
        remove_field => "timestamp"
      }
  }

  if [type] == "appli_mydomain_log" {
    # Type the RtbApi logs
    if [source] =~ "rtbapi" {
      mutate {
        replace => { "type" => "rtbapi_log" }
      }
    }
    if [source] =~ "rtbapi_error" {
      mutate {
        replace => { "type" => "rtbapi_error_log" }
      }
    }
    # Type the RtbDeploy logs
    if [source] =~ "rtbdeploy" {
      mutate {
        replace => { "type" => "rtbdeploy_log" }
      }
    }
    if [source] =~ "rtbdeploy_error" {
      mutate {
        replace => { "type" => "rtbdeploy_error_log" }
      }
    }
    # Type the Rtbo logs
    if [source] =~ "rtbo" {
      mutate {
        replace => { "type" => "rtbo_log" }
      }
    }
    if [source] =~ "rtbo_error" {
      mutate {
        replace => { "type" => "rtbo_error_log" }
      }
    }
    # Type the RtbReader logs
    if [source] =~ "rtbreader" {
      mutate {
        replace => { "type" => "rtbreader_log" }
      }
    }
    if [source] =~ "rtbreader_error" {
      mutate {
        replace => { "type" => "rtbreader_error_log" }
      }
    }
    # Type the RtbWriter logs
    if [source] =~ "rtbwriter" {
      mutate {
        replace => { "type" => "rtbwriter_log" }
      }
    }
    if [source] =~ "rtbwriter_error" {
      mutate {
        replace => { "type" => "rtbwriter_error_log" }
      }
    }
    # Parse the JSON data
    json {
      source => "message"
      remove_field => [ "message" ]
    }
    # Set the HTTP request time to @timestamp field
    date {
      match => [ "timestamp", "ISO8601" ]
      remove_field => [ "timestamp" ]
    }

  }

  # Parse the CloudFront access logs
  #if [type] == "cloudfront_access_log" {
  #  grok {
  #    match => { "message"=> "%{DATE_EU:date}\t%{TIME:time}\t%{WORD:x-edge-location}\t(?:%{NUMBER:sc-bytes}|-)\t%{IPORHOST:c-ip}\t%{WORD:cs-method}\t%{HOSTNAME:cs-host}\t%{NOTSPACE:cs-uri-stem}\t%{NUMBER:sc-status}\t%{GREEDYDATA:referrer}\t%{GREEDYDATA:User-Agent}\t%{GREEDYDATA:cs-uri-stem}\t%{GREEDYDATA:cookies}\t%{WORD:x-edge-result-type}\t%{NOTSPACE:x-edge-request-id}\t%{HOSTNAME:x-host-header}\t%{URIPROTO:cs-protocol}\t%{INT:cs-bytes}" }
   # }

   # mutate {
   #   add_field => [ "listener_timestamp", "%{date} %{time}" ]
   # }

   # date {
   #   match => [ "listener_timestamp", "yy-MM-dd HH:mm:ss" ]
   # }
 # }

  # Parse the ELB access logs
  if [type] == "elb_access_log" {
    grok {
      match => [ "message", "%{TIMESTAMP_ISO8601:timestamp:date} %{HOSTNAME:loadbalancer} %{IP:client_ip}:%{POSINT:client_port:int} (?:%{IP:backend_ip}:%{POSINT:backend_port:int}|-) %{NUMBER:request_processing_time:float} %{NUMBER:backend_processing_time:float} %{NUMBER:response_processing_time:float} %{INT:backend_status_code:int} %{INT:received_bytes:int} %{INT:sent_bytes:int} %{INT:sent_bytes_ack:int} \"%{WORD:http_method} %{URI:url_asked} HTTP/%{NUMBER:http_version}\" \"%{GREEDYDATA:user_agent}\" %{NOTSPACE:ssl_cipher} %{NOTSPACE:ssl_protocol}" ]
      remove_field => [ "message" ]
    }
    kv {
      field_split => "&?"
      source => "url_asked"
    }
    date {
      match => [ "timestamp", "ISO8601" ]
      remove_field => [ "timestamp" ]
    }
  }

  # Remove the filebeat input tag
  mutate {
    remove_tag => [ "beats_input_codec_plain_applied" ]
  }

  # Remove field tags if empty
  if [tags] == [] {
    mutate {
      remove_field => [ "tags" ]
    }
  }

  # Remove some unnecessary fields to make Kibana cleaner
  mutate {
    remove_field => [ "@version", "count", "fields", "input_type", "offset", "[beat][hostname]", "[beat][name]", "[beat]" ]
  }

}

#--[ OUTPUT ]---------------------------------------------------------------
output
{
  elasticsearch {
    hosts => ["10.3.16.80:9200"]
    workers => 4
  }
}

Regards.

I got the answer about the logstash.yml file for pipeline settings.

It's a matter of Logstash version. For version 2.2.4, which I'm using, these options are passed as values to the LS_OPTS variable in the /etc/default/logstash file.

The documentation is not accurate for version 2.2.4.
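A minimal sketch of what that can look like (assuming the 2.2.x -w/--pipeline-workers and -b/--pipeline-batch-size flags; the values are only illustrative):

# /etc/default/logstash
# In Logstash 2.2.x, pipeline settings are passed as command-line options
LS_OPTS="-w 4 -b 250"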

Hi,

I've found the cause of my log latency.
I was using the multiline filter in my Logstash configuration, and because of that Logstash set the pipeline worker count to 1.
So I removed it from the filter and now do the multiline processing in my Filebeat source configuration.
I also upgraded my Logstash instance from c4.large to c4.xlarge in order to get higher network performance, and I set my pipeline worker count to 6 (c4.xlarge has 4 vCPUs) and my elasticsearch output workers to 6 as well.

Now I get my logs in Kibana in real time.

I'm using Logstash 2.2.4 on Debian Jessie, and did my settings tuning in /etc/default/logstash.

So my advice here is to avoid doing multiline processing in the Logstash pipeline and to do it as close to the source as possible.
Also, set the workers configuration appropriately so it can keep up as log volume grows.

I consider this issue solved, as my main problem here was the latency. I will open a separate topic for the Logstash S3 plugin issue.

Thanks to @Christian_Dahlqvist for his invaluable help and advice.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.