I am currently using the following pipeline to get my data into Elasticsearch:
syslog---->filebeat---->kafka---->logstash---->elasticsearch
I am trying to capture shutdown and restart events from the syslog. The syslog entries for a shutdown are shown below:
Mar 5 08:55:01 env-cs-westus-0 systemd-shutdownd: Creating /run/nologin, blocking further logins...
Mar 5 08:55:46 env-cs-westus-0 kernel: hv_utils: Shutdown request received - graceful shutdown initiated
Mar 5 09:18:31 env-cs-westus-0 systemd-shutdownd: Shutting down at Tue 2019-03-05 09:19:31 UTC (poweroff)...
Mar 5 09:18:31 env-cs-westus-0 systemd-shutdownd: Creating /run/nologin, blocking further logins
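To make it concrete which of these lines count as shutdown events, here is a minimal Python sketch that parses lines in this layout and keeps only the ones that look like a shutdown. The regular expression and the marker strings are my own assumptions derived from the sample lines above; they are not part of Filebeat or Logstash.

```python
import re

# Classic syslog layout: "Mon D HH:MM:SS host program: message".
SYSLOG_LINE = re.compile(
    r"^(?P<timestamp>\w{3}\s+\d{1,2} \d{2}:\d{2}:\d{2}) "
    r"(?P<host>\S+) (?P<program>[^:\[\s]+)(?:\[\d+\])?: (?P<message>.*)$"
)

# Hypothetical markers taken from the sample lines; extend as needed.
SHUTDOWN_MARKERS = ("Shutdown request received", "Shutting down at")


def find_shutdown_events(lines):
    """Return the parsed fields of every line that looks like a shutdown event."""
    events = []
    for line in lines:
        match = SYSLOG_LINE.match(line)
        if match and any(marker in match["message"] for marker in SHUTDOWN_MARKERS):
            events.append(match.groupdict())
    return events
```

Fed the four sample lines, this keeps the kernel line and the "Shutting down at" line and skips the two /run/nologin lines.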
Consider the shutdown case above. I shut down the server in one of two ways: by typing the command "shutdown" or by using the Azure portal. Filebeat is stopped as part of this process, but I expect it to pick up the logs from where it left off (as recorded in the Filebeat registry). Any idea why this is not happening?
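One way to check whether Filebeat really stopped short of the shutdown lines is to compare the offsets stored in the registry with the current file sizes. The sketch below assumes the registry is a JSON array of entries with "source" and "offset" keys and lives at /var/lib/filebeat/registry; both are assumptions about a default Filebeat 5.x install, so adjust them if your setup differs. The helper names are hypothetical.

```python
import json
import os


def load_registry(path="/var/lib/filebeat/registry"):
    """Read the registry file (assumed path and JSON layout, see above)."""
    with open(path) as fh:
        return json.load(fh)


def unread_bytes(registry_entries, size_of=os.path.getsize):
    """Map each tracked file to (recorded offset, current size, bytes still unread)."""
    report = {}
    for entry in registry_entries:
        path = entry["source"]
        try:
            size = size_of(path)
        except OSError:
            continue  # file was rotated away or deleted
        offset = entry["offset"]
        report[path] = (offset, size, max(size - offset, 0))
    return report
```

Running unread_bytes(load_registry()) right after a reboot, before starting Filebeat, would show whether the recorded offset for /var/log/messages is behind the end of the file. If it is, the shutdown lines are still waiting to be shipped.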
Following is my Filebeat configuration:
fields:
  osCategory: linux
  osName: centos
  osVersion: 07
  device: ["cassandra"]
  beatName: filebeat
filebeat.modules:
- module: system
  syslog:
    enabled: true
  auth:
    enabled: true
output.kafka:
  enabled: True
  hosts:
    - 10.3.2.26:9092
  topic: 'fir_beat'
  partition:
    hash:
      reachable_only: False
  version: 0.10.0
  metadata:
    retry:
      max: 3
      backoff: 250ms
  worker: 1
  bulk_max_size: 2048
  timeout: 30s
  broker_timeout: 10s
  channel_buffer_size: 256
  compression: gzip
  max_message_bytes: 1000000
  required_acks: 1
  client_id: "beats"
My Logstash config is shown below:
input {
  kafka {
    bootstrap_servers => [ "x.x.x.x:9092"]
    session_timeout_ms => "180000"
    request_timeout_ms => "190000"
    topics => [ "fir_beat"]
  }
}
filter {
  json {
    source => "message"
  }
  if [beatName] != "filebeat" {
    mutate {
      remove_field => ["message"]
    }
  }
  if [source] == "/var/lib/ntp/drift" {
    grok {
      match => {"message" => "%{NUMBER:driftValue}"}
    }
    mutate {
      convert => {"driftValue" => "float"}
    }
  }
  if [source] == "/var/log/messages" or [source] == "/var/log/syslog" {
    mutate {
      add_field => { "logName" => "syslog" }
    }
  }
  if [source] == "/var/log/secure" or [source] == "/var/log/auth.log" {
    mutate {
      add_field => { "logName" => "auth" }
    }
  }
}
output {
  if [beatName] == "filebeat" {
    elasticsearch {
      hosts => ["10.3.3.14:9890"]
      index => "%{beatName}-%{[beat][version]}-%{+YYYY.MM.dd}"
      document_type => "doc"
      pipeline => "filebeat-5.6.11-system-%{logName}-pipeline"
    }
  }
  else {
    elasticsearch {
      hosts => ["10.3.3.14:9890"]
      index => "%{beatName}-%{[beat][version]}-%{+YYYY.MM.dd}"
      document_type => "doc"
    }
  }
  # stdout {
  #   codec => "rubydebug"
  # }
}
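For reference, this is how I understand the ingest pipeline name to be resolved for events where beatName is "filebeat". The Python below only mirrors the filter and output logic above as a sketch. It is not Logstash code, and the function name is hypothetical.

```python
# Mirrors the two mutate/add_field conditionals in the filter section.
LOG_NAME_BY_SOURCE = {
    "/var/log/messages": "syslog",
    "/var/log/syslog": "syslog",
    "/var/log/secure": "auth",
    "/var/log/auth.log": "auth",
}


def ingest_pipeline_for(event):
    """Return the pipeline name the elasticsearch output would request."""
    log_name = LOG_NAME_BY_SOURCE.get(event.get("source"))
    if log_name is None:
        # Logstash leaves an unresolved %{field} reference as literal text.
        log_name = "%{logName}"
    return f"filebeat-5.6.11-system-{log_name}-pipeline"
```

So an event from /var/log/messages goes to filebeat-5.6.11-system-syslog-pipeline, while an event from any other source would ask for a pipeline with a literal %{logName} in its name.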
Versions used:
Filebeat: 5.6.11
Logstash: 5.6.2
Elasticsearch: 5.6.2