NEW to ELK: using conditionals in filter

Hello. Stuck in configuring pipeline. My filebeat agent send apache_access logs with "fields.type = apache_access" option:

 filebeat.inputs:
 - type: log
   enabled: true
   paths:
     - D:\log\apache\access-*.log
   fields:
     type: apache_access

My conditions in filter section of Logstash pipeline isn't working. I try

  • if [type] == "apache_access"
  • if [fileds.type] == "apache_access"
  • if "apache_access" in [_source.fields.type]
  • if "apache_access" in [fields.type]

Without "if" statement, grok filter for HTTPD_COMMONLOG works perfectly. Without - no error and no filter in Kibana:

Kibana JSON

{
"_index": "xxx-2019.09.09",
"_type": "doc",
"_id": "6apiFG0BnV6XU8Emsk4W",
"_version": 1,
"_score": null,
"_source": {
"message": "10.22.11.10 - - [09/Sep/2019:12:57:53 +0800] "GET /yyy/public/images/logo-xxxx.png HTTP/1.1" 304 -",
"@timestamp": "2019-09-09T04:57:54.264Z",
"ecs": {
"version": "1.0.1"
},
"host": {
"name": "xxxx"
},
"tags": ,
"input": {
"type": "log"
},
"fields": {
"type": "apache_access"
},
"@version": "1",
"agent": {
"version": "7.3.1",
"ephemeral_id": "c37a231e-2534-47c7-8883-5eb592b7e844",
"hostname": "xxxx",
"id": "ea151f77-6c61-487b-b28f-35e0686f58f7",
"type": "filebeat"
},
"log": {
"file": {
"path": "D:\log\apache\access-2019-09-09.log"
},
"offset": 57684
}
},
"fields": {
"@timestamp": [
"2019-09-09T04:57:54.264Z"
]
},
"sort": [
1568005074264
]
}

Whats's wrong with conditional? How can I separate logs - my goal is send several types of logs (apache, mysql, some other software) by one filebeat with different fields.type option and filtering in logstash pipeline with input - beats

@hAh0L13, The below pattern should work:

if [type] == "application_log"

You need to define this if condition in output filter also in logstash.

Thanks.

My logstash config:

logstash.conf

input {
beats {
port => 5044
}
}

filter {
if [type] == "apache_access" {
grok {
match => { "message" => ["%{HTTPD_COMMONLOG}", "%{HTTPD_COMBINEDLOG}"] }
}
date {
match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
}
mutate {
add_tag => ["apache_access"]
}
}
if "beats_input_codec_plain_applied" in [tags] {
mutate {
remove_tag => ["beats_input_codec_plain_applied"]
}
}
}

output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
}
}

I expect "true" in conditional, then grok "message" + retrieve timestamp with data + adding tag with mutate. And for all cases delete standart tag. Now it only delete standart tag. If i delete IF statement, it works fine.

@Tek_Chand, where is "application_log" in my message from filebeat? I should replace fields.type in filebeat config from "apache_access" to "apache_log"?

@hAh0L13,

application_log is just for reference you can change it as per your settings.

In your logstash configuration you have used if condition but didn't used else. But you need to use else also.

Your configuration should be look like below:

input {
  beats {
    port => 5044   
  }
}
filter {
if [type] == "apache_access" {
grok {
match => { "message" => ["%{HTTPD_COMMONLOG}", "%{HTTPD_COMBINEDLOG}"] ] }
}
date {
      date {
match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
}
mutate {
add_tag => ["apache_access"]
   }
}
else {
grok {
match => { "message" => [ "(?<date-time>[\w\s\d\:]+)\s(?<IP>192.168.50.1)\s(?<port>582)\:\s(?<message>.*)" ] }
}
}
}
output {
  if [type] == "apache_access"
  elasticsearch {
    hosts => ["10.133.58.12:9200"]
    sniffing => true
    manage_template => false
#    index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
#    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
    index => "application-%{+YYYY.MM.dd}"
    document_type => "%{[@metadata][type]}"
  }
}
else
  {
elasticsearch {
    hosts => ["10.133.58.12:9200"]
    sniffing => true
    manage_template => false
#    index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
    document_type => "%{[@metadata][type]}"
}
}
}

Above configuration is for your help it may will not work at your end. You may need to make changes at your end as per your detail and configuration like index name, IPs etc.
Thanks.

Ok. I think found solution - adding "fields_under_root: true" option to filebeat let me use "if [type] == "application_log"" conditional. Maybe using sub-dictionary fields in conditionals is forbidden?

@hAh0L13,

Yes, you need to set above setting in your filebeat.yml.

Thanks.

No, it is not forbidden, but the syntax to reference a sub-field in logstash is [fields][type]. [fields.type] would reference a field that contains a period in its name.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.