Kafka input plugin - decorate_events does not take effect


I have a Kafka logstash input plugin that's reading messages from a Kafka 1.0.0 fine.

But the "decorate_events" property does not seem to take effect, i.e, I receive none of the kafka topic/partition information, here's my input code.
I am using Logstash/ES 6.2.4

input {
    kafka {
        bootstrap_servers => "kafka.dev:9092"
        topics => ["retail.clog.1", "digital.clog.1"]
        client_id => "log"
        group_id => "log"
        auto_offset_reset => "earliest"
        consumer_threads => 4
        decorate_events => true
        heartbeat_interval_ms => "3000"
        max_poll_interval_ms => "1000"
        reconnect_backoff_ms => "3000"
        #queue_size => 500

Appreciate your help on this.


Are you looking at [@metadata][kafka] in logstash? It will not get written to elasticsearch.


Thanks for a quick response. Didn't quite understand, do I have to explicitly assign [@metadata][kafka] to a variable in logstash, if so, can you please say how.

Rest of the code is just this,

filter {
    json {
        source => "message"
output {
	elasticsearch {
		action => "index"
		index => "ms-logs-%{+YYYY.MM.dd}"
		hosts => ["elastic.dev"]


The kafka input puts the topic, partition etc. into fields under [@metadata][kafka]. The [@metadata] field on the event exists in logstash, but is not written to elasticsearch, so if you want to have that data in elasticsearch you would need to use mutate+copy to copy [@metadata][kafka] to another field.

Thank you so much, that helped.

filter {
        json {
                source => "message"
        mutate {
            add_field => {
                "kafka" => "%{[@metadata][kafka]}"


Is there a way to index the kafka detail as well?
I think because its an inner json it doesn't index in Elasticsearch.
It would be nice to filter out messages of a particular partition in Kibana, for example.


mutate { copy => { "[@metadata][kafka]" => "kafka" } }



I get this error now,

[2018-06-20T15:05:20,979][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch.
{:status=>400, :action=>["index", {:_id=>nil, :_index=>"ms-logs-2018.06.20", :_type=>"doc", :_routing=>nil}, #LogStash::Event:0x354fa377],
:response=>{"index"=>{"_index"=>"ms-logs-2018.06.20", "_type"=>"doc", "_id"=>"ua66HWQBIApBjZNHh1si", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception",
"reason"=>"failed to parse [kafka]", "caused_by"=>{"type"=>"illegal_state_exception",
"reason"=>"Can't get text on a START_OBJECT at 1:248"}}}}}


Look at the elasticseach log. That will have a clearer error message.


Here's the error from elastic logs

[2018-06-20T15:14:41,638][DEBUG][o.e.a.b.TransportShardBulkAction] [ms-logs-2018.06.20][2] failed to execute bulk item (index) BulkShardRequest [[ms-logs-2018.06.20][2]] containing [index {[ms-logs-2018.06.20][doc][CV_DHWQBFf7QtJ8OFSFl], source[{"app":"log","@timestamp":"2018-06-20T15:14:41.528Z","latency":"340","status":"true","ts":"2018-06-20 15:14:41,523","lcruid":"300ac294-1997-432f-a4e2-9da04a5c9","iid":"732a0577b","@version":"1","lccid":"e3ab50c1-c984-4e5d-b954-b657bef57cf","kafka":{"offset":11,"timestamp":1529507681524,"partition":1,"topic":"digital.log.1","key":"e0bc3806-9421-41c2-a70a-744d11d5d","consumer_group":"clog"},"type":"RESPONSE","msg":""Response from Service Layer"","crid":"arun1234","source":"DF"}]}]
org.elasticsearch.index.mapper.MapperParsingException: failed to parse [kafka]
at org.elasticsearch.index.mapper.FieldMapper.parse(FieldMapper.java:302) ~[elasticsearch-6.2.4.jar:6.2.4]

1 Like

I suspect the problem is that you previously indexed documents where "kafka" was a string, and now it is an object. Are you in a position to "DELETE ms-logs-2018.06.20"? Or can you wait until tomorrow and see if it starts working at midnight UTC when it rolls to a new index?


That was precisely the issue, no problem deleting the index, I am just setting this up.
Thank you so much, you have been very helpful.


This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.