Logstash-output-kafka stops producing data without error if topic is not valid

When using the kafka output plugin with a topic extracted by a grok filter, a filter failure (_grokparsefailure) leaves the "%{topic}" reference unresolved. The output plugin then blocks and no explicit error is produced.

Steps to reproduce:

  1. Get a kafka broker running on localhost:9092

  2. Create a sample.log file containing the following lines:

Matching test: 1
Matching test: 2
Not matching test: 3
Matching test: 4
Matching test: 5

  3. Use the following logstash configuration to push the data to the local kafka broker:

input {
  file {
    path => "/home/user/sample.log"
    start_position => beginning
  }
}

filter {
  grok {
    match => { "message" => "^Matching %{WORD:topic}: %{NUMBER:line}.*" }
  }
}

output {
  stdout { codec => rubydebug }
  kafka {
    batch_size => 1
    bootstrap_servers => "127.0.0.1:9092"
    topic_id => "%{topic}"
  }
}

  4. Read the content of the topic using kafka console consumer:
    $KAFKA_INSTALL_DIR/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

Observed result:

Only the first two lines are pushed to the topic:

{"message":"Matching test: 1","@version":"1","@timestamp":"2016-06-28T13:23:14.276Z","path":"/home/user/sample.log","host":"fabien1.dev","topic":"test","line":"1"}
{"message":"Matching test: 2","@version":"1","@timestamp":"2016-06-28T13:23:14.632Z","path":"/home/user/sample.log","host":"fabien1.dev","topic":"test","line":"2"}

The first missing event is:

{
       "message" => "Not matching test: 3",
      "@version" => "1",
    "@timestamp" => "2016-06-28T13:30:37.454Z",
          "path" => "/home/user/sample.log",
          "host" => "host1.dev",
          "tags" => [
        [0] "_grokparsefailure"
    ]
}

According to the stdout output, Logstash has read all the lines (the last ones are stuck in some buffer) and is looping, trying to push data containing the unresolved "%{topic}" reference to kafka. Nothing else happens, and nothing appears in the logs.

Logstash: 2.1.1 to 2.3.3
Kafka: 2.10-0.10.0.0
OS: Linux debian wheezy

For me this is a bug: logstash should at least detect the unresolved topic and produce an error message.
A simple workaround in the configuration is to check that the topic was parsed successfully before the event reaches the kafka output.
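
As a rough sketch of that workaround, the output section from the reproduction config could be wrapped in a conditional. This assumes grok still uses its default failure tag (_grokparsefailure, i.e. no custom tag_on_failure), and I have only reasoned about it against the sample above, not tested it across the affected versions:

```
output {
  stdout { codec => rubydebug }

  # Only send events whose topic was actually extracted by grok.
  # Events tagged _grokparsefailure have no "topic" field, so
  # "%{topic}" would otherwise be passed to kafka unresolved.
  if "_grokparsefailure" not in [tags] {
    kafka {
      batch_size => 1
      bootstrap_servers => "127.0.0.1:9092"
      topic_id => "%{topic}"
    }
  }
}
```

Testing for the field itself with `if [topic] { ... }` should work as well, and does not depend on the tag name. Either way, events that fail to parse are still printed by the stdout output, so they remain visible. This only avoids the blocking; it does not address the missing error message.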