Kafka input plugin - decorate_events does not take effect

Hi,

I have a Logstash Kafka input plugin that is reading messages from a Kafka 1.0.0 broker just fine.

But the "decorate_events" property does not seem to take effect, i.e, I receive none of the kafka topic/partition information, here's my input code.
I am using Logstash/ES 6.2.4

input {
    kafka {
        bootstrap_servers => "kafka.dev:9092"
        topics => ["retail.clog.1", "digital.clog.1"]
        client_id => "log"
        group_id => "log"
        auto_offset_reset => "earliest"
        consumer_threads => 4
        decorate_events => true
        heartbeat_interval_ms => "3000"
        max_poll_interval_ms => "1000"
        reconnect_backoff_ms => "3000"
        #queue_size => 500
    }
}

Appreciate your help on this.

Thanks,
Arun

Are you looking at [@metadata][kafka] in Logstash? It will not get written to Elasticsearch.
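
If you want to confirm the fields are being added, a stdout output with the rubydebug codec can print them while testing (a minimal sketch):

output {
    stdout {
        # metadata => true makes rubydebug print the [@metadata] fields too
        codec => rubydebug { metadata => true }
    }
}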

Hi,

Thanks for the quick response. I didn't quite understand: do I have to explicitly assign [@metadata][kafka] to a field in Logstash? If so, can you please say how?

The rest of the configuration is just this:

filter {
    json {
        source => "message"
    }
}
output {
    elasticsearch {
        action => "index"
        index => "ms-logs-%{+YYYY.MM.dd}"
        hosts => ["elastic.dev"]
    }
}

Thanks,
Arun

The kafka input puts the topic, partition, etc. into fields under [@metadata][kafka]. The [@metadata] field exists on the event in Logstash but is not written to Elasticsearch, so if you want that data in Elasticsearch you need to use mutate+copy to copy [@metadata][kafka] to another field.
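
For example, a minimal sketch that copies individual subfields (the target names on the right are just examples):

filter {
    mutate {
        # copy the decorated metadata into regular fields so they reach elasticsearch
        copy => {
            "[@metadata][kafka][topic]"     => "[kafka][topic]"
            "[@metadata][kafka][partition]" => "[kafka][partition]"
        }
    }
}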

Thank you so much, that helped.

filter {
    json {
        source => "message"
    }
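    # note: add_field with a %{...} sprintf reference renders the
    # [@metadata][kafka] hash as one string, not as an object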
    mutate {
        add_field => {
            "kafka" => "%{[@metadata][kafka]}"
        }
    }
}

Hi,

Is there a way to index the Kafka detail as well?
I think because it's a nested JSON object it doesn't get indexed in Elasticsearch.
It would be nice to be able to filter messages from a particular partition in Kibana, for example.

Thanks,
Arun

mutate { copy => { "[@metadata][kafka]" => "kafka" } }
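
Once that copy is in place, kafka.topic, kafka.partition, etc. are regular indexed fields, so you should be able to filter on something like kafka.partition: 1 in Kibana.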

Hi,

I get this error now,

[2018-06-20T15:05:20,979][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch.
{:status=>400, :action=>["index", {:_id=>nil, :_index=>"ms-logs-2018.06.20", :_type=>"doc", :_routing=>nil}, #<LogStash::Event:0x354fa377>],
:response=>{"index"=>{"_index"=>"ms-logs-2018.06.20", "_type"=>"doc", "_id"=>"ua66HWQBIApBjZNHh1si", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception",
"reason"=>"failed to parse [kafka]", "caused_by"=>{"type"=>"illegal_state_exception",
"reason"=>"Can't get text on a START_OBJECT at 1:248"}}}}}

Thanks,
Arun

Look at the Elasticsearch log. That will have a clearer error message.

Hi,

Here's the error from the Elasticsearch logs:

[2018-06-20T15:14:41,638][DEBUG][o.e.a.b.TransportShardBulkAction] [ms-logs-2018.06.20][2] failed to execute bulk item (index) BulkShardRequest [[ms-logs-2018.06.20][2]] containing [index {[ms-logs-2018.06.20][doc][CV_DHWQBFf7QtJ8OFSFl], source[{"app":"log","@timestamp":"2018-06-20T15:14:41.528Z","latency":"340","status":"true","ts":"2018-06-20 15:14:41,523","lcruid":"300ac294-1997-432f-a4e2-9da04a5c9","iid":"732a0577b","@version":"1","lccid":"e3ab50c1-c984-4e5d-b954-b657bef57cf","kafka":{"offset":11,"timestamp":1529507681524,"partition":1,"topic":"digital.log.1","key":"e0bc3806-9421-41c2-a70a-744d11d5d","consumer_group":"clog"},"type":"RESPONSE","msg":"\"Response from Service Layer\"","crid":"arun1234","source":"DF"}]}]
org.elasticsearch.index.mapper.MapperParsingException: failed to parse [kafka]
at org.elasticsearch.index.mapper.FieldMapper.parse(FieldMapper.java:302) ~[elasticsearch-6.2.4.jar:6.2.4]

I suspect the problem is that you previously indexed documents where "kafka" was a string, and now it is an object. Are you in a position to "DELETE ms-logs-2018.06.20"? Or can you wait until tomorrow and see if it starts working at midnight UTC when it rolls to a new index?
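
If you want to check, the get field mapping API will show what type "kafka" currently has in the index (using the doc type shown in your log):

GET ms-logs-2018.06.20/_mapping/doc/field/kafka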

Hi,

That was precisely the issue. Deleting the index is no problem, as I am just setting this up.
Thank you so much, you have been very helpful.

Thanks,
Arun
