Filebeat to Logstash

Hi everyone,

We are facing a weird issue with Filebeat and Logstash.
We currently have 5 conf files in Logstash, each running a jdbc query.
We want to add a new conf for Filebeat that listens on port 5044 and outputs to Elasticsearch.
When we add this new conf, something weird happens: the events from this input are sent to more indices than the one we specified, so they change other indices and their data!
The output index is "logstash-%{+YYYY.MM}", maybe that's the cause? We don't understand this "bug".

Here is our Filebeat config:

############################# Filebeat ######################################
filebeat:
  # List of prospectors to fetch data.
  prospectors:
    # Each - is a prospector. Below are the prospector specific configurations
    -
      paths:
        - D:\Live DB Jobservice logs\JobService.log
        - D:\Live DB Jobservice logs\JobService.log_*
      input_type: log
      document_type: iagjob
      exclude_lines: ["^Logging severity"]
      ignore_older: 2h
      multiline:
        # The regexp pattern that has to be matched.
        pattern: "^<[a-z]>"

        # Defines if the pattern set under pattern should be negated or not. Default is false.
        negate: true

        # Match can be set to "after" or "before". It is used to define if lines should be appended to a pattern
        # that was (not) matched before or after, or as long as a pattern is not matched based on negate.
        # Note: "after" is the equivalent to "previous" and "before" is the equivalent to "next" in Logstash.
        match: after

    -
      paths:
        - C:\inetpub\wwwroot\Habilitations\App_Data\Logs\*.txt
        - C:\inetpub\wwwroot\Entitlements\App_Data\Logs\*.txt
      input_type: log
      ignore_older: 2h
      document_type: iaglog
      multiline:
        # The regexp pattern that has to be matched.
        pattern: "^LOG [[:digit:]]{4}-[[:digit:]]{2}-[[:digit:]]{2}"

        # Defines if the pattern set under pattern should be negated or not. Default is false.
        negate: true

        # Match can be set to "after" or "before". It is used to define if lines should be appended to a pattern
        # that was (not) matched before or after, or as long as a pattern is not matched based on negate.
        # Note: "after" is the equivalent to "previous" and "before" is the equivalent to "next" in Logstash.
        match: after

    -
      paths:
        - C:\inetpub\logs\LogFiles\W3SVC1\*.log
      input_type: log
      ignore_older: 2h
      exclude_lines: ["^#"]
      document_type: iislog

    # -
    #   paths:
    #     - C:\tmp\csltool.log
    #   input_type: log
    #   document_type: csllog

    -
      paths:
        - D:\Logs\SQL_Habilitations\SQL*.log
        - C:\inetpub\wwwroot\Entitlements\App_Data\SQLLogs\SQL*.log
      input_type: log
      ignore_older: 2h
      document_type: sqllog

      # Multiline can be used for log messages spanning multiple lines. This is common
      # for Java stack traces or C line continuations.
      multiline:
        # The regexp pattern that has to be matched.
        pattern: "^[[:digit:]]{4}-[[:digit:]]{2}-[[:digit:]]{2}"

        # Defines if the pattern set under pattern should be negated or not. Default is false.
        negate: true

        # Match can be set to "after" or "before". It is used to define if lines should be appended to a pattern
        # that was (not) matched before or after, or as long as a pattern is not matched based on negate.
        # Note: "after" is the equivalent to "previous" and "before" is the equivalent to "next" in Logstash.
        match: after

  # General filebeat configuration options
  #
  # Event count spool threshold - forces network flush if exceeded
  #spool_size: 2048

  # Defines how often the spooler is flushed. After idle_timeout the spooler is
  # flushed even though spool_size is not reached.
  #idle_timeout: 5s

  # Name of the registry file. Per default it is put in the current working
  # directory. If the working directory is changed between runs of
  # filebeat, indexing starts from the beginning again.
  registry_file: "C:/ProgramData/filebeat/registry"

  # Full path to a directory with additional prospector configuration files. Each file must end with .yml.
  # These config files must have the full filebeat config part inside, but only
  # the prospector part is processed. All global options like spool_size are ignored.
  # The config_dir MUST point to a different directory than the one the main filebeat config file is in.
  #config_dir:

############################# Output ##########################################

# Configure what outputs to use when sending the data collected by the beat.
# Multiple outputs may be used.
output:
  ### Logstash as output
  logstash:
    # The Logstash hosts
    hosts: ["localhost:5044"]

And here is our Logstash conf:

input {
  beats {
    port => 5044
  }
}

filter {

  if [type] =~ "sqllog" {
    grok {
      match => ["message", "%{TIMESTAMP_ISO8601:log_timestamp} %{NOTSPACE} - %{NOTSPACE:level} - \(<? ?%{NUMBER:duration:int} ms\) - %{GREEDYDATA:request}"]
    }

    if [request] =~ "^Reading BLOB" {
      drop {}
    }

    mutate {
      add_field => { "request_gen" => "%{request}" }
    }

    mutate {
      gsub => [
        "request_gen", "'\h{8}\-\h{4}\-\h{4}\-\h{4}\-\h{12}'", "':UUID'"
      ]
    }

  } else if [type] =~ "iislog" {

    # ignore log comments
    if [message] =~ "^#" {
      drop {}
    }

    grok {
      match => ["message", "%{TIMESTAMP_ISO8601:log_timestamp} %{IPORHOST:site} %{WORD:method} %{URIPATH:page} %{NOTSPACE:querystring} %{NUMBER:port} - %{IPORHOST:clienthost} %{NOTSPACE:useragent} %{NUMBER:response} %{NUMBER:scstatus} %{NUMBER:bytes} %{NUMBER:timetaken:int}"]
    }

  } else if [type] =~ "iaglog" {

    grok {
      match => ["message", "^LOG %{TIMESTAMP_ISO8601:log_timestamp} %{GREEDYDATA:message}"]
      overwrite => [ "message" ]
    }

  } else if [type] =~ "iagjob" {

    mutate {
      gsub => [
        "message", "<x>", ""
      ]
    }

    grok {
      match => {
        "message" => [
          "<(?<message_type>[p])>%{TIMESTAMP_ISO8601:log_timestamp} %{NOTSPACE} - Process step parameter %{NOTSPACE:uid_job}:%{GREEDYDATA:log_message}",
          "<(?<message_type>[p])>%{TIMESTAMP_ISO8601:log_timestamp} %{NOTSPACE} - Process step output parameter %{NOTSPACE:uid_job}:%{GREEDYDATA:log_message}",
          "<(?<message_type>[i|w])>%{TIMESTAMP_ISO8601:log_timestamp} %{NOTSPACE} - %{NOTSPACE:level}: %{GREEDYDATA:log_message}",
          "<(?<message_type>[s|e])>%{TIMESTAMP_ISO8601:log_timestamp} %{NOTSPACE} - %{NOTSPACE:component} - %{NOTSPACE:uid_job}: %{GREEDYDATA:log_message}"
        ]
      }
      # overwrite => [ "message" ]
    }
  }

  date {
    match => ["log_timestamp", "YYYY-MM-dd HH:mm:ss"]
    target => ["@timestamp"]
  }

  mutate {
    remove_field => ["log_timestamp"]
  }
}

output {
  # stdout {}
  elasticsearch {
    hosts => ["localhost:10010"]
    index => "logstash-%{+YYYY.MM}"
  }
}

Logstash has a single event pipeline. All events are sent to all outputs in all configuration files. If this isn't what you want you need to wrap your outputs in conditionals (like you're already doing with your filters) to control which outputs get which events.
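
For example, the elasticsearch output above could be guarded so it only receives events shipped by Filebeat. A minimal sketch; the types listed are the document_type values from your filebeat.yml, and the filebeat- index name is only an illustration, not something from your conf:

output {
  # Only handle events that arrived via the beats input; everything
  # else (e.g. events from the jdbc pipelines) is left to other outputs.
  if [type] in ["iagjob", "iaglog", "iislog", "sqllog"] {
    elasticsearch {
      hosts => ["localhost:10010"]
      # illustrative index name, adjust to your naming scheme
      index => "filebeat-%{+YYYY.MM}"
    }
  }
}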

Okay, we understand the process now.
So we can have multiple configuration files as long as there are conditional statements in the output so that only the needed data gets indexed?

You can have multiple configuration files: either a single .conf file with the three blocks, or the inputs, filters and outputs kept in separate files. But to save different inputs into different indices, you will need to use conditional statements in the filter and output blocks.

When Logstash starts, it concatenates all the files in the conf.d directory.

filter {
    if [type] == "typeA" {
       # filters for the typeA input
    }
    if [type] == "typeB" {
       # filters for the typeB input
    }
}

output {
    if [type] == "typeA" {
        # output for the typeA input
    } 
    if [type] == "typeB" {
        # output for the typeB input
    }
}
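
Your five jdbc conf files need the same treatment: give each jdbc input a distinct type and guard its output with it. A sketch, assuming a made-up type name "mydb" and illustrative jdbc settings (yours will differ):

input {
  jdbc {
    # illustrative connection settings, keep your existing ones
    jdbc_connection_string => "jdbc:sqlserver://localhost:1433;databaseName=Example"
    jdbc_user => "logstash"
    jdbc_driver_library => "sqljdbc4.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    statement => "SELECT * FROM example_table"
    # tag the events so the output below can match them
    type => "mydb"
  }
}

output {
  if [type] == "mydb" {
    elasticsearch {
      hosts => ["localhost:10010"]
      # illustrative index name
      index => "mydb-%{+YYYY.MM}"
    }
  }
}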

Thanks, this is what we wanted.
