Filebeat not capturing all events in syslog

I am currently using the following pipeline to get my data into Elasticsearch:
syslog---->filebeat---->kafka---->logstash---->elasticsearch

I am trying to capture shutdown and restart events from the syslog. The syslog entries for the shutdown are given below:

Mar  5 08:55:01 env-cs-westus-0 systemd-shutdownd: Creating /run/nologin, blocking further logins...
Mar  5 08:55:46 env-cs-westus-0 kernel: hv_utils: Shutdown request received - graceful shutdown initiated
Mar  5 09:18:31 env-cs-westus-0 systemd-shutdownd: Shutting down at Tue 2019-03-05 09:19:31 UTC (poweroff)...
Mar  5 09:18:31 env-cs-westus-0 systemd-shutdownd: Creating /run/nologin, blocking further logins

Considering the above case of shutdown: I shut down the server by two methods, either by typing the "shutdown" command or by using the Azure portal. Ideally, Filebeat is stopped as a part of this process, but on the next start I expect it to pick up the logs from where it left off (as marked in the Filebeat registry). Any idea why this is not happening?
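
For anyone who wants to reproduce the check, here is a minimal Python sketch of comparing the registry offsets with the current file sizes after a restart. It assumes the registry is at the default package-install path /var/lib/filebeat/registry; it actually lives under path.data, so adjust the path for your setup:

#!/usr/bin/env python
# Sketch: compare Filebeat 5.x registry offsets with the current file sizes.
# REGISTRY is an assumption (default for package installs); point it at
# <path.data>/registry for your installation.
import json
import os

REGISTRY = "/var/lib/filebeat/registry"

with open(REGISTRY) as f:
    states = json.load(f)          # Filebeat 5.x keeps a flat JSON array of file states

for state in states:
    source = state.get("source")
    offset = state.get("offset", 0)
    try:
        size = os.path.getsize(source)
    except OSError:
        size = None                # file missing or rotated away
    unread = None if size is None else size - offset
    print("%-40s offset=%-10s size=%-10s unread=%s" % (source, offset, size, unread))

If "unread" is greater than zero for /var/log/messages after Filebeat has restarted, the remaining lines were never shipped.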

Following is my Filebeat configuration:

fields:
  osCategory: linux
  osName: centos
  osVersion: 07
  device: ["cassandra"]
  beatName: filebeat
filebeat.modules:
- module: system
  syslog:
    enabled: true
  auth:
    enabled: true
output.kafka:
  enabled: True
  hosts:
    - 10.3.2.26:9092
  topic: 'fir_beat'
  partition:
    hash:
      reachable_only: False

  version: 0.10.0
  metadata:
    retry:
      max: 3
      backoff: 250ms

  worker: 1
  bulk_max_size: 2048
  timeout: 30s
  broker_timeout: 10s
  channel_buffer_size: 256
  compression: gzip
  max_message_bytes: 1000000
  required_acks: 1
  client_id: "beats"

My Logstash config is shown below:

input {
  kafka {
     bootstrap_servers => [ "x.x.x.x:9092"]
     session_timeout_ms => "180000"
     request_timeout_ms => "190000"
     topics => [ "fir_beat"]
  }
}

filter  {
        json {
                source => "message"
        }

        if [beatName] != "filebeat" {
            mutate {
                remove_field => ["message"]
            }
        }

        if [source] == "/var/lib/ntp/drift" {
            grok{
                match => {"message" => "%{NUMBER:driftValue}"}
            }
            mutate {
                convert => {"driftValue" => "float"}
            }
        }

        if [source] == "/var/log/messages" or [source] == "/var/log/syslog" {
           mutate {
                add_field => { "logName" => "syslog" }
           }
        }

        if [source] == "/var/log/secure" or [source] == "/var/log/auth.log" {
           mutate {
                add_field => { "logName" => "auth" }
           }
        }

}

output {

   if [beatName] == "filebeat" {
      elasticsearch {
        hosts => ["10.3.3.14:9890"]
        index => "%{beatName}-%{[beat][version]}-%{+YYYY.MM.dd}"
        document_type => "doc"
        pipeline => "filebeat-5.6.11-system-%{logName}-pipeline"
      }
   }
   else {
      elasticsearch {
        hosts => ["10.3.3.14:9890"]
        index => "%{beatName}-%{[beat][version]}-%{+YYYY.MM.dd}"
        document_type => "doc"
     }
   }

       # stdout {
       #         codec => "rubydebug"
       # }
}

Versions used:
Filebeat: 5.6.11
Logstash: 5.6.2
Elasticsearch: 5.6.2

@arunpmohan By default, if you stop syslog and keep Filebeat running, Filebeat should read the syslog file until EOF and update the offset there; in fact, every time an event is acked by the output, the offset should be synced to disk.

  1. Did you look at the offset to make sure Filebeat is at the right offset? (/data/registry)
  2. The events should already have been sent to Kafka; are they just not available in Elasticsearch? (A quick way to check the topic is sketched below.)
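
To check (2) directly on the broker side, you can consume the topic from the beginning and look for the shutdown lines. A minimal sketch, assuming the third-party kafka-python client and the broker/topic names from your configs above:

# Sketch: verify the missing syslog lines actually reached Kafka.
# Assumes the kafka-python client (pip install kafka-python); broker and
# topic are taken from the Filebeat/Logstash configs in this thread.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "fir_beat",
    bootstrap_servers=["10.3.2.26:9092"],
    auto_offset_reset="earliest",   # start from the oldest available offset
    consumer_timeout_ms=10000,      # give up after 10s of no new messages
)

for record in consumer:
    value = record.value.decode("utf-8", errors="replace")
    if "systemd-shutdownd" in value or "Shutdown request received" in value:
        print(record.offset, value)

If the lines show up here, the gap is somewhere between Logstash and Elasticsearch; if they never appear, Filebeat did not ship them before the host went down.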
