Logstash losing data during processing

Hi everyone,

I'm experiencing strange behavior of logstash when processing data.

My setup is a central rsyslog server which receives all logs from all servers; this rsyslog instance forwards the logs to the logstash server.

Logstash server setup:

rsyslog -> redis (json) -> logstash -> elasticsearch

I'm using

  • rsyslog 8.21.0
  • redis 2.8.17
  • logstash 2.4.0
  • OS Debian Wheezy
  • Kernel 4.4.2
  • 2 node ES cluster 2.3.3

Test command:

# Send 30 numbered test messages through syslog, pausing 100 ms between them.
# FIX: the original read "for i in seq 1 30", which iterates over the three
# literal words "seq", "1", "30" — command substitution is required so the
# loop actually runs for i = 1..30.
for i in $(seq 1 30); do logger -t logstashtest "message log-$i"; sleep 0.1; echo -n "$i."; done

Then I realized that all messages successfully arrived in the redis database, but only 3-5 of them finally get to the Elasticsearch database.

Rsyslog config

# Load the Redis output module so actions of type="omhiredis" are available.
module(load="omhiredis")
# Hand-built JSON payload for Logstash. option.json="on" makes rsyslog
# JSON-escape every property() value so quotes/backslashes in the message
# cannot break the document (the constant() parts provide the framing).
template(name="ls_json" type="list" option.json="on")
   { constant(value="{")
     constant(value="\"timestamp\":\"")         property(name="timegenerated" dateFormat="rfc3339")
     constant(value="\",\"message\":\"")         property(name="msg")
     constant(value="\",\"host\":\"")            property(name="fromhost")
     constant(value="\",\"host_ip\":\"")         property(name="fromhost-ip")
     constant(value="\",\"logsource\":\"")       property(name="fromhost")
     constant(value="\",\"severity_label\":\"")  property(name="syslogseverity-text")
     constant(value="\",\"severity\":\"")        property(name="syslogseverity")
     constant(value="\",\"facility_label\":\"")  property(name="syslogfacility-text")
     constant(value="\",\"facility\":\"")        property(name="syslogfacility")
     constant(value="\",\"program\":\"")         property(name="programname")
     constant(value="\",\"pid\":\"")             property(name="procid")
     constant(value="\",\"syslogtag\":\"")       property(name="syslogtag")
     constant(value="\"}\n")
   }
# Push every message (*.* = all facilities/severities) onto the Redis list
# named "syslog" on localhost; mode="queue" appends to a Redis list, which
# matches the logstash redis input configured with data_type => 'list'.
*.* action(
  name="push_redis"
  type="omhiredis"
  server="127.0.0.1"
  mode="queue"
  key="syslog"
  template="ls_json"
)

Logstash config

input {
  
    # Redis queue: consume the JSON events that rsyslog (omhiredis) pushed
    # onto the "syslog" list on the local Redis instance.
    redis {
        host => '127.0.0.1'
        data_type => 'list'
        key => 'syslog'
        # NOTE(review): the payload is JSON produced by rsyslog; forcing
        # charset ISO-8859-1 here — confirm the data is really not UTF-8,
        # otherwise multi-byte characters will be mangled.
        codec => json { charset => "ISO-8859-1" }
        type => 'syslog'
        # Two consumer threads popping from the same list.
        threads => 2
    }
}

filter {
if [type] == 'syslog' {
    #
    # SYSLOG
    #

        # NodeJS logs arrive already JSON-encoded inside the syslog message
        # body; decode them into top-level event fields.
        if [program] == 'node' {
            json {
                source => "message"
            }
        }

        # Cron messages: stock CRONLOG pattern shipped with logstash.
        else if [program] == 'cron' {
            grok { match => { "message" => "%{CRONLOG}" } }
        }

        # Query counter log: per-script SELECT/INSERT/UPDATE/DELETE/REPLACE
        # counters for the master, slave and archive databases.
        else if [program] == 'query_counter.log' {
            grok {
                match => {"message" => "%{WORD:qc_type}: %{DATA:qc_script}%{SPACE}\|%{SPACE}%{INT:qc_master_select:int}%{SPACE}%{INT:qc_master_insert:int}%{SPACE}%{INT:qc_master_update:int}%{SPACE}%{INT:qc_master_delete:int}%{SPACE}%{INT:qc_master_replace:int}%{SPACE}\|%{SPACE}%{INT:qc_slave_select:int}%{SPACE}%{INT:qc_slave_insert:int}%{SPACE}%{INT:qc_slave_update:int}%{SPACE}%{INT:qc_slave_delete:int}%{SPACE}%{INT:qc_slave_replace:int}%{SPACE}\|%{SPACE}%{INT:qc_archive_select:int}%{SPACE}%{INT:qc_archive_insert:int}%{SPACE}%{INT:qc_archive_update:int}%{SPACE}%{INT:qc_archive_delete:int}%{SPACE}%{INT:qc_archive_replace:int}"}
            }
        }

        # Apache access logs: combined log format plus server name and
        # request timing (seconds/microseconds) appended by the LogFormat.
        else if [program] == 'httpd_access' {
            grok {
                match => { "message" => "%{IPORHOST:clientip} %{HTTPDUSER:ident}( %{USER:auth})? \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:method} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})\" %{NUMBER:response:int} (?:%{NUMBER:bytes:int}|-) %{QS:referrer} %{QS:agent} %{GREEDYDATA:server_name} %{NUMBER:time_s:int}/%{NUMBER:time_us:int}" }
            }
        }

        # Varnishncsa access logs: combined-like format plus request
        # duration and Varnish hit/miss status.
        else if [program] == 'varnishncsa' {
            grok {
                match => { "message" => "%{IPORHOST:clientip}[ ]+%{HTTPDUSER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:method} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})\" %{NUMBER:response:int} (?:%{NUMBER:bytes:int}|-) %{QS:referrer} %{QS:agent} (%{NUMBER:duration:float}|nan) %{GREEDYDATA:varnish_status}( %{NUMBER:varnish_miss_num:int})?" }
            }
        }

    }
# FIX: this closing brace for filter{} was missing. As posted, the single "}"
# above only closed the `if [type] == 'syslog'` block, so `output {}` was
# parsed inside filter{} and the configuration is not valid.
}
output {

    # SYSLOG: ship events to the two-node Elasticsearch cluster,
    # one index per day.
    if [type] == 'syslog' {
        elasticsearch {
            hosts => [ "lses1.edrive.intra", "lses2.ls.intra" ]
            index => "syslog-%{+YYYY.MM.dd}"
            # Custom mapping template for the syslog-* indices.
            template => "/etc/logstash/elastic-syslog-template.json"
            # Replace any previously installed template on startup.
            template_overwrite => true
        }

    }


}

I tried commenting out all sections in filter{} that perform any drop. The result is the same.

Before I used rsyslog+redis, I had input{} configured with the syslog module directly. I thought the lost messages were the result of overloading the logstash syslog input module, so I added rsyslog with redis in front of it.

Even when I use this logstash input instead, the problem is still the same:

# Alternative input previously used instead of redis: listen for syslog
# traffic directly on port 5114.
syslog {
        type => 'syslog'
        port => 5114
        # NOTE(review): plain codec with forced ISO-8859-1 — confirm the
        # senders are not emitting UTF-8.
        codec => plain { charset => "ISO-8859-1" }
 }