Kafka input - Type missing

Hello,

We have an existing ELK environment that primarily uses Filebeat. Everything works fine there, but I'm trying to add some data using the Kafka input plugin and am running into an issue.

Logstash Version: 5.4.0
logstash-input-kafka Version: 5.1.11

Logstash input config is as follows:

input {
    kafka {
        type => "foo-type"
        bootstrap_servers => "<snip>"
        topics => ["<snip>"]
        auto_offset_reset => "latest"
        codec => "json"
        group_id => "<snip>"
        decorate_events => true
    }
}

Relevant output config:

output {
  elasticsearch {
    hosts => ["{{es_hosts}}"]
    manage_template => false
    index => "%{[@metadata][type]}-%{+YYYY.MM.dd}"
    document_type => "%{[@metadata][type]}"
  }
}

Problem: The index name and associated mapping (foo-type) are not being applied correctly. Instead, the index is literally named %{[@metadata][type]}-2019.08.06

Expected behavior: The index should be named foo-type-2019.08.06 and have the correct mapping applied.

Any idea what's going on here?

foo-type will be in [type], not [@metadata][type].

Is that specific to the Kafka input plugin? The output config specified above ([@metadata][type]) is what we currently use in production for ~5 other types and it works fine.

No. type is one of the options common to all inputs, and it sets the [type] field, not [@metadata][type]. I think you are relying on something on the Filebeat side to add that metadata field, which is why the same output works for your other types.
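
If you want to keep the existing output block unchanged, one option is to copy [type] into [@metadata][type] for events that don't already have it. This is only a minimal sketch, assuming nothing else in your pipeline sets [@metadata][type] for the Kafka events; the conditional is my own addition so that events which already carry the metadata field are left alone:

```
filter {
  # Assumption: events from the kafka input carry [type] (set by type => "foo-type")
  # but no [@metadata][type], while the existing events already have the metadata field.
  if [type] and ![@metadata][type] {
    mutate {
      # Copy the value of [type] so the existing output sprintf references resolve
      add_field => { "[@metadata][type]" => "%{type}" }
    }
  }
}
```

Alternatively, you could route the Kafka events to a separate elasticsearch output that uses %{type} in index and document_type. Either way, the unresolved %{[@metadata][type]} in the index name is what Logstash leaves behind when the referenced field does not exist on the event.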