Kafka Logstash Plugin unavailable broker (swaps broker addresses for discovery?)

Hey everyone,

I'm pretty new to the stack and ran into some issues trying to use a Kafka input. Here are my logstash.conf and the logs from attaching to the container. In the logs, the consumer successfully subscribes to the topics, then starts failing with the "Broker may not be available" warning for the loopback address (127.0.0.1), which is not the address I configured. Any help would be appreciated :slight_smile:

input {
  kafka {
    bootstrap_servers => "kafka11:9092,kafka12:9093"
    topics => ["messages", "jq"]
  }
}
logstash_1       | [2019-11-09T19:41:41,081][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka version: 2.3.0
logstash_1       | [2019-11-09T19:41:41,081][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka commitId: fc1aaa116b661c8a
logstash_1       | [2019-11-09T19:41:41,082][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka startTimeMs: 1573328501071
logstash_1       | [2019-11-09T19:41:41,196][INFO ][org.apache.kafka.clients.consumer.KafkaConsumer] [Consumer clientId=logstash-0, groupId=logstash] **Subscribed to topic(s): messages, jq**
logstash_1       | [2019-11-09T19:41:42,588][WARN ][logstash.outputs.elasticsearch] You are using a deprecated config setting "document_type" set in elasticsearch. Deprecated settings will continue to work, but are scheduled for removal from logstash in the future. Document types are being deprecated in Elasticsearch 6.0, and removed entirely in 7.0. You should avoid this feature If you have any questions about this, please visit the #logstash channel on freenode irc. {:name=>"document_type", :plugin=><LogStash::Outputs::ElasticSearch bulk_path=>"/_monitoring/bulk?system_id=logstash&system_api_version=7&interval=1s", password=><password>, hosts=>[http://elasticsearch:9200], sniffing=>false, manage_template=>false, id=>"61217436ba74368dd34f5b7464eeea6e43d6b014cfc9e111093960789f46ecd1", user=>"elastic", document_type=>"%{[@metadata][document_type]}", enable_metric=>true, codec=><LogStash::Codecs::Plain id=>"plain_342629a8-9da9-400f-939d-67e1a383589b", enable_metric=>true, charset=>"UTF-8">, workers=>1, template_name=>"logstash", template_overwrite=>false, doc_as_upsert=>false, script_type=>"inline", script_lang=>"painless", script_var_name=>"event", scripted_upsert=>false, retry_initial_interval=>2, retry_max_interval=>64, retry_on_conflict=>1, ilm_enabled=>"auto", ilm_rollover_alias=>"logstash", ilm_pattern=>"{now/d}-000001", ilm_policy=>"logstash-policy", action=>"index", ssl_certificate_verification=>true, sniffing_delay=>5, timeout=>60, pool_max=>1000, pool_max_per_route=>100, resurrect_delay=>5, validate_after_inactivity=>10000, http_compression=>false>}
logstash_1       | [2019-11-09T19:41:42,840][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://elastic:xxxxxx@elasticsearch:9200/]}}
logstash_1       | [2019-11-09T19:41:42,918][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://elastic:xxxxxx@elasticsearch:9200/"}
logstash_1       | [2019-11-09T19:41:42,957][WARN ][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash-0, groupId=logstash] Error while fetching metadata with correlation id 2 : {jq=LEADER_NOT_AVAILABLE, messages=LEADER_NOT_AVAILABLE}
logstash_1       | [2019-11-09T19:41:42,961][INFO ][logstash.outputs.elasticsearch] ES Output version determined {:es_version=>7}
logstash_1       | [2019-11-09T19:41:42,961][WARN ][logstash.outputs.elasticsearch] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>7}
logstash_1       | [2019-11-09T19:41:43,047][INFO ][org.apache.kafka.clients.Metadata] [Consumer clientId=logstash-0, groupId=logstash] Cluster ID: d9PMTWr7Tc6deRqSpRZHSw
logstash_1       | [2019-11-09T19:41:43,079][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["http://elasticsearch:9200"]}
logstash_1       | [2019-11-09T19:41:43,177][INFO ][logstash.javapipeline    ] Starting pipeline {:pipeline_id=>".monitoring-logstash", "pipeline.workers"=>1, "pipeline.batch.size"=>2, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>2, :thread=>"#<Thread:0x6850ed28 run>"}
logstash_1       | [2019-11-09T19:41:43,197][WARN ][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash-0, groupId=logstash] Connection to node 3 (/127.0.0.1:9094) could not be established. Broker may not be available.
logstash_1       | [2019-11-09T19:41:43,236][WARN ][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash-0, groupId=logstash] Connection to node 2 (/127.0.0.1:9093) could not be established. Broker may not be available.
logstash_1       | [2019-11-09T19:41:43,310][WARN ][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash-0, groupId=logstash] Connection to node 1 (/127.0.0.1:9092) could not be established. Broker may not be available.
logstash_1       | [2019-11-09T19:41:43,311][WARN ][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash-0, groupId=logstash] Connection to node 3 (/127.0.0.1:9094) could not be established. Broker may not be available.
logstash_1       | [2019-11-09T19:41:43,311][WARN ][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash-0, groupId=logstash] Connection to node 2 (/127.0.0.1:9093) could not be established. Broker may not be available.
logstash_1       | [2019-11-09T19:41:43,373][WARN ][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash-0, groupId=logstash] Connection to node 1 (/127.0.0.1:9092) could not be established. Broker may not be available.
logstash_1       | [2019-11-09T19:41:43,430][WARN ][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash-0, groupId=logstash] Connection to node 3 (/127.0.0.1:9094) could not be established. Broker may not be available.
logstash_1       | [2019-11-09T19:41:43,460][WARN ][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash-0, groupId=logstash] Connection to node 2 (/127.0.0.1:9093) could not be established. Broker may not be available.
...
logstash_1       | [2019-11-09T19:41:43,713][INFO ][logstash.javapipeline    ] Pipeline started {"pipeline.id"=>".monitoring-logstash"}
logstash_1       | [2019-11-09T19:41:43,759][INFO ][logstash.agent           ] Pipelines running {:count=>2, :running_pipelines=>[:main, :".monitoring-logstash"], :non_running_pipelines=>[]}
logstash_1       | [2019-11-09T19:41:43,837][WARN ][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash-0, groupId=logstash] Connection to node 2 (/127.0.0.1:9093) could not be established. Broker may not be available.
^CGracefully stopping... (press Ctrl+C again to force)

What is advertised.listeners set to?

    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka11:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092

This is from my docker-compose file, which I found online and have modified quite a bit. I assume LISTENER_DOCKER_EXTERNAL ends up defaulting to the loopback address? I just tried swapping it to the container hostname, but that leads to DNS resolution errors. Please let me know what else I can provide that may be of help. Here is an example of my compose file; I have just extended it with numerous other services.

Unfortunately, I do not know Docker, so I do not know how to get the correct IP address in there.

No worries, thank you for your time! This setup is mostly just for local testing anyway, hopefully I won't run into this when I move into a different stage.

I think I figured it out. My network was set up wrong, so that was the first issue. After fixing that, changing LISTENER_DOCKER_EXTERNAL to the hostname of the container ended up working. Thanks for your time!
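
For anyone landing here with the same symptom, here is a rough sketch of what that change might look like in the compose file. The broker hands clients the address from advertised.listeners for whichever listener they connected on, so a client that bootstraps against port 9092 is told to reconnect to 127.0.0.1:9092, which inside the Logstash container points back at Logstash itself. The network name `kafka_net` and the `logstash` service entry are assumptions for illustration, and everything else in the service definitions (image, ports, ZooKeeper settings, and so on) is left out, so treat this as a fragment to merge into an existing file rather than something to run as-is.

```yaml
# Fragment only: merge into the existing service definitions.
services:
  kafka11:
    hostname: kafka11
    networks:
      - kafka_net   # hypothetical network name
    environment:
      # The external listener now advertises the container hostname instead of
      # the DOCKER_HOST_IP / 127.0.0.1 default, so other containers can reach it.
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka11:19092,LISTENER_DOCKER_EXTERNAL://kafka11:9092

  logstash:         # assumed service name, based on the logstash_1 container in the logs
    networks:
      - kafka_net   # must share a network with the broker to resolve kafka11

networks:
  kafka_net: {}
```

The trade-off is that clients running on the host machine would then also need to be able to resolve kafka11. Another option that should work with the listeners shown earlier is to leave the compose file alone and point bootstrap_servers at the internal listener, kafka11:19092.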