Multiple field substitution doesn't work in Logstash


(Vramakrishnan) #1

I have configured Filebeat to add these fields:

"fields" => {
"provider" => "foobar",
"resourcetype" => "ipmi",
"customer" => "customer001"
},

In the Logstash configuration file, I want to combine these fields to generate a unique Elasticsearch index name.
I tried the format below, but it does not expand correctly.

Please let me know what the right syntax is.

output {
elasticsearch {
index => "%{[fields][resourcetype]}.%{[fields][customer]}.%{[fields.provider]}.%{+YYYY.MM.dd}"
}
}

**The index generated by Logstash is: %{[fields][resourcetype].[fields][customer].[fields.provider]}.2017.04.12**

Using a single field, as shown below, works, but using multiple fields does not.
index => "%{[fields][resourcetype]}.%{+YYYY.MM.dd}"


(Magnus Bäck) #2

The index name indicates that your configuration looks like

index => "%{[fields][resourcetype].[fields][customer].[fields.provider]}.%{+YYYY.MM.dd}"

when it should look like this:

index => "%{[fields][resourcetype]}.%{[fields][customer]}.%{[fields][provider]}.%{+YYYY.MM.dd}"
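
As a minimal sketch of the rule, here is a complete output block. Each field gets its own %{...} reference, and a nested field is written with one bracketed segment per level, for example [fields][provider]. The surrounding elasticsearch block is only an assumption for illustration; add your own hosts and credentials.

```
output {
  elasticsearch {
    # One %{...} per field reference; the final %{+YYYY.MM.dd} formats the event's @timestamp.
    # Nested fields use one [segment] per level: [fields][provider].
    index => "%{[fields][resourcetype]}.%{[fields][customer]}.%{[fields][provider]}.%{+YYYY.MM.dd}"
  }
}
```

If any referenced field is missing from an event, Logstash leaves that %{...} text unexpanded in the resulting index name, which is a useful hint when debugging.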

(Vramakrishnan) #3

This format below worked.
index => "%{[fields][resourcetype]}.%{[fields][customer]}.%{[fields][provider]}.%{+YYYY.MM.dd}"

However, when trying as follows, I am not able to get it working:

if [type] == "kafka-input" {
mutate {
add_field => { "elasticindex" => "%{[fields][resourcetype]}.%{[fields][customer]}.%{[fields][provider]}" }
}
}

output {
elasticsearch {
index => "%{elasticindex}.%{+YYYY.MM.dd}"
}
}


(Magnus Bäck) #4

Please show an example event where this didn't work, e.g. by showing the result of a stdout { codec => rubydebug } output or by copying/pasting from the JSON tab in Kibana.


(Vramakrishnan) #5

JSON log from Kibana:

{
"_index": "%{elasticindex}.2017.04.14",
"_type": "log",
"_id": "AVtqhK88H217HTENKGJN",
"_score": null,
"_source": {
"@timestamp": "2017-04-14T03:32:09.809Z",
"offset": 5686,
"beat": {
"hostname": "3495c2daccca",
"name": "FileBeat",
"version": "5.2.0"
},
"input_type": "log",
"@version": "1",
"source": "/Users/xyz/work/cplane/testoutput/everest.test.INFO",
"fields": {
"provider": "provider1",
"service": "everest",
"resourcetype": "csos",
"customer": "infra"
},
"message": "I0414 03:32:05.056421 15270 election.go:249] service/everestd/leader: get CreateIndex 3310 ModifyIndex 8115 LockIndex 6 Session 1037e02c-cb1b-e391-61c7-ed08e74b3b67 Value ohknauzv.everest.provider1.com",
"type": "log"
},
"fields": {
"@timestamp": [
1492140729809
]


(Vramakrishnan) #6

I tried a simple add_field with a static string as the value, and that substitution also does not work with Logstash 5.2 or 5.3 using the configuration shown below.

Logstash.conf

input {

kafka {
bootstrap_servers => ["10.1.1.1:9092"]
type => "kafka"
topics => ["csoslogs.infra", "ipmi.customer001", "ipmi.customer002"]
codec => "json"
decorate_events => true
}

}

filter {

if [type] == "syslog" {
mutate {
add_field => { "elasticindex" => "%{host}" }
}
}

if [type] == "kafka" {
mutate {
add_field => { "elasticindex" => "10.6.6.1" }
}
}

}

output {
stdout { codec => rubydebug }
}

output {
elasticsearch {
hosts => ["104.42.236.238:9200"]
user => "es_admin"
password => "Cloudsimple123!"
index => "%{elasticindex}.%{+YYYY.MM.dd}"
}
}

JSON seen in Kibana:

{
"_index": "%{elasticindex}.2017.04.14",
"_type": "log",
"_id": "AVtrBCiSH217HTENKISm",
"_score": null,
"_source": {
"@timestamp": "2017-04-14T05:51:24.476Z",
"offset": 71259,
"kafka": {
"consumer_group": "logstash",
"partition": 0,
"offset": 5910,
"topic": "csoslogs.infra",
"key": null
},
"beat": {
"hostname": "3495c2daccca",
"name": "FileBeat",
"version": "5.2.0"
},
"input_type": "log",
"@version": "1",
"source": "/Users/vramakrishnan/work/cplane/testoutput/coordinator.test.INFO",
"fields": {
"podip": "10.6.6.1",
"provider": "provider1",
"service": "coordinator",
"resourcetype": "csos",
"customer": "infra"
},
"message": "I0414 05:51:18.736575 27476 consul_helper.go:286] 0: created consul session 79c34ca4-a1f6-326b-9d5d-43eb62626f3c, so agent client-0 is initialized",
"type": "log"
},
"fields": {
"@timestamp": [
1492149084476
]
},
"sort": [
1492149084476
]
}


(Vramakrishnan) #7

Any workaround or suggestion for this issue would be greatly appreciated.

This has turned into a blocker for us, so please help.


(Vramakrishnan) #8

It turns out the [type] == "kafka" check in the Logstash filter is not matching, so the if-block is never entered.

This used to work in Logstash 5.1.1. Is the use of type in filters deprecated?


(Magnus Bäck) #9

> It turns out the [type] == "kafka" check in the Logstash filter is not matching, so the if-block is never entered.

Yeah, the event's type field contains "log" and not "kafka".

> This used to work in Logstash 5.1.1. Is the use of type in filters deprecated?

No, but it looks like type => "kafka" in the input doesn't work. Maybe it's because Filebeat already sets the type field to "log" and the kafka input won't override that value? I suggest you set the event type as close to the source as possible, in this case in Filebeat.
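
If changing the type in Filebeat is not convenient, one possible alternative is to key the conditional on the custom fields themselves rather than on type. This is only a sketch, assuming the fields.* values shown in your event are present on every event you want routed. The fallback value "unmatched" is a hypothetical placeholder so that the index name never contains a literal %{elasticindex}.

```
filter {
  # Build the index prefix only when all three custom fields exist on the event.
  if [fields][resourcetype] and [fields][customer] and [fields][provider] {
    mutate {
      add_field => { "elasticindex" => "%{[fields][resourcetype]}.%{[fields][customer]}.%{[fields][provider]}" }
    }
  } else {
    mutate {
      # Placeholder prefix (an assumption) for events that lack the custom fields.
      add_field => { "elasticindex" => "unmatched" }
    }
  }
}
```

With this in place, the existing index => "%{elasticindex}.%{+YYYY.MM.dd}" setting in the output should always resolve, and events landing in the fallback index show which sources are missing the fields.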

