Logstash Kafka Offset Handling

I'm witnessing some weird (Or maybe normal) behavior when using Logstash with the Kafka input.

Kafka v1.0
Logstash v6.1.1
ES v6.0

1 Logstash server
3 Kafka servers
1 Topic
50 Partitions


input {
    kafka {
        bootstrap_servers => "kafka-server1:9092,kafka-server2:9092,kafka-server3:9092"
        fetch_max_bytes => "104857600"
        group_id => "group-name"
        max_partition_fetch_bytes => "104857600"
        max_poll_records => "100000"
        topics => ["topic-name"]

output {
    elasticsearch {
        action => "index"
        document_id => "%{id}"
        hosts => ["es-server1:9200", "es-server2:9200", "es-server3:9200"]
        index => "index-name"
        manage_template => false
        resurrect_delay => 2
        retry_initial_interval => 1
        retry_max_interval => 4

When ES goes down, logstash fails the bulk request and retries it indefinitely. So far so good.
If I restart logstash while its waiting for a retry, those events that are in that soon-to-be-retried bulk request are not being refetched from Kafka when Logstash restarts.

I was assuming that because the bulk requests failed, Logstash didn't commit the offset for them, thus making them be re-fetched when Logstash starts again.

But what happens is that Logstash skips those events (Probably because the offset has been updated).

Is this intentional or a bug?
Could it be related to the auto committing of the offset?

