Logstash 2.2.3 randomly stops forwarding logs to Elastic Search Node

Hi All,

I have set up an cluster with 1 logstash collector and 1 elastic search node. I have my logstash listening on TCP/UDP syslog port 514, and forwarding all logs to my elastic-search node. Currently, my elastic-search node is indexing around 300k events an hour.

I am running into an issue where after 1-3 hours my logstash box stops forwarding logs to elastic search. No error message is generated, and debug output shows nothing helpful. Has anyone run into this issue before? Below are the stats for both boxes and my configs.

Logstash Collector
-8GB of RAM
-2 core
-virtualized (Red Hat 4.8.5-4)

Elastic Search Indexing Node
-28 GB RAM
-2 cores
-virtualized (Ubuntu 14.04)

00-syslog-input.conf

 input {
  tcp {
    port => 514
    type => syslog
  }
  udp {
    port => 514
    type => syslog
  }
}

101-windows-em3-events-filter.conf

filter {
   if ("eventmonitor-Microsoft" in [message]){
      grok {
         match => { "message" => "%{DATA:name} %{GREEDYDATA:extension}" }
      }
      csv {
          columns => ["column1", "column2", "column3", "column4", "column5", "column6", "column7", "column8", "column9", "column10", "column11", "column12", "column13", "column14", "sntdom", "column16", "column17", "column18"]
          source => "extension"
          separator => "|"
          remove_field => ["column12", "column13", "column14", "column16"]
      }
      ruby {
           code => "
           hash = event.to_hash
           hash.each do |k,v|
              if v == nil || v == '-' || v == '0'
                 event.remove(k)
              end
              if k.include? 'column'
                 columnNum = k.sub('column','')
                 if columnNum.to_i > 18
                    event.remove(k)
                 end
                 if columnNum.to_i == 18
                    if v.to_s.match(/\A[+-]?\d+?(\.\d+)?\Z/) == nil ? false : true
                       event.remove(k)
                    end
                    if v != nil
                       if v.include? '.'
                          event.remove(k)
                       end
                       if v == 'Security'
                           event.remove(k)
                       end
                    else
                         event.remove(k)
                    end
                 end
              end
           end
           "
      }
      mutate{
          split => {"column1" => ":"}
          add_field => ["src", "%{[column1[-1]]}"]
          remove_field => ["column1"]

          remove_field => ["column2"] #We do not need both the hostname and the fqdn

          split => {"column3" => ":"}
          add_field => ["shost", "%{[column3[-1]]}"]
          remove_field => ["column3"]

          remove_field => ["column4"] #We are already capturing time sent

          remove_field => ["column5"] #Time written is redundant

          split => {"column6" => ":"}
          add_field => ["recordNum", "%{[column6[-1]]}"]
          remove_field => ["column6"]

          split => {"column7" => ":"}
          add_field => ["logType", "%{[column7[-1]]}"]
          remove_field => ["column7"]

          split => {"column8" => ":"}
          add_field => ["logSource", "%{[column8[-1]]}"]
          remove_field => ["column8"]

          split => {"column9" => ":"}
          add_field => ["eventid", "%{[column9[-1]]}"]
          remove_field => ["column9"]

          split => {"column10" => ":"}
          add_field => ["act", "%{[column10[-1]]}"]
          remove_field => ["column10"]

          split => {"column11" => ":"}
          add_field => ["cat", "%{[column11[-1]]}"]
          remove_field => ["column11"]


          split => {"column17" => ":"}
          add_field => ["object", "%{[column17[-1]]}"]
          remove_field => ["column17"]

          add_field => ["suser", "%{[column18]}"]
          remove_field => ["column18"]

          remove_field => ["extension"]
          remove_field => ["message"]

      }
      if ("%" in [suser]){
         mutate{
            remove_field => ["suser"]
         }
      }

      if ("%" in [object]){
         mutate{
            remove_field => ["object"]
         }
      }

   }
}

101-windows-em3-events-output

output {
   if ('eventmonitor-Microsoft-Windows' in [name]){
       elasticsearch {
          hosts => ["167.236.26.11:9200"]
          index => "em3-%{+YYYY.MM.dd}"
          flush_size => 5000
          workers => 10
       }
        #stdout { codec => rubydebug }
   }
}

Can you upgrade LS to 2.3.2?

I upgraded yesterday to 2.3.2, I am still experiencing the same problem.

Hi,
to be sure to detect the right source of your problem would be nice to have a few log lines, might be something inducted by your filters, we should test. can you provide that?