Event loss on the way rsyslog -> LS -> ES

Hi there,

I've been thinking about doing a little validation of my ELK stack installation... No sooner said than done!

The difference between the separate log file and the indexed events is very, very high:

Logfile: 24 million lines vs. index: 8 million events...

Yes, I'm well aware that the logging traffic is mostly UDP... and up to 5% packet loss would be acceptable in this case...

Please help me find the bottleneck and a solution for this issue...

ES is configured with a max heap size of 24G.
LS is configured with a max heap size of 8G (I've never seen more than exactly 1 GB reserved).

The deployment is on a single VM with 2x2 cores and 64 GB RAM, running Red Hat.

...

I'm not sure what you mean by "find the bottleneck"; you should not be having packet loss based on the information you're providing. Here is a brain dump of all the questions I have based on your post.

What do you mean by "the difference between the log file and the index is very high"?

What is the load average on your server, what is the average network volume on it, and what is it configured for? With UDP you will not see packet loss on the NIC interface, as it is not a "drop".
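
If you want to see whether the kernel is discarding UDP datagrams before Logstash reads them (a generic Linux check, nothing specific to your setup), something like this will show it:

    # socket-level UDP statistics; "packet receive errors" /
    # "receive buffer errors" climbing over time usually means the
    # listener cannot keep up with the incoming rate
    netstat -su | grep -A 6 '^Udp:'

    # receive queue (Recv-Q) of the syslog listener on port 10514
    netstat -anu | grep 10514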

The metrics filter can help you see how many messages Logstash is processing; Marvel can help you see the indexing rate per second.

Logstash will only use as much heap as it needs for the events in flight; you probably want to reduce it to just 2 GB.
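
For example, in whichever file your package reads it from (the init-script variable quoted further down in this thread, or /etc/sysconfig/logstash, depending on how it was installed):

LS_HEAP_SIZE="2048m"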

Two cores are probably too few, since this is Java and it is highly threaded, but if your load average is under 1 it might be OK.

Please provide your Logstash configuration so we can see how you are set up.

Different number of entries between the log file and the index.

Output of top with the per-CPU view:

    top - 21:52:30 up 3 days, 15:24,  1 user,  load average: 0.90, 1.50, 1.68
    Tasks: 200 total,   1 running, 199 sleeping,   0 stopped,   0 zombie
    Cpu0  :  9.4%us,  0.7%sy, 12.4%ni, 71.2%id,  6.4%wa,  0.0%hi,  0.0%si,  0.0%st
    Cpu1  : 13.3%us,  0.7%sy,  6.7%ni, 69.7%id,  9.7%wa,  0.0%hi,  0.0%si,  0.0%st
    Cpu2  : 25.5%us,  2.3%sy, 21.5%ni, 44.0%id,  4.7%wa,  0.0%hi,  2.0%si,  0.0%st
    Cpu3  : 12.0%us,  1.3%sy, 24.1%ni, 58.5%id,  4.0%wa,  0.0%hi,  0.0%si,  0.0%st
    Mem:  65974072k total, 62263312k used,  3710760k free,   396620k buffers
    Swap:  2097148k total,   111388k used,  1985760k free, 44561524k cached

      PID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND                                                                                                                                       
    16254 logstash  39  19 11.5g 793m  14m S 65.8  1.2 839:36.21 java                                                                                                                                           
    22814 elastics  20   0 66.3g  12g 230m S 60.2 20.4   2728:03 java                                                                                                                                           
     1830 root      20   0  407m  11m 1404 S  5.0  0.0 243:37.51 rsyslogd      

ES, LS, Kibana, rsyslog and nginx are all configured on this server.

/etc/init.d/logstash

LS_HEAP_SIZE="8192m"

syslog_input

input {
   syslog {
      type => syslog
      port => 10514
   }
}

syslog_output

output {
  if [program] == "accesslog-elastic" {
    elasticsearch {
      hosts => [ "10.10.10.10:9200" ]
      index => "proxy-v2-%{+YYYY.MM.dd}"
    }
  }
  else if "ASA" in [program] {
    elasticsearch {
      hosts => [ "10.10.10.10:9200" ]
      index => "firewall-%{+YYYY.MM.dd}"
    }
  }
}

I'll post the output of the metrics filter later.

filter {
       if [type] == "syslog" {
          syslog_pri { }
          date {
             match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
          }
          mutate {
             replace => [ "host", "%{logsource}" ]

          }
          if [program] == "accesslog-elastic" {
             grok {
                patterns_dir => ["/etc/logstash/conf.d/patterns"]
                match => [
                   "message" ,"%{WORD:message_type}: %{NUMBER:log_time} %{NUMBER:duration} %{IP:client_address} %{WORD:transaction_result_code}/%{NUMBER:http_result_code} %{NUMBER:bytes:int} %{WORD:http_metod} %{PROTOCOL:url_protocol}%{DOMAIN:url_domain}%{REFERER:url_referer} %{NOTSPACE:user} %{NOTSPACE:requested_server} %{NOTSPACE:response_mime_type} %{NOT_HYPHEN:acl_decision_tag}-%{NOT_HYPHEN:access_or_decryption_policy}-%{NOT_HYPHEN:identity_policy_group}-%{NOT_HYPHEN:outbound_maleware_scanning_policy_group}-%{NOT_HYPHEN:data_security_policy_group}-%{NOT_HYPHEN:external_dlp_policy_group}-%{NOT_HYPHEN:routing_policy_group} <%{NOT_COMMA:url_category},%{NOT_COMMA:wbrs},%{NOT_COMMA:webroot_verdict},%{NOT_COMMA:spyname},%{NOT_COMMA:trr},%{NOT_COMMA:threat_id},%{NOT_COMMA:trace_id},%{NOT_COMMA:mcafee_verdict},%{NOT_COMMA:mcafee_filenmae},%{NOT_COMMA:mcafee_scan_error_code},%{NOT_COMMA:mcafee_detection_type},%{NOT_COMMA:mcafee_virus_type},%{NOT_COMMA:mcafee_virus_name},%{NOT_COMMA:sophos_verdict},%{NOT_COMMA:sophos_scan_return_code},%{NOT_COMMA:sophos_file_location},%{NOT_COMMA:sophos_threat_name},%{NOT_COMMA:data_security},%{NOT_COMMA:data_loss_prevention},%{NOT_COMMA:requested_side_url_verdict},%{NOT_COMMA:response_side_url_verdict},%{NOT_COMMA:unified_inbound_dvs_verdict},%{NOT_COMMA:web_reputation_filter_type},%{NOT_COMMA:avc_application_name},%{NOT_COMMA:avc_application_type},%{NOT_COMMA:avc_application_behavior},%{NOT_COMMA:avc_safe_browsing_scanning_verdict},%{NOT_COMMA:average_bandwidth},%{NOT_COMMA:throttle_flag},%{NOT_COMMA:type_of_user},%{NOT_COMMA:unified_outbound_dvs_verdict},%{NOT_COMMA:outbound_threat_name}%{GREEDYDATA:message_body}>"
                ]
             }
      if [client_address] =~ /^10\.XXX/ {
               mutate { replace      => { "[geoip][timezone]"      => "Russia/Moscow" } }
               mutate { replace      => { "[geoip][country_name]"  => "Russia" } }
               mutate { replace      => { "[geoip][country_code2]" => "RU" } }
               mutate { replace      => { "[geoip][country_code3]" => "RUS" } }
               mutate { remove_field => [ "[geoip][location]" ] }
               mutate { add_field    => { "[geoip][location]"      => "37.617" } }
               mutate { add_field    => { "[geoip][location]"      => "55.752" } }
               mutate { convert      => [ "[geoip][location]",        "float" ] }
               mutate { replace      => [ "[geoip][latitude]",        37.617 ] }
               mutate { convert      => [ "[geoip][latitude]",        "float" ] }
               mutate { replace      => [ "[geoip][longitude]",       55.752 ] }
               mutate { convert      => [ "[geoip][longitude]",       "float" ] }
            }
       else if [client_address] =~ /^10\.XXX/ {
               mutate { replace      => { "[geoip][timezone]"      => "Italia/Rome" } }
               mutate { replace      => { "[geoip][country_name]"  => "Italia" } }
               mutate { replace      => { "[geoip][country_code2]" => "IT" } }
               mutate { replace      => { "[geoip][country_code3]" => "ITA" } }
               mutate { remove_field => [ "[geoip][location]" ] }
               mutate { add_field    => { "[geoip][location]"      => "11.359" } }
               mutate { add_field    => { "[geoip][location]"      => "43.783" } }
               mutate { convert      => [ "[geoip][location]",        "float" ] }
               mutate { replace      => [ "[geoip][latitude]",        11.359 ] }
               mutate { convert      => [ "[geoip][latitude]",        "float" ] }
               mutate { replace      => [ "[geoip][longitude]",       43.783 ] }
               mutate { convert      => [ "[geoip][longitude]",       "float" ] }
       }
       else if [client_address] =~ /^10\.XXX/ {
               mutate { replace      => { "[geoip][timezone]"      => "Poland/Warsaw" } }
               mutate { replace      => { "[geoip][country_name]"  => "Poland" } }
               mutate { replace      => { "[geoip][country_code2]" => "PL" } }
               mutate { replace      => { "[geoip][country_code3]" => "POL" } }
               mutate { remove_field => [ "[geoip][location]" ] }
               mutate { add_field    => { "[geoip][location]"      => "21.011" } }
               mutate { add_field    => { "[geoip][location]"      => "52.221" } }
               mutate { convert      => [ "[geoip][location]",        "float" ] }
               mutate { replace      => [ "[geoip][latitude]",        21.011 ] }
               mutate { convert      => [ "[geoip][latitude]",        "float" ] }
               mutate { replace      => [ "[geoip][longitude]",       52.221 ] }
               mutate { convert      => [ "[geoip][longitude]",       "float" ] }
       }
       else if [client_address] =~ /^10\.XXX/ {
               mutate { replace      => { "[geoip][timezone]"      => "Switzerland/Berne" } }
               mutate { replace      => { "[geoip][country_name]"  => "Switzerland" } }
               mutate { replace      => { "[geoip][country_code2]" => "CH" } }
               mutate { replace      => { "[geoip][country_code3]" => "CHE" } }
               mutate { remove_field => [ "[geoip][location]" ] }
               mutate { add_field    => { "[geoip][location]"      => "7.446" } }
               mutate { add_field    => { "[geoip][location]"      => "46.947" } }
               mutate { convert      => [ "[geoip][location]",        "float" ] }
               mutate { replace      => [ "[geoip][latitude]",        7.446 ] }
               mutate { convert      => [ "[geoip][latitude]",        "float" ] }
               mutate { replace      => [ "[geoip][longitude]",       46.947 ] }
               mutate { convert      => [ "[geoip][longitude]",       "float" ] }
       }
       else if [client_address] =~ /^10\.XXX/ {
             mutate { replace      => { "[geoip][timezone]"      => "Austria/Vienna" } }
               mutate { replace      => { "[geoip][country_name]"  => "Austria" } }
               mutate { replace      => { "[geoip][country_code2]" => "AT" } }
               mutate { replace      => { "[geoip][country_code3]" => "AUT" } }
               mutate { remove_field => [ "[geoip][location]" ] }
               mutate { add_field    => { "[geoip][location]"      => "15.047" } }
               mutate { add_field    => { "[geoip][location]"      => "47.625" } }
               mutate { convert      => [ "[geoip][location]",        "float" ] }
               mutate { replace      => [ "[geoip][latitude]",        15.047 ] }
               mutate { convert      => [ "[geoip][latitude]",        "float" ] }
               mutate { replace      => [ "[geoip][longitude]",       47.625 ] }
               mutate { convert      => [ "[geoip][longitude]",       "float" ] }
       }
       else if [client_address] =~ /^10\.XXX/ or [client_address] =~ /^10\.15[0-1]\./ {
             mutate { replace      => { "[geoip][timezone]"      => "Czech Republic/Prague" } }
               mutate { replace      => { "[geoip][country_name]"  => "Czech Republic" } }
               mutate { replace      => { "[geoip][country_code2]" => "CZ" } }
               mutate { replace      => { "[geoip][country_code3]" => "CZE" } }
               mutate { remove_field => [ "[geoip][location]" ] }
               mutate { add_field    => { "[geoip][location]"      => "14.551" } }
               mutate { add_field    => { "[geoip][location]"      => "50.061" } }
               mutate { convert      => [ "[geoip][location]",        "float" ] }
               mutate { replace      => [ "[geoip][latitude]",        14.551 ] }
               mutate { convert      => [ "[geoip][latitude]",        "float" ] }
               mutate { replace      => [ "[geoip][longitude]",       50.061 ] }
               mutate { convert      => [ "[geoip][longitude]",       "float" ] }
       }
       else if [client_address] =~ /^10\.XXX/ or [client_address] =~ /^10\.15[2-3]\./ {
             mutate { replace      => { "[geoip][timezone]"      => "Hungary/Budapest" } }
               mutate { replace      => { "[geoip][country_name]"  => "Hungary" } }
               mutate { replace      => { "[geoip][country_code2]" => "HU" } }
               mutate { replace      => { "[geoip][country_code3]" => "HUN" } }
               mutate { remove_field => [ "[geoip][location]" ] }
               mutate { add_field    => { "[geoip][location]"      => "19.099" } }
               mutate { add_field    => { "[geoip][location]"      => "47.425" } }
               mutate { convert      => [ "[geoip][location]",        "float" ] }
               mutate { replace      => [ "[geoip][latitude]",        19.099 ] }
               mutate { convert      => [ "[geoip][latitude]",        "float" ] }
               mutate { replace      => [ "[geoip][longitude]",       47.425 ] }
               mutate { convert      => [ "[geoip][longitude]",       "float" ] }
       }
       else if [client_address] =~ /^10\.XXX/ {
             mutate { replace      => { "[geoip][timezone]"      => "China/Beijing" } }
               mutate { replace      => { "[geoip][country_name]"  => "China" } }
               mutate { replace      => { "[geoip][country_code2]" => "CN" } }
               mutate { replace      => { "[geoip][country_code3]" => "CHN" } }
               mutate { remove_field => [ "[geoip][location]" ] }
               mutate { add_field    => { "[geoip][location]"      => "98.236" } }
               mutate { add_field    => { "[geoip][location]"      => "34.769" } }
               mutate { convert      => [ "[geoip][location]",        "float" ] }
               mutate { replace      => [ "[geoip][latitude]",        98.236 ] }
               mutate { convert      => [ "[geoip][latitude]",        "float" ] }
               mutate { replace      => [ "[geoip][longitude]",       34.769 ] }
               mutate { convert      => [ "[geoip][longitude]",       "float" ] }
       }
       else if [client_address] =~ /^10\.XXX/ {
             mutate { replace      => { "[geoip][timezone]"      => "Slovakia/Bratislava" } }
               mutate { replace      => { "[geoip][country_name]"  => "Slovakia" } }
               mutate { replace      => { "[geoip][country_code2]" => "SK" } }
               mutate { replace      => { "[geoip][country_code3]" => "SVK" } }
               mutate { remove_field => [ "[geoip][location]" ] }
               mutate { add_field    => { "[geoip][location]"      => "19.013" } }
               mutate { add_field    => { "[geoip][location]"      => "48.733" } }
               mutate { convert      => [ "[geoip][location]",        "float" ] }
               mutate { replace      => [ "[geoip][latitude]",        19.013 ] }
               mutate { convert      => [ "[geoip][latitude]",        "float" ] }
               mutate { replace      => [ "[geoip][longitude]",       48.733 ] }
               mutate { convert      => [ "[geoip][longitude]",       "float" ] }
       }
       else if [client_address] =~ /^10\.XXX/ {
             mutate { replace      => { "[geoip][timezone]"      => "Slovenia/Ljubljana" } }
               mutate { replace      => { "[geoip][country_name]"  => "Slovenia" } }
               mutate { replace      => { "[geoip][country_code2]" => "SI" } }
               mutate { replace      => { "[geoip][country_code3]" => "SVN" } }
               mutate { remove_field => [ "[geoip][location]" ] }
               mutate { add_field    => { "[geoip][location]"      => "14.534" } }
               mutate { add_field    => { "[geoip][location]"      => "46.052" } }
               mutate { convert      => [ "[geoip][location]",        "float" ] }
               mutate { replace      => [ "[geoip][latitude]",        14.534 ] }
               mutate { convert      => [ "[geoip][latitude]",        "float" ] }
               mutate { replace      => [ "[geoip][longitude]",       46.052 ] }
               mutate { convert      => [ "[geoip][longitude]",       "float" ] }
       }

       else{

          geoip {
             source =>   "client_address"
             target =>   "geoip"
             database => "/etc/logstash/conf.d/geo/GeoLiteCity.dat"
             add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
             add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
          }

          mutate {
             convert => [ "[geoip][coordinates]", "float"]
             }
          }
        }
          else {

            grok {
             match => [
             "message", "%{CISCOFW106001}",
             "message", "%{CISCOFW106006_106007_106010}",
             "message", "%{CISCOFW106014}",
             "message", "%{CISCOFW106015}",
             "message", "%{CISCOFW106021}",
             "message", "%{CISCOFW106023}",
             "message", "%{CISCOFW106100}",
             "message", "%{CISCOFW110002}",
             "message", "%{CISCOFW302010}",
             "message", "%{CISCOFW302013_302014_302015_302016}",
             "message", "%{CISCOFW302020_302021}",
             "message", "%{CISCOFW305011}",
             "message", "%{CISCOFW313001_313004_313008}",
             "message", "%{CISCOFW313005}",
             "message", "%{CISCOFW402117}",
             "message", "%{CISCOFW402119}",
             "message", "%{CISCOFW419001}",
             "message", "%{CISCOFW419002}",
             "message", "%{CISCOFW500004}",
             "message", "%{CISCOFW602303_602304}",
             "message", "%{CISCOFW710001_710002_710003_710005_710006}",
             "message", "%{CISCOFW713172}",
             "message", "%{CISCOFW733100}"
             ]
            }

          mutate {
             convert => [ "bytes", "integer" ]
             convert => [ "duration", "float" ]
             }
          }
       }
    }

Sorry for spamming this thread... posts are restricted to 5000 characters.

Please tell me how to set up the metrics filter without interrupting or losing my existing filters...

While I am glad to help you figure out your problem, you are going to have to read the documentation too; you can find the metrics filter on this page.

You may want to create a catch-all output for events where the program is not "accesslog-elastic". I also don't think you're using "in" correctly: [program] is probably not an array of strings, and that is the only place I usually see "in" used, e.g. if "ASA" in [tags]. I believe you want to switch to a regex match (=~) instead of literal comparisons, like so:

if [program] =~ /accesslog-elastic/ {
  ...
} else if [program] =~ /ASA/ {
  ...
} else {
  # write everything else to an "unknown" index
}

You may be dropping records because they don't exactly match either condition.

Once you get the metrics, I usually send them to a separate index and then graph them so I can see how the rate looks over time.

Like so

filter {
  metrics {
    meter     => "events"
    add_field => [ "type", "indexer-metric" ]
    add_field => [ "host", "${HOSTNAME}" ]
  }
}

Hi, thank you. Please find the output of the metrics filter below:

#events.rate_15m
398.514
#events.rate_1m
570.046
#events.rate_5m
458.24

The output file has been reconfigured as follows:

output {
  if [program] =~ /accesslog-elastic/ {
    elasticsearch {
      hosts => [ "xxx:9200" ]
      index => "proxy-v2-%{+YYYY.MM.dd}"
    }
  }
  else if [program] =~ /ASA/ {
    elasticsearch {
      hosts => [ "xxx:9200" ]
      index => "firewall-%{+YYYY.MM.dd}"
    }
  }
  else if "metric" in [tags] {
    elasticsearch {
      hosts => [ "xxx:9200" ]
      index => "metrics-%{+YYYY.MM.dd}"
    }
  }
  else {
    elasticsearch {
      hosts => [ "xxx:9200" ]
      index => "logstash-%{+YYYY.MM.dd}"
    }
  }
}

On https://www.elastic.co/guide/en/logstash/current/plugins-filters-metrics.html
there is an example of a config like: if "expression" in [field]

 input {
      generator {
        type => "generated"
      }
    }

    filter {
      if [type] == "generated" {
        metrics {
          meter => "events"
          add_tag => "metric"
        }
      }
    }

    output {
      # only emit events with the 'metric' tag
      if "metric" in [tags] {
        stdout {
          codec => line {
            format => "rate: %{[events][rate_1m]}"
          }
        }
      }
    }

I'll do some more validation of index vs. logfile to figure out any mismatching from the use of "in" in the output file, as you said: [quote="eperry, post:12, topic:59099"]
I don't think you're using "in" correctly
[/quote]

Yeah, your volume is nothing for Logstash or for the syslog input.

But this is a good number to check: does your syslog generate about that many lines per minute?
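
For reference: if those rates are events per second, as the metrics meter normally reports, then 458 events/s works out to roughly 27,500 events per minute, or about 39.6 million events per day.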

If so, then you know the data is reaching the filter, and your issue would be in your output section.
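
One way to narrow that down (just a sketch; the file path is made up) is to write a copy of everything that matches a given output condition to a local file as well, then compare wc -l on that file with the document count of the corresponding index:

output {
  if [program] =~ /accesslog-elastic/ {
    elasticsearch {
      hosts => [ "10.10.10.10:9200" ]
      index => "proxy-v2-%{+YYYY.MM.dd}"
    }
    # temporary, for counting only -- remove it once the numbers line up
    file {
      path => "/tmp/proxy-v2-count-%{+YYYY.MM.dd}.log"
    }
  }
}

If the file has more lines than the index has documents, Elasticsearch is rejecting documents and the Logstash log should tell you why; if the counts match, the loss is happening before the output.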

What I meant by not using "in" correctly was that:

  • Try using a regex in your if statements
  • You have a condition where you don't do anything when the program is neither accesslog-elastic nor an ASA; having a catch-all would be helpful,
    as events may be failing both if statements

Sorry, I missed your other post; yeah, that new output section.

Yes, that series of logic looks more consistent and predictable.

Is anything making it into the logstash index?

I've configured a "non-ASA" firewall to log to this index in order to test the matching conditions... plus some Linux stuff like sshd, cron... [quote="eperry, post:17, topic:59099"]
Yeah, your volume is nothing for Logstash or for the syslog input.
[/quote]
What does that mean? Where is the leak?

What options are there for getting metrics that show the logging hosts?

There is no leak, at least none that is known about. What I was referring to is the number of messages you're processing vs. your load average (Logstash can handle a lot more without any issue or tuning).

The only thing it could be is that either you're not writing the data to the index or you're not receiving it.
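
If it turns out to be the receiving side, one thing worth trying (only a sketch; check the option names against the plugin docs for your Logstash version) is to swap the syslog input for plain tcp/udp inputs so the UDP listener can get a bigger buffer and more worker threads. Note that the syslog input does the header parsing for you today, so you would also need a grok for the syslog header:

input {
  tcp {
    port => 10514
    type => "syslog"
  }
  udp {
    port        => 10514
    type        => "syslog"
    buffer_size => 65536   # maximum packet size to read from the socket
    workers     => 2       # threads reading from the socket
    queue_size  => 10000   # unprocessed packets held in memory before new ones are dropped
  }
}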

Unfortunately, I have given you all the ideas I can to help point you in the right direction.

The last idea I can come up with: take a log file with a known size and a known number of lines, and process it to see whether every line shows up in the index. If something does not show up, then we have something concrete to review.
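
A sketch of that, with placeholder paths and index name (note that a file read this way bypasses the syslog input's header parsing, so for a fully faithful test you could instead replay the file through rsyslog itself):

# count-test.conf -- a standalone test pipeline, run with: bin/logstash -f count-test.conf
input {
  file {
    path           => "/tmp/known-sample.log"
    start_position => "beginning"
    sincedb_path   => "/dev/null"   # re-read the whole file on every run
    type           => "syslog"
  }
}
output {
  elasticsearch {
    hosts => [ "10.10.10.10:9200" ]
    index => "count-test"
  }
}

Then compare wc -l /tmp/known-sample.log with the result of curl '10.10.10.10:9200/count-test/_count?pretty' once Logstash has gone idle.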