Logstash doesn't receive logs from Kafka (Filebeat sends logs to Kafka)

Hi,
I'm currently using:

  • Filebeat 8.3.3 (installed on Windows)
  • Elasticsearch 8.3.3
  • Logstash 8.3.3
  • Kafka 3.2.1

The Elastic Stack and Kafka run on one server (192.168.9.70); the Windows machine running Filebeat is 192.168.9.34.

About the architecture: I first installed Filebeat on Windows to collect logs there, then output them to Kafka; Logstash consumes from Kafka and finally ships the events to Elasticsearch. In short: Filebeat (Windows) → Kafka → Logstash → Elasticsearch.

However, I'm having a problem: the logs from the Windows Filebeat (via Kafka and Logstash) never show up in Kibana.
My filebeat config (filebeat.yml) on Windows:

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - E:\BackUpAPI\logs\fos_itrade\*
 
filebeat.config.modules:
   path: ${path.config}/modules.d/*.yml
   reload.enabled: false

setup.template.settings:
   index.number_of_shards: 1
  
setup.kibana:
  host: "http://192.168.9.70:5601"

output.kafka:
  hosts: ["192.168.9.70:9092"]
  enabled: true
  topic: 'logtest'
  partition.round_robin:
    reachable_only: false

  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~

My Logstash config (/etc/logstash/conf.d/logstash.conf) on the CentOS server (192.168.9.70):

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["myTopic","logtest"]
    codec => json
  }
}
filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
  date {
    match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
  }
}

When I test Kafka with the console producer:

  • ./kafka-console-producer.sh --topic logtest --bootstrap-server localhost:9092
  • ./kafka-console-producer.sh --topic myTopic --bootstrap-server localhost:9092

the messages I type into both topics do show up in Kibana.

When I run tcpdump -i any src port 9092 and dst host 192.168.9.34, I do see traffic between Kafka's port 9092 and the Windows machine, so the two hosts are talking to each other.

I don't know what I should do. Is there maybe something wrong with the topic setup on the Windows side? What should I do to get logs from Filebeat on Windows into Elasticsearch through Kafka and Logstash?

please help!

Did you try to consume from your Kafka using kafka-console-consumer.sh to make sure that your logs are arriving at Kafka?

From what you shared, Logstash does not seem to have any issue consuming from Kafka, so you need to validate that Filebeat is correctly sending logs to Kafka.

Try to consume the messages from your topics using the Kafka console consumer script to see whether any messages sent by Filebeat are there.
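
For example, something like this on the Kafka server (adjust the topic name if yours is different):

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic logtest --from-beginning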

If there are no messages from Filebeat, try changing the path to this:

"E:/BackUpAPI/logs/fos_itrade/*"

It is pretty common to have issues on Windows if you do not use forward slashes.
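
A minimal sketch of how that input section would look with the change (same path as yours, only the slashes differ):

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - "E:/BackUpAPI/logs/fos_itrade/*"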

@leandrojmp Yup, I feel the same. I'm not sure whether I configured Filebeat on Windows correctly. I have changed the path as you suggested but still see no result. Here is the message I get (screenshot).

Please don't post images of text; they are really hard to view and to help you with. They cannot be searched, copied, or debugged, and some people can't even see them.

it looks like you are harvesting files....

BUT I do see that it looks like all / most events are failing... I think your Filebeat is not writing to Kafka...

Please, no more screenshots of text... I had to zoom in just to read it!

Sorry about the image. What should I do instead? @stephenb

It's ok, just cut-and-paste the text, then select it and format it with the format button </>.

Back to your issue ... to me it looks like filebeat is failing to write to kafka.

Is there any way we can troubleshoot why Filebeat is failing to write to Kafka, @stephenb?

The first thing you should check is the communication from the Filebeat machine to the machine running Kafka; a telnet on port 9092 would tell you whether your Windows machine can connect to the Kafka port.
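
For example, from the Windows machine, something like one of these (assuming PowerShell or the telnet client is available):

Test-NetConnection -ComputerName 192.168.9.70 -Port 9092
telnet 192.168.9.70 9092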

After that you need to check your Kafka configuration, specifically listeners and advertised.listeners: you need to have a listener and an advertised listener on the private IP of your server.

Normally I use this configuration for listeners (in controlled environments where I do not need TLS/SSL):

# listener
advertised.listeners=INTERNAL://localhost:9093,EXTERNAL://private-ip:9092
listeners=INTERNAL://localhost:9093,EXTERNAL://private-ip:9092
listener.security.protocol.map = INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
inter.broker.listener.name = INTERNAL

And test the output with:

filebeat test output
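
On Windows that would be something like the following, run from the Filebeat install directory (assuming your config file is the filebeat.yml shown above):

.\filebeat.exe test output -c filebeat.yml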

I have tried editing the file /home/kafka/kafka_2.12-3.2.1/config/kraft/server.properties as you said; here is the server.properties config:

[root@srv-logcentral kraft]# cat server.properties
process.roles=broker,controller
node.id=1

controller.quorum.voters=1@localhost:9092
linter.broker.listener.name=PLAINTEXT
controller.listener.names=CONTROLLER
listeners=PLAINTEXT://localhost:9093,CONTROLLER://:9092
advertised.listeners=PLAINTEXT://localhost:9093

listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT

num.network.threads=3
num.io.threads=8

socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

log.dirs=/tmp/kraft-combined-logs

num.partitions=1
num.recovery.threads.per.data.dir=1

offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

After configuring it, I started the service; here is the log:

[root@srv-logcentral bin]# ./kafka-server-start.sh /home/kafka/kafka_2.12-3.2.1/config/kraft/server.properties
...
[2022-08-18 16:08:30,956] INFO [GroupMetadataManager brokerId=1] Finished loading offsets and group metadata from __consumer_offsets-35 in 70 milliseconds for epoch 27, of which 68 milliseconds was spent in the scheduler. (kafka.coordinator.group.GroupMetadataManager)
[2022-08-18 16:08:30,957] INFO [GroupMetadataManager brokerId=1] Finished loading offsets and group metadata from __consumer_offsets-2 in 71 milliseconds for epoch 27, of which 70 milliseconds was spent in the scheduler. (kafka.coordinator.group.GroupMetadataManager)
[2022-08-18 16:08:58,109] ERROR Closing socket for 192.168.9.70:9092-192.168.9.34:61573-0 because of error (kafka.network.Processor)
org.apache.kafka.common.errors.InvalidRequestException: Received request api key METADATA which is not enabled
[2022-08-18 16:08:58,110] ERROR Exception while processing request from 192.168.9.70:9092-192.168.9.34:61573-0 (kafka.network.Processor)
org.apache.kafka.common.errors.InvalidRequestException: Received request api key METADATA which is not enabled
[2022-08-18 16:09:28,389] ERROR Closing socket for 192.168.9.70:9092-192.168.9.34:61574-1 because of error (kafka.network.Processor)
org.apache.kafka.common.errors.InvalidRequestException: Received request api key METADATA which is not enabled
[2022-08-18 16:09:28,389] ERROR Exception while processing request from 192.168.9.70:9092-192.168.9.34:61574-1 (kafka.network.Processor)
org.apache.kafka.common.errors.InvalidRequestException: Received request api key METADATA which is not enabled

When I start Logstash, it says:

[root@srv-logcentral bin]# ./logstash -f /etc/logstash/conf.d/logstash.conf
Using bundled JDK: /usr/share/logstash/jdk
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults
Could not find log4j2 configuration at path /usr/share/logstash/config/log4j2.properties. Using default config which logs errors to the console
[WARN ] 2022-08-18 16:26:10.711 [main] runner - NOTICE: Running Logstash as superuser is not recommended and won't be allowed in the future. Set 'allow_superuser' to 'false' to avoid startup errors in future releases.
[WARN ] 2022-08-18 16:26:10.721 [main] runner - The use of JAVA_HOME has been deprecated. Logstash 8.0 and later ignores JAVA_HOME and uses the bundled JDK. Running Logstash with the bundled JDK is recommended. The bundled JDK has been verified to work with each specific version of Logstash, and generally provides best performance and reliability. If you have compelling reasons for using your own JDK (organizational-specific compliance requirements, for example), you can configure LS_JAVA_HOME to use that version instead.
[INFO ] 2022-08-18 16:26:10.722 [main] runner - Starting Logstash {"logstash.version"=>"8.3.3", "jruby.version"=>"jruby 9.2.20.1 (2.5.8) 2021-11-30 2a2962fbd1 OpenJDK 64-Bit Server VM 11.0.15+10 on 11.0.15+10 +indy +jit [linux-x86_64]"}
[INFO ] 2022-08-18 16:26:10.723 [main] runner - JVM bootstrap flags: [-Xms1g, -Xmx1g, -XX:+UseConcMarkSweepGC, -XX:CMSInitiatingOccupancyFraction=75, -XX:+UseCMSInitiatingOccupancyOnly, -Djava.awt.headless=true, -Dfile.encoding=UTF-8, -Djruby.compile.invokedynamic=true, -Djruby.jit.threshold=0, -XX:+HeapDumpOnOutOfMemoryError, -Djava.security.egd=file:/dev/urandom, -Dlog4j2.isThreadContextMapInheritable=true, -Djruby.regexp.interruptible=true, -Djdk.io.File.enableADS=true, --add-opens=java.base/java.security=ALL-UNNAMED, --add-opens=java.base/java.io=ALL-UNNAMED, --add-opens=java.base/java.nio.channels=ALL-UNNAMED, --add-opens=java.base/sun.nio.ch=ALL-UNNAMED, --add-opens=java.management/sun.management=ALL-UNNAMED]
[WARN ] 2022-08-18 16:26:10.971 [LogStash::Runner] multilocal - Ignoring the 'pipelines.yml' file because modules or command line options are specified
[INFO ] 2022-08-18 16:26:11.693 [Api Webserver] agent - Successfully started Logstash API endpoint {:port=>9600, :ssl_enabled=>false}
[INFO ] 2022-08-18 16:26:12.040 [Converge PipelineAction::Create<main>] Reflections - Reflections took 53 ms to scan 1 urls, producing 124 keys and 408 values
[INFO ] 2022-08-18 16:26:12.200 [Converge PipelineAction::Create<main>] json - ECS compatibility is enabled but `target` option was not specified. This may cause fields to be set at the top-level of the event where they are likely to clash with the Elastic Common Schema. It is recommended to set the `target` option to avoid potential schema conflicts (if your data is ECS compliant or non-conflicting, feel free to ignore this message)
[INFO ] 2022-08-18 16:26:12.389 [Converge PipelineAction::Create<main>] javapipeline - Pipeline `main` is configured with `pipeline.ecs_compatibility: v8` setting. All plugins in this pipeline will default to `ecs_compatibility => v8` unless explicitly configured otherwise.
[INFO ] 2022-08-18 16:26:12.421 [[main]-pipeline-manager] elasticsearch - New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//localhost:9200"]}
[INFO ] 2022-08-18 16:26:12.696 [[main]-pipeline-manager] elasticsearch - Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}}
[WARN ] 2022-08-18 16:26:12.824 [[main]-pipeline-manager] elasticsearch - Restored connection to ES instance {:url=>"http://localhost:9200/"}
[INFO ] 2022-08-18 16:26:12.832 [[main]-pipeline-manager] elasticsearch - Elasticsearch version determined (8.3.3) {:es_version=>8}
[WARN ] 2022-08-18 16:26:12.833 [[main]-pipeline-manager] elasticsearch - Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>8}
[INFO ] 2022-08-18 16:26:12.862 [[main]-pipeline-manager] elasticsearch - Config is compliant with data streams. `data_stream => auto` resolved to `true`
[WARN ] 2022-08-18 16:26:12.864 [[main]-pipeline-manager] elasticsearch - Elasticsearch Output configured with `ecs_compatibility => v8`, which resolved to an UNRELEASED preview of version 8.0.0 of the Elastic Common Schema. Once ECS v8 and an updated release of this plugin are publicly available, you will need to update this plugin to resolve this warning.
[INFO ] 2022-08-18 16:26:12.870 [Ruby-0-Thread-10: :1] elasticsearch - Config is compliant with data streams. `data_stream => auto` resolved to `true`
[WARN ] 2022-08-18 16:26:12.886 [[main]-pipeline-manager] grok - ECS v8 support is a preview of the unreleased ECS v8, and uses the v1 patterns. When Version 8 of the Elastic Common Schema becomes available, this plugin will need to be updated
[INFO ] 2022-08-18 16:26:12.912 [Ruby-0-Thread-10: :1] elasticsearch - Using a default mapping template {:es_version=>8, :ecs_compatibility=>:v8}
[INFO ] 2022-08-18 16:26:13.005 [[main]-pipeline-manager] javapipeline - Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>1000, "pipeline.sources"=>["/etc/logstash/conf.d/logstash.conf"], :thread=>"#<Thread:0x43d736af run>"}
[INFO ] 2022-08-18 16:26:13.502 [[main]-pipeline-manager] javapipeline - Pipeline Java execution initialization time {"seconds"=>0.49}
[INFO ] 2022-08-18 16:26:13.516 [[main]-pipeline-manager] javapipeline - Pipeline started {"pipeline.id"=>"main"}
[INFO ] 2022-08-18 16:26:13.561 [Agent thread] agent - Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[WARN ] 2022-08-18 16:26:13.576 [[main]<kafka] CommonClientConfigs - Configuration 'client.dns.lookup' with value 'default' is deprecated and will be removed in future version. Please use 'use_all_dns_ips' or another non-deprecated value.
[INFO ] 2022-08-18 16:26:13.577 [[main]<kafka] ConsumerConfig - ConsumerConfig values:
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = latest
        bootstrap.servers = [localhost:9092]
        check.crcs = true
        client.dns.lookup = default
        client.id = logstash-0
        client.rack =
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = true
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = logstash
        group.instance.id = null
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        internal.throw.on.fetch.stable.offset.unsupported = false
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 50
        reconnect.backoff.ms = 50
        request.timeout.ms = 40000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        security.providers = null
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        socket.connection.setup.timeout.max.ms = 30000
        socket.connection.setup.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
        ssl.endpoint.identification.algorithm = https
        ssl.engine.factory.class = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.certificate.chain = null
        ssl.keystore.key = null
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.3
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.certificates = null
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

[INFO ] 2022-08-18 16:26:13.638 [[main]<kafka] AppInfoParser - Kafka version: 2.8.1
[INFO ] 2022-08-18 16:26:13.638 [[main]<kafka] AppInfoParser - Kafka commitId: 839b886f9b732b15
[INFO ] 2022-08-18 16:26:13.638 [[main]<kafka] AppInfoParser - Kafka startTimeMs: 1660814773635
[INFO ] 2022-08-18 16:26:13.643 [[main]<kafka] KafkaConsumer - [Consumer clientId=logstash-0, groupId=logstash] Subscribed to topic(s): myTopic, logtest
[INFO ] 2022-08-18 16:26:13.655 [kafka-input-worker-logstash-0] json - ECS compatibility is enabled but `target` option was not specified. This may cause fields to be set at the top-level of the event where they are likely to clash with the Elastic Common Schema. It is recommended to set the `target` option to avoid potential schema conflicts (if your data is ECS compliant or non-conflicting, feel free to ignore this message)
[ERROR] 2022-08-18 16:26:13.985 [kafka-input-worker-logstash-0] kafka - Unable to poll Kafka consumer {:kafka_error_message=>org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support METADATA, :cause=>nil}
[INFO ] 2022-08-18 16:26:14.989 [kafka-input-worker-logstash-0] AbstractCoordinator - [Consumer clientId=logstash-0, groupId=logstash] FindCoordinator request hit fatal exception
org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support FIND_COORDINATOR
[ERROR] 2022-08-18 16:26:14.992 [kafka-input-worker-logstash-0] kafka - Unable to poll Kafka consumer {:kafka_error_message=>org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support FIND_COORDINATOR, :cause=>nil}
[ERROR] 2022-08-18 16:26:15.992 [kafka-input-worker-logstash-0] kafka - Unable to poll Kafka consumer {:kafka_error_message=>org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support FIND_COORDINATOR, :cause=>nil}
[INFO ] 2022-08-18 16:26:16.993 [kafka-input-worker-logstash-0] AbstractCoordinator - [Consumer clientId=logstash-0, groupId=logstash] FindCoordinator request hit fatal exception
org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support FIND_COORDINATOR
[ERROR] 2022-08-18 16:26:16.994 [kafka-input-worker-logstash-0] kafka - Unable to poll Kafka consumer {:kafka_error_message=>org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support FIND_COORDINATOR, :cause=>nil}
[ERROR] 2022-08-18 16:26:17.995 [kafka-input-worker-logstash-0] kafka - Unable to poll Kafka consumer {:kafka_error_message=>org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support FIND_COORDINATOR, :cause=>nil}
[INFO ] 2022-08-18 16:26:18.995 [kafka-input-worker-logstash-0] AbstractCoordinator - [Consumer clientId=logstash-0, groupId=logstash] FindCoordinator request hit fatal exception
org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support FIND_COORDINATOR
[ERROR] 2022-08-18 16:26:18.995 [kafka-input-worker-logstash-0] kafka - Unable to poll Kafka consumer {:kafka_error_message=>org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support FIND_COORDINATOR, :cause=>nil}
[ERROR] 2022-08-18 16:26:19.996 [kafka-input-worker-logstash-0] kafka - Unable to poll Kafka consumer {:kafka_error_message=>org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support FIND_COORDINATOR, :cause=>nil}
[INFO ] 2022-08-18 16:26:20.997 [kafka-input-worker-logstash-0] AbstractCoordinator - [Consumer clientId=logstash-0, groupId=logstash] FindCoordinator request hit fatal exception
org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support FIND_COORDINATOR
[ERROR] 2022-08-18 16:26:20.997 [kafka-input-worker-logstash-0] kafka - Unable to poll Kafka consumer {:kafka_error_message=>org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support FIND_COORDINATOR, :cause=>nil}
[ERROR] 2022-08-18 16:26:21.998 [kafka-input-worker-logstash-0] kafka - Unable to poll Kafka consumer {:kafka_error_message=>org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support FIND_COORDINATOR, :cause=>nil}
[INFO ] 2022-08-18 16:26:22.999 [kafka-input-worker-logstash-0] AbstractCoordinator - [Consumer clientId=logstash-0, groupId=logstash] FindCoordinator request hit fatal exception
org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support FIND_COORDINATOR
[ERROR] 2022-08-18 16:26:23.000 [kafka-input-worker-logstash-0] kafka - Unable to poll Kafka consumer {:kafka_error_message=>org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support FIND_COORDINATOR, :cause=>nil}
[ERROR] 2022-08-18 16:26:24.001 [kafka-input-worker-logstash-0] kafka - Unable to poll Kafka consumer {:kafka_error_message=>org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support FIND_COORDINATOR, :cause=>nil}
[INFO ] 2022-08-18 16:26:25.001 [kafka-input-worker-logstash-0] AbstractCoordinator - [Consumer clientId=logstash-0, groupId=logstash] FindCoordinator request hit fatal exception
org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support FIND_COORDINATOR
[ERROR] 2022-08-18 16:26:25.002 [kafka-input-worker-logstash-0] kafka - Unable to poll Kafka consumer {:kafka_error_message=>org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support FIND_COORDINATOR, :cause=>nil}

On Windows, when I run .\filebeat.exe test output, it says:

Kafka: 192.168.9.70:9092...
  parse host... OK
  dns lookup... OK
  addresses: 192.168.9.70
  dial up...OK

When I run .\filebeat.exe -e -c filebeat.yml, it says:
message: kafka (topic=logtest): kafka: couldn't fetch broker metadata (check that your client and broker are using the same settings)

Maybe Filebeat is ok, but Kafka is not starting properly now. @leandrojmp

Please help :frowning:

I'm not sure how we could help further; your issue seems to be with Kafka.

I shared the Kafka config that I use and that works in my case, but you didn't change your advertised listeners.

I'm not a specialist in Kafka, but the advertised listener is the IP address and port that your producers or consumers will talk to. If the producer or consumer is external, you need to advertise an address that is reachable from outside your server; you are using only localhost.

Try to add the CONTROLLER://:9092 to your advertised.listeners and see if it works.

I did change it, but it said:

[root@srv-logcentral bin]# sudo ./kafka-server-start.sh /home/kafka/kafka_2.12-3.2.1/config/kraft/server.properties
[2022-08-19 09:34:25,643] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2022-08-19 09:34:26,067] INFO Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation (org.apache.zookeeper.common.X509Util)
[2022-08-19 09:34:26,102] ERROR Exiting Kafka due to fatal exception (kafka.Kafka$)
java.lang.IllegalArgumentException: requirement failed: The advertised.listeners config must not contain KRaft controller listeners from controller.listener.names when process.roles contains the broker role because Kafka clients that send requests via advertised listeners do not send requests to KRaft controllers -- they only send requests to KRaft brokers.
        at scala.Predef$.require(Predef.scala:281)
        at kafka.server.KafkaConfig.validateAdvertisedListenersDoesNotContainControllerListenersForKRaftBroker$1(KafkaConfig.scala:2048)
        at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:2114)
        at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1997)
        at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1471)
        at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:1394)
        at kafka.Kafka$.buildServer(Kafka.scala:67)
        at kafka.Kafka$.main(Kafka.scala:87)
        at kafka.Kafka.main(Kafka.scala)

That's why I removed it, but it still didn't work.

I have a question about setting the ports for the controller / plaintext listeners. The original GitHub example says:

listeners=PLAINTEXT://:9092,CONTROLLER://:9093
inter.broker.listener.name=PLAINTEXT

However, in your configuration you use:

listeners=INTERNAL://localhost:9093,EXTERNAL://private-ip:9092
inter.broker.listener.name = INTERNAL

So I wonder which port I should use to get it right!

I'm still using Kafka with Zookeeper; from what you shared, you do not have Zookeeper and are running Kafka with KRaft, the new way Kafka coordinates its brokers, so the configuration will not be the same.

But this does not matter, you can just create another listener and expose it in the advertised listeners.

Just add a new listener in the listeners, advertised.listeners and listener.security.protocol.map configuration, something like EXTERNAL://:9094

Something like this:

listeners=PLAINTEXT://localhost:9093,CONTROLLER://:9092,EXTERNAL://:9094
advertised.listeners=PLAINTEXT://localhost:9093,EXTERNAL://:9094

listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT

Then configure your Filebeat to use port 9094.
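
That is, in your filebeat.yml the Kafka output would look something like this (only the port changes from your current config):

output.kafka:
  hosts: ["192.168.9.70:9094"]
  topic: 'logtest'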

If this does not work, I'm not sure what else you could do, because the issue seems to be with your Kafka configuration.


Thanks, I finally received logs after creating the new listener EXTERNAL://:9094. My working server.properties now looks like this:

process.roles=broker,controller
node.id=1
controller.quorum.voters=1@localhost:9093
inter.broker.listener.name=PLAINTEXT
controller.listener.names=CONTROLLER
listeners=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
advertised.listeners=PLAINTEXT://192.168.9.70:9092,EXTERNAL://:9094
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT

I still kept port 9092 in my Filebeat config on Windows (it works now because the PLAINTEXT listener is advertised on 192.168.9.70:9092 instead of localhost), and I can consume the incoming logs on both port 9092 and 9094 on the Kafka server, e.g.:

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic logtest --from-beginning
I appreciate your help a lot! @leandrojmp

