Logstash configuration : 6.3

Hi Team,

I am new to ELK stack and I am looking for a solution for the below problem.

ELK Components :
ElasticSearch Cluster ( 1 Master & 2 Data Nodes )
Kibana is Installed on same server as the master
Logstash is on a different server

Filebeat is the input agent, installed on remote Windows servers.

Filebeat pushes the data to Logstash.
Logstash parses the data into a structured format.
Elasticsearch receives the data, which is then displayed on a Kibana dashboard.

This is the basic flow of our ELK stack.

Problem Statement :
Filebeat sends logs from the C: drive and the E: drive of the Windows servers. The logs in the two directories have different formats, and we have written a separate filter pattern for each. I need to understand how to write the if condition inside the filter plugin.

Filebeat Configuration :
filebeat.inputs:
- type: log
  paths:
    - C:\inetpub\logs\LogFiles\test1*
    - E:\logs\test*

output.logstash:
  hosts: ["IP:5044"]

We are getting input from these 2 log directories, each of which has around 10-15 log files.
The issue is that the log files on the C drive and the E drive need 2 different patterns.

C Drive Pattern :
%{DATE:Date} %{TIME:time} %{IP:serverIP} %{WORD:httpMethod} %{PATH:appAPIName} %{NOTSPACE:user} %{NUMBER:port} %{NOTSPACE:user} %{IP:clientIP} %{NOTSPACE:sourceClient} %{NOTSPACE:user2} %{NUMBER:statusCode} %{NUMBER:int} %{NUMBER:int1} %{GREEDYDATA:greed}

E Drive : JSON Format

json {
source => "message"
target => "response"
}

Now we want to run the json block if the log path is "E:\logs\test" and run the grok pattern if the log path is "C:\inetpub\logs\LogFiles\test1".

So how do we write the filter plugin for this?

Let me know if you need any other details

If you use the output below, you will see which field Filebeat puts the path in. Alternatively, there may be some other field that Filebeat fills in whose value you can use to identify which events are in which format.

output {
  stdout { codec => rubydebug { metadata => true } }
}

Then use if conditionals to "steer" the events to their filters. See the Logstash documentation on conditionals.

Thank you for your time and response.

I have configured the output to Elasticsearch and I am getting the output in the Kibana dashboard.
This isn't working for 2 log files with 2 different filter patterns.

So I need to use conditional statements and run the filter if the path => " \inetpub\logs\LogFiles\test1"?

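Yes, use the regex-style if conditional.

if [your field here] =~ "\\inetpub\\logs\\LogFiles\\test1" {
  # filters for the C: drive logs
} else {
  # filters for the E: drive logs
}

The backslashes most likely do need to be doubled, because the string is treated as a regular expression and a single backslash starts an escape sequence (\t, for example, would match a tab). You can sidestep the issue by matching on a fragment that contains no backslashes, such as "inetpub".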

Okay. Thanks..
I will have a check and update

I have one question about this part:

if [your field here]

We are passing input from Filebeat, and my input section looks like this:

input {
  beats {
    port => 5044
    ssl => false
  }
}

So in this case, what will my field be in the "if" condition?

You follow the instructions I gave before.

I know you are using the elasticsearch output but you can use the one I gave temporarily to see what your event's fields and values are while you develop your solution out.

You can use the # character to comment out the elasticsearch output temporarily. Be sure to comment all lines indented to the same level or deeper as the word elasticsearch.
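As a rough sketch of what that temporary debugging setup could look like: the elasticsearch settings shown here are an assumption (the poster's actual output block is not shown at this point in the thread), so substitute your own.

```conf
output {
  # Temporarily disabled while inspecting events.
  # ASSUMPTION: hosts value is illustrative; use your real settings.
  # elasticsearch {
  #   hosts => "localhost:9200"
  # }

  # Print every event with all of its fields, including @metadata.
  stdout { codec => rubydebug { metadata => true } }
}
```

Once the field that carries the file path has been identified in the printed events, the elasticsearch block can be uncommented again.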

Good morning!
I have tried the configuration below; the data is parsed by the filter plugin and I get the desired output.

Configuration :

input {
  file {
    path => "/Users/pavan/Downloads/u_ex180821.log"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
  if [path] == "/Users/pavan/Downloads/u_ex180821.log" {
    grok {
      match => { "message" => "%{DATE:Date} %{TIME:time} %{IP:serverIP} %{WORD:httpMethod} %{PATH:appAPIName} %{NOTSPACE:user} %{NUMBER:port} %{NOTSPACE:user} %{IP:clientIP} %{NOTSPACE:sourceClient} %{NOTSPACE:user2} %{NUMBER:statusCode} %{NUMBER:int} %{NUMBER:int1} %{GREEDYDATA:greed}" }
    }
  }
}

output {
  stdout { codec => rubydebug { metadata => true } }
}

Output


{
"int" => "0",
"statusCode" => "200",
"Date" => "18-08-21",
"serverIP" => "",
"appAPIName" => "**",
"greed" => "237\r",
"host" => "K9-MAC-025.local",
"user" => [
[0] "-",
[1] "-"
],
"httpMethod" => "POST",
"port" => "443",
"sourceClient" => "-",
"user2" => "-",
"@version" => "1",
"@timestamp" => 2018-08-22T16:17:33.133Z,
"clientIP" => "",
"int1" => "0",
"message" => "2018-08-21 06:19:33 10.49.2.58 POST ******************** - 443 - 10.9.65.14 - - 200 0 0 237\r",
"path" => "/Users/pavan/Downloads/u_ex180821.log",
"time" => "06:19:33"
}


Similarly, how do we match on the path in the filter section (as in if [path] == "/Users/pavan/Downloads/u_ex180821.log") when the input is coming from beats?
In my scenario, I have 2 log files as input from Filebeat and 2 different patterns for these log files.

Beats Configuration
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /Users/pavan/Downloads/u_ex180821.log

output.logstash:
  hosts: ["localhost:5044"]

Can I define a variable in the Filebeat YAML and match on it, or is there another way to achieve this?

https://www.elastic.co/guide/en/beats/filebeat/current/exported-fields.html

Particularly https://www.elastic.co/guide/en/beats/filebeat/current/exported-fields-log.html

source
type: keyword
required: True
The file from which the line was read. This field contains the absolute path to the file. For example: /var/log/system.log .

If, in your original config, you had simply changed the output to stdout as pointed out, you would have seen that the source field contains the file path of the file that the message was read from. The file input puts that info in the path field.
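On the earlier question of defining a variable in the Filebeat YAML: Filebeat inputs accept a fields option, so one possible approach is to give each log directory its own input and label it. The sketch below assumes the two Windows paths from the original question; the field name log_format and its values are hypothetical and do not come from the thread.

```yaml
filebeat.inputs:
- type: log
  paths:
    - C:\inetpub\logs\LogFiles\test1*
  fields:
    log_format: iis     # hypothetical label for the grok-parsed logs
- type: log
  paths:
    - E:\logs\test*
  fields:
    log_format: json    # hypothetical label for the JSON logs

output.logstash:
  hosts: ["IP:5044"]
```

Unless fields_under_root is enabled, custom fields arrive nested under fields, so the Logstash side would test [fields][log_format]. The json branch might look like this, with the grok branch following the same shape:

```conf
filter {
  # ASSUMPTION: log_format is the hypothetical label set in filebeat.yml above.
  if [fields][log_format] == "json" {
    json {
      source => "message"
      target => "response"
    }
  }
}
```

This keeps the routing decision independent of where the files live on disk, at the cost of one extra input definition per format.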

I had used stdout in the output section, but the input was not configured for beats, which is why the source field did not appear.

With the input configured as beats, I can see the log path.

I tried the configuration below and it works.


input {
  beats {
    port => 5044
    ssl => false
  }
}

filter {
  if [source] == "/Users/pavan/Downloads/u_ex180821.log" {
    grok {
      match => { "message" => "%{DATE:Date} %{TIME:time} %{IP:serverIP} %{WORD:httpMethod} %{PATH:appAPIName} %{NOTSPACE:user} %{NUMBER:port} %{NOTSPACE:user} %{IP:clientIP} %{NOTSPACE:sourceClient} %{NOTSPACE:user2} %{NUMBER:statusCode} %{NUMBER:int} %{NUMBER:int1} %{GREEDYDATA:greed}" }
    }
  }
  else if [source] == "/Users/pavan/Downloads/BBClients.log" {
    json {
      source => "message"
      target => "response"
    }
  }
}

output {
  if [beat][hostname] == "K9-MAC-025.local" and [source] == "/Users/pavan/Downloads/u_ex180821.log" {
    elasticsearch {
      hosts => "localhost:9200"
      index => "cdrive"
    }
  }
  else if [beat][hostname] == "K9-MAC-025.local" and [source] == "/Users/pavan/Downloads/BBClients.log" {
    elasticsearch {
      hosts => "localhost:9200"
      index => "edrive"
    }
  }
  stdout {}
}

OUTPUT
"statusCode" => "200",
"source" => "/Users/pavan/Downloads/u_ex180821.log",
"greed" => "250",
"time" => "11:43:13",
"serverIP" => "",
"appAPIName" => "**************",
"@version" => "1",
"beat" => {
"hostname" => "K9-MAC-025.local",
"name" => "K9-MAC-025.local",
"version" => "6.3.2"
},
"httpMethod" => "POST",
"offset" => 660883,
"@timestamp" => 2018-08-23T13:35:25.446Z,
"user2" => "-",
"user" => [
[0] "-",
[1] "-"
],
"host" => {
"name" => "K9-MAC-025.local"
},
"int" => "0",
"prospector" => {
"type" => "log"
},
"tags" => [
[0] "beats_input_codec_plain_applied"
],
"sourceClient" => "-",
"int1" => "0",
"port" => "443",
"message" => "2018-08-21 11:43:13 10.49.2.58 POST *********************** - 443 - 10.9.65.14 - - 200 0 0 250"
}
{
"Date" => "18-08-21",
"clientIP" => "",
"input" => {
"type" => "log"
},

Now, in my scenario, the directory /Users/pavan/Downloads/ has over 20 different log files, so I want to match on the directory instead of the exact log file name.

I tried using a wildcard, if [source] == "/Users/pavan/Downloads/*", but it does not match.

I am looking for alternatives. Let me know if there is something that can be used in this case.

Thank you in advance.

It worked. Instead of if [source] == "/Users/pavan/Downloads/cdrive/u_ex180821.log", I matched on just a pattern, as below.

if [source] =~ "cdrive"

Thank you for your assistance, I appreciate it.
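For anyone landing here with the original Windows setup: == is an exact string comparison, so a * in the right-hand side is taken literally, whereas =~ treats the right-hand side as a regular expression and matches anywhere in the field. A minimal sketch applying the same idea to the C: and E: paths from the first post follows. It assumes the path arrives in [source], as seen in the Filebeat 6.3 output above, and that the drive letter appears as "E:" exactly as written in filebeat.yml; both should be checked against the rubydebug output.

```conf
filter {
  # ASSUMPTION: Filebeat puts the file path in [source]; verify with stdout/rubydebug.
  if [source] =~ "inetpub" {
    # Backslash-free fragment, so no regex escaping is needed.
    grok {
      match => { "message" => "%{DATE:Date} %{TIME:time} %{IP:serverIP} %{WORD:httpMethod} %{PATH:appAPIName} %{NOTSPACE:user} %{NUMBER:port} %{NOTSPACE:user} %{IP:clientIP} %{NOTSPACE:sourceClient} %{NOTSPACE:user2} %{NUMBER:statusCode} %{NUMBER:int} %{NUMBER:int1} %{GREEDYDATA:greed}" }
    }
  } else if [source] =~ "^E:" {
    # ^ anchors the match to the start of the path, i.e. the E: drive.
    json {
      source => "message"
      target => "response"
    }
  }
}
```

Matching on a short fragment is convenient but loose: any path containing "inetpub" takes the first branch, so a longer or anchored pattern may be safer if other directories share that name.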
