Unable to re-read a Kafka topic (after resetting the offset)

Hello everyone and thanks in advance,

I have been using the Kafka input plugin for some time. It works great reading messages from two existing topics, but I have now added a third one (alarms) and the messages written to it never arrive.

BTW, my configuration is something like this:

input {
    kafka {
      bootstrap_servers => "kafka01:9092,kafka02:9092,kafka03:9092"
      topics => ['topic1','topic2','alarms']
      type   => 'app-kafka'
      codec  => 'json'
      decorate_events => true
      auto_offset_reset => 'earliest'
    }
}

For topic1 and topic2 everything works flawlessly, but 'alarms' is not being consumed. I checked the consumer group on the brokers and found the following:

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
logstash        alarms          2          2               2               0               logstash-0-bb957dc2-ab17-464b-9f4d-1a9fc0c85811 /192.168.96.64  logstash-0
logstash        alarms          0          1               1               0               logstash-0-bb957dc2-ab17-464b-9f4d-1a9fc0c85811 /192.168.96.64  logstash-0
logstash        alarms          1          3               3               0               logstash-0-bb957dc2-ab17-464b-9f4d-1a9fc0c85811 /192.168.96.64  logstash-0

That looks fine, so I decided to simply reset the offsets: I stopped Logstash and set the group's offset to 0 (on the Kafka server):

/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server kafka01:9092,kafka02:9092,kafka03:9092 --group logstash --topic alarms --reset-offsets --to-offset 0 --execute

GROUP                          TOPIC                          PARTITION  NEW-OFFSET
logstash                       alarms                         0          0
logstash                       alarms                         2          0
logstash                       alarms                         1          0

So, theoretically, the offsets for the logstash consumer group are now set to 0. I then restarted the Logstash service and checked again:

/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server kafka01:9092,kafka02:9092,kafka03:9092 --group logstash --describe | grep -e ^GROUP -e alarms
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
logstash        alarms          2          2               2               0               logstash-0-c4d80993-bc27-427f-b5d1-c76ab8e6e30a /192.168.96.64  logstash-0
logstash        alarms          0          1               1               0               logstash-0-c4d80993-bc27-427f-b5d1-c76ab8e6e30a /192.168.96.64  logstash-0
logstash        alarms          1          3               3               0               logstash-0-c4d80993-bc27-427f-b5d1-c76ab8e6e30a /192.168.96.64  logstash-0

But I did not get a single message from the alarms topic in Elasticsearch. I'm a bit confused here; I suppose there is something I'm missing.
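
To double-check my reading of the tables above, here is a minimal Python sketch that recomputes the lag per partition as LOG-END-OFFSET minus CURRENT-OFFSET. The function name `parse_lag` is just something I made up for this post, and it assumes the column order shown in my output (GROUP, TOPIC, PARTITION, CURRENT-OFFSET, LOG-END-OFFSET, ...).

```python
def parse_lag(describe_output: str) -> dict:
    """Return {partition: lag}, where lag = LOG-END-OFFSET - CURRENT-OFFSET.

    Assumes whitespace-separated columns in the order shown in this post.
    """
    lags = {}
    for line in describe_output.strip().splitlines():
        fields = line.split()
        # Skip the header row and anything too short to be a data row.
        if len(fields) < 5 or fields[0] == "GROUP":
            continue
        # Skip rows whose offsets are not plain numbers.
        if not (fields[3].isdigit() and fields[4].isdigit()):
            continue
        partition = int(fields[2])
        current_offset = int(fields[3])
        log_end_offset = int(fields[4])
        lags[partition] = log_end_offset - current_offset
    return lags


# Rows copied from the --describe output taken after the restart.
SAMPLE = """\
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
logstash        alarms          2          2               2               0               logstash-0-c4d80993-bc27-427f-b5d1-c76ab8e6e30a /192.168.96.64  logstash-0
logstash        alarms          0          1               1               0               logstash-0-c4d80993-bc27-427f-b5d1-c76ab8e6e30a /192.168.96.64  logstash-0
logstash        alarms          1          3               3               0               logstash-0-c4d80993-bc27-427f-b5d1-c76ab8e6e30a /192.168.96.64  logstash-0
"""

print(parse_lag(SAMPLE))  # {2: 0, 0: 0, 1: 0}
```

If that is right, the group went from offset 0 back to the end of every partition after the restart, which would mean the six messages were fetched by the consumer even though nothing reached Elasticsearch.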

Any help will be welcome, thank you!!!!

Hello all again,

We just tried deleting the "alarms" topic and creating it again. I can see that it starts consuming again, but I still don't get any messages from the topic in Elasticsearch...

Thank you!

[INFO ] 2019-11-28 08:58:50.714 [Ruby-0-Thread-34: /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-kafka-9.0.1/lib/logstash/inputs/kafka.rb:244] Fetcher - [Consumer clientId=logstash-0, groupId=logstash] Fetch offset 3 is out of range for partition alarms-1, resetting offset
[INFO ] 2019-11-28 08:58:50.714 [Ruby-0-Thread-34: /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-kafka-9.0.1/lib/logstash/inputs/kafka.rb:244] Fetcher - [Consumer clientId=logstash-0, groupId=logstash] Fetch offset 2 is out of range for partition alarms-2, resetting offset
[INFO ] 2019-11-28 08:58:50.745 [Ruby-0-Thread-34: /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-kafka-9.0.1/lib/logstash/inputs/kafka.rb:244] Fetcher - [Consumer clientId=logstash-0, groupId=logstash] Fetch offset 1 is out of range for partition alarms-0, resetting offset
[INFO ] 2019-11-28 08:58:50.748 [Ruby-0-Thread-34: /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-kafka-9.0.1/lib/logstash/inputs/kafka.rb:244] Fetcher - [Consumer clientId=logstash-0, groupId=logstash] Resetting offset for partition alarms-2 to offset 0.
[INFO ] 2019-11-28 08:58:50.749 [Ruby-0-Thread-34: /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-kafka-9.0.1/lib/logstash/inputs/kafka.rb:244] Fetcher - [Consumer clientId=logstash-0, groupId=logstash] Resetting offset for partition alarms-1 to offset 0.
[INFO ] 2019-11-28 08:58:50.749 [Ruby-0-Thread-34: /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-kafka-9.0.1/lib/logstash/inputs/kafka.rb:244] Fetcher - [Consumer clientId=logstash-0, groupId=logstash] Resetting offset for partition alarms-0 to offset 0.
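
If I read these log lines correctly, the old committed offsets (3, 2 and 1) no longer exist in the re-created topic, so the consumer falls back to auto_offset_reset. Below is a simplified Python model of that decision as I understand it. `start_position` and its parameters are names I made up for illustration, not part of Kafka or Logstash, and the assumption that the re-created partitions were empty (log end offset 0) is mine.

```python
from typing import Optional


def start_position(committed: Optional[int], log_start: int, log_end: int,
                   auto_offset_reset: str = "earliest") -> int:
    """Simplified model of where a consumer starts fetching one partition.

    A committed offset is used when it lies inside [log_start, log_end];
    otherwise (missing or out of range) auto_offset_reset decides.
    """
    if committed is not None and log_start <= committed <= log_end:
        return committed
    if auto_offset_reset == "earliest":
        return log_start
    if auto_offset_reset == "latest":
        return log_end
    raise ValueError(f"unsupported auto_offset_reset: {auto_offset_reset!r}")


# Before re-creating the topic: alarms-1 had committed offset 3 and log end 3,
# so the committed offset is valid and there is nothing left to fetch.
print(start_position(committed=3, log_start=0, log_end=3))  # 3

# After re-creating the topic (assumed empty): offset 3 is out of range,
# so 'earliest' sends the consumer back to offset 0, as in the log above.
print(start_position(committed=3, log_start=0, log_end=0))  # 0
```

So auto_offset_reset only seems to matter when there is no usable committed offset, which would explain why 'earliest' alone never made Logstash re-read the topic.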