Kafka input plugin does not set @metadata fields

As you might expect, that was clearly the issue. I thought I'd post the optimized config file for others to look at, as well as my solution for remapping the camel-case topic names to index names that Elasticsearch will accept, since Elasticsearch throws an error when an index name contains capital letters.

Here's the config that solved both problems:

input {

  kafka {
    bootstrap_servers => <kafka_server_list_redacted>
    topics => [ "test", "controllerLog" ]
    consumer_threads => 6
    auto_offset_reset => "latest"
    auto_commit_interval_ms => "500"
    enable_auto_commit => true
    # decorate_events is what populates [@metadata][kafka][topic] and the other Kafka metadata
    decorate_events => true
    codec => json {
      charset => "ISO-8859-1"
    }
  }

}

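filter {
  # Split the topic name before each capital letter, lowercase the pieces, and join them
  # with underscores (controllerLog -> controller_log), storing the result in lc_topic
  ruby {
    code => "event.set('[@metadata][kafka][lc_topic]', event.get('[@metadata][kafka][topic]').split(/(?=[A-Z])/).map{|x| x.downcase }.join('_') )"
  }
}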

output {
  elasticsearch {
    hosts => <elasticsearch_hosts_redacted>
    index => "%{[@metadata][kafka][lc_topic]}-%{+YYYY.MM.dd}"
  }
}
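
If you want to check the renaming outside Logstash, here is a minimal sketch in plain Ruby using the same expression as the ruby filter above. The helper name topic_to_index_prefix is made up for illustration and is not part of Logstash or the Kafka plugin.

```ruby
# Hypothetical helper for illustration only; it mirrors the expression in the ruby filter.
def topic_to_index_prefix(topic)
  # The lookahead (?=[A-Z]) splits before each capital letter without consuming it,
  # then each piece is lowercased and the pieces are joined with underscores.
  topic.split(/(?=[A-Z])/).map { |x| x.downcase }.join('_')
end

puts topic_to_index_prefix('controllerLog')  # prints controller_log
puts topic_to_index_prefix('test')           # prints test
```

One thing to be aware of: because the split happens before every capital letter, a topic with consecutive capitals gets one underscore per letter (for example, 'HTTPLog' becomes h_t_t_p_log). That is fine for the two topics in this config, but you may want a different pattern if your topic names contain acronyms.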