Multiple field substitution doesn't work in Logstash

I have configured Filebeat to add these fields.

"fields" => {
"provider" => "foobar",
"resourcetype" => "ipmi",
"customer" => "customer001"
},
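For context, the Filebeat side comes from a prospector configuration along these lines (a sketch; the path is hypothetical):

filebeat.prospectors:
  - input_type: log
    paths:
      - /var/log/app/*.log   # hypothetical path
    # In Filebeat 5.x these key/value pairs are copied into each event
    # under the top-level "fields" object (unless fields_under_root is set).
    fields:
      provider: foobar
      resourcetype: ipmi
      customer: customer001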

And in the Logstash conf file, I want to combine these fields to generate a unique Elasticsearch index name.
I tried the format below, and the fields are not expanding correctly.

Please let me know what the right syntax is.

output {
  elasticsearch {
    index => "%{[fields][resourcetype]}.%{[fields][customer]}.%{[fields.provider]}.%{+YYYY.MM.dd}"
  }
}

**The index generated by Logstash is: %{[fields][resourcetype].[fields][customer].[fields.provider]}.2017.04.12**

Using a single field, as listed below, works, but multiple fields don't.
index => "%{[fields][resourcetype]}.%{+YYYY.MM.dd}"

The index name indicates that your configuration actually looks like this:

index => "%{[fields][resourcetype].[fields][customer].[fields.provider]}.%{+YYYY.MM.dd}"

when it should look like this:

index => "%{[fields][resourcetype]}.%{[fields][customer]}.%{[fields.provider]}.%{+YYYY.MM.dd}"

This format below worked.
index => "%{[fields][resourcetype]}.%{[fields][customer]}.%{[fields][provider]}.%{+YYYY.MM.dd}"

However, when I try the following, I am not able to get it working:

if [type] == "kafka-input" {
mutate {
add_field => { "elasticindex" => "%{[fields][resourcetype]}.%{[fields][customer]}.%{[fields][provider]}" }
}
}

output {
  elasticsearch {
    index => "%{elasticindex}.%{+YYYY.MM.dd}"
  }
}

Please show an example event where this didn't work, e.g. by showing the result of a stdout { codec => rubydebug } output or by copying/pasting from the JSON tab in Kibana.
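For example, a temporary output section like this prints every event Logstash emits, so you can see exactly which fields exist and how they are nested:

output {
  stdout { codec => rubydebug }
}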

JSON log from Kibana:

{
  "_index": "%{elasticindex}.2017.04.14",
  "_type": "log",
  "_id": "AVtqhK88H217HTENKGJN",
  "_score": null,
  "_source": {
    "@timestamp": "2017-04-14T03:32:09.809Z",
    "offset": 5686,
    "beat": {
      "hostname": "3495c2daccca",
      "name": "FileBeat",
      "version": "5.2.0"
    },
    "input_type": "log",
    "@version": "1",
    "source": "/Users/xyz/work/cplane/testoutput/everest.test.INFO",
    "fields": {
      "provider": "provider1",
      "service": "everest",
      "resourcetype": "csos",
      "customer": "infra"
    },
    "message": "I0414 03:32:05.056421 15270 election.go:249] service/everestd/leader: get CreateIndex 3310 ModifyIndex 8115 LockIndex 6 Session 1037e02c-cb1b-e391-61c7-ed08e74b3b67 Value ohknauzv.everest.provider1.com",
    "type": "log"
  },
  "fields": {
    "@timestamp": [
      1492140729809
    ]
  }
}

I tried a simple add_field with a static string value, and that substitution is also not working with Logstash 5.2 or 5.3, using the logstash.conf shown below.

Logstash.conf:

input {
  kafka {
    bootstrap_servers => ["10.1.1.1:9092"]
    type => "kafka"
    topics => ["csoslogs.infra", "ipmi.customer001", "ipmi.customer002"]
    codec => "json"
    decorate_events => true
  }
}

filter {
  if [type] == "syslog" {
    mutate {
      add_field => { "elasticindex" => "%{host}" }
    }
  }

  if [type] == "kafka" {
    mutate {
      add_field => { "elasticindex" => "10.6.6.1" }
    }
  }
}

output {
  stdout { codec => rubydebug }
}

output {
  elasticsearch {
    hosts => ["104.42.236.238:9200"]
    user => "es_admin"
    password => "Cloudsimple123!"
    index => "%{elasticindex}.%{+YYYY.MM.dd}"
  }
}
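One way to narrow down where this goes wrong (a debugging sketch, not part of the config above) is to tag each branch, so the rubydebug output shows which conditional, if any, an event entered:

filter {
  if [type] == "kafka" {
    mutate { add_tag => ["matched_kafka"] }    # conditional matched
  } else {
    mutate { add_tag => ["no_type_match"] }    # conditional did not match
  }
}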

JSON seen in Kibana:

{
  "_index": "%{elasticindex}.2017.04.14",
  "_type": "log",
  "_id": "AVtrBCiSH217HTENKISm",
  "_score": null,
  "_source": {
    "@timestamp": "2017-04-14T05:51:24.476Z",
    "offset": 71259,
    "kafka": {
      "consumer_group": "logstash",
      "partition": 0,
      "offset": 5910,
      "topic": "csoslogs.infra",
      "key": null
    },
    "beat": {
      "hostname": "3495c2daccca",
      "name": "FileBeat",
      "version": "5.2.0"
    },
    "input_type": "log",
    "@version": "1",
    "source": "/Users/vramakrishnan/work/cplane/testoutput/coordinator.test.INFO",
    "fields": {
      "podip": "10.6.6.1",
      "provider": "provider1",
      "service": "coordinator",
      "resourcetype": "csos",
      "customer": "infra"
    },
    "message": "I0414 05:51:18.736575 27476 consul_helper.go:286] 0: created consul session 79c34ca4-a1f6-326b-9d5d-43eb62626f3c, so agent client-0 is initialized",
    "type": "log"
  },
  "fields": {
    "@timestamp": [
      1492149084476
    ]
  },
  "sort": [
    1492149084476
  ]
}

Any workaround or suggestion for this issue would be greatly appreciated.

This has become a blocker for us, so kindly help.

It turns out the [type] == "kafka" check in the Logstash filter is not matching, so events never enter the if block.

This used to work in Logstash 5.1.1. Is the use of type in a filter deprecated?

> It turns out the [type] == "kafka" check in the Logstash filter is not matching, so events never enter the if block.

Yeah, the event's type field contains "log" and not "kafka".
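As a workaround sketch (assuming the decorated kafka object shown in your Kibana JSON is present on every event from that input), you could branch on that object instead of on type:

filter {
  # decorate_events => true makes the kafka input add a top-level "kafka"
  # object (topic, partition, offset, ...), visible in the JSON above.
  if [kafka] {
    mutate {
      add_field => { "elasticindex" => "%{[fields][resourcetype]}.%{[fields][customer]}.%{[fields][provider]}" }
    }
  }
}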

> This used to work in Logstash 5.1.1. Is the use of type in a filter deprecated?

No, but it looks like type => "kafka" in the input doesn't work. Maybe it's because Filebeat already sets the type field to "log" and the kafka input won't override that value? I suggest you set the event type as close to the source as possible, in this case in Filebeat.
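For example, in Filebeat 5.x each prospector has a document_type option that sets the event's type field at the source (a sketch; the path is hypothetical):

filebeat.prospectors:
  - input_type: log
    paths:
      - /var/log/app/*.log   # hypothetical path
    document_type: kafka     # sets the event's "type" field to "kafka"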
