Logstash 5.1.1 kafka input doesn't pick up existing messages on topic

I have the following Logstash config with a Kafka input:

    input {
      kafka {
        bootstrap_servers => "localhost:9092"
        topics => ["mytopic"]
      }
    }
    filter {
      json {
        source => "message"
      }
    }
    output {
      stdout {
        codec => rubydebug
      }
      elasticsearch {
        hosts => ["localhost:9200"]
        index => "my_index"
        codec => "json"
        document_id => "%{id}"
        doc_as_upsert => true
        action => "update"
      }
    }

The problem I am facing is that when I run Logstash, it doesn't pick up the messages already on that topic. I was under the impression that the first time Logstash runs, it would consume all messages on the topic that had not yet been consumed. I verified that this was a new topic containing messages, and those messages were not picked up when Logstash started. It does pick up messages that arrive on the topic while it is running, but not the ones that existed before it started. Am I missing something in the configuration, or is this a quirk of the input itself? Guaranteed delivery of every message is of utmost importance for my business needs.
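For reference, what I would have expected to need is something like the following sketch, where `auto_offset_reset => "earliest"` tells a consumer group with no committed offsets to start from the beginning of the topic rather than the end (the `group_id` value here is just an illustrative name):

    input {
      kafka {
        bootstrap_servers => "localhost:9092"
        topics => ["mytopic"]
        # Hypothetical group name; the plugin otherwise uses its default group.
        group_id => "my_consumer_group"
        # On first run (no committed offset for this group), start from the
        # oldest available message instead of only new arrivals.
        auto_offset_reset => "earliest"
      }
    }

My understanding is that without this, the underlying Kafka consumer defaults to starting at the latest offset for a new group, which would match the behaviour I am seeing, but I am not certain that is what is happening here.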
