Kafka monitoring in Kubernetes

Hi,

I need help monitoring a Kafka cluster using Metricbeat. When Kafka is on a standalone server, it works fine. When Kafka is in a Kubernetes environment, how can I configure Metricbeat to collect the metrics from Kafka? Is there any autodiscovery available for this?

Hi @vamsikrishna_medeti,

Metricbeat indeed has an autodiscovery feature, which works with Kubernetes among other providers. You can configure it either with templates or with hints-based autodiscovery.
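If you go the hints route, a minimal sketch looks roughly like the following (the metricsets, port, and period are illustrative, adjust them to your deployment):

```yaml
# metricbeat.yml: enable hints-based autodiscovery
metricbeat.autodiscover:
  providers:
    - type: kubernetes
      node: ${NODE_NAME}
      hints.enabled: true

# Kafka pod template: annotations that tell Metricbeat which module
# to start for these pods (values here are illustrative)
metadata:
  annotations:
    co.elastic.metrics/module: kafka
    co.elastic.metrics/metricsets: partition,consumergroup
    co.elastic.metrics/hosts: "${data.host}:9092"
    co.elastic.metrics/period: 10s
```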

Let us know if those work for you.

Autodiscovery works for Kubernetes-related objects. But does it work for Kafka too?

We deployed Kafka on Kubernetes with 3 brokers, and I deployed Metricbeat with the Kafka module enabled, like below:

apiVersion: v1
kind: ConfigMap
metadata:
  name: metricbeat-deployment-modules
  namespace: kafka
  labels:
    k8s-app: metricbeat
data:
  kafka.yml: |-
    # Kafka metrics collected using the Kafka protocol
    - module: kafka
      # metricsets: ["partition","consumergroup"]
      period: 10s
      hosts: ["pipeline-kafka:9092"]

It's throwing the errors below:

Error fetching data for metricset kafka.broker: error making http request: Post "http://pipeline-kafka:9092/jolokia/%3FignoreErrors=true&canonicalNaming=false": read tcp 10.5.7.136:34466->172.20.79.178:9092: read: connection reset by peer
Error fetching data for metricset kafka.producer: error making http request: Post "http://pipeline-kafka:9092/jolokia/%3FignoreErrors=true&canonicalNaming=false": read tcp 10.5.7.136:34470->172.20.79.178:9092: read: connection reset by peer
Error fetching data for metricset kafka.partition: error in connect: No advertised broker with address pipeline-kafka:9092 found

Telnet to port 9092 works fine:

[root@ip-10-5-7-136 metricbeat]# telnet pipeline-kafka 9092
Trying 172.20.79.178...
Connected to pipeline-kafka.
Escape character is '^]'.

Unfortunately, connecting with telnet doesn't tell us much about the reason for the TCP connection reset. I'll try to get someone with more Kafka knowledge to take a look at this.

Pods are k8s related objects, so if you have Kafka deployed in Kubernetes, autodiscovery should work for you :slight_smile:

Instead of adding a static kafka configuration, try to add this provider to your autodiscover configuration in metricbeat.yml:

...
    metricbeat.autodiscover:
      providers:
        - type: kubernetes
          node: ${NODE_NAME}
          templates:
          - condition:
              contains:
                kubernetes.container.name: kafka
            config:
            - module: kafka
              metricsets: ["partition", "consumergroup"]
              hosts: ["${data.host}:9092"]
...

This configuration would instantiate a kafka module for each container whose name contains "kafka". You may need to adjust the condition for your case.
${data.host} will be replaced with the IP of the pod.

These errors may be caused by some Kafka metricsets that use Jolokia. I am deliberately not using them in the proposed configuration, as they require additional setup.

This error tends to happen when Metricbeat can connect to the broker, but the broker is not advertising the address Metricbeat is using to connect. If it keeps happening, check the address Kafka is advertising; this is usually logged at Kafka startup.

Thanks a lot @jsoriano. The autodiscovery is working; it's trying with the IP of the pod now.
But I'm still not getting the metrics.
Now I'm getting the errors below:

Error fetching data for metricset kafka.consumergroup: error in connect: No advertised broker with address 10.5.7.207:9092 found
Error fetching data for metricset kafka.consumergroup: error in connect: Could not get cluster client for advertised broker with address 10.5.7.50:9092

Ok, this is the problem with the advertised addresses I was mentioning before. Could you check what addresses Kafka is advertising? This is usually logged on startup; I'm not sure if there is another way of retrieving this information.
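If the startup logs have already rotated away, one alternative (assuming the stock Kafka CLI tools are present in the broker image; the script name and `<kafka-pod>` are placeholders that vary by distribution) is to ask the cluster for its broker metadata, which includes the advertised addresses:

```shell
# Prints one line per broker, beginning with its advertised host:port
# and broker id, followed by the supported API versions
kubectl exec -n kafka <kafka-pod> -- \
  kafka-broker-api-versions.sh --bootstrap-server localhost:9092
```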

I am getting this error in kafka broker:

[2020-09-04 14:56:15,159] WARN [SocketServer brokerId=0] Unexpected error from /10.5.7.136; closing connection (org.apache.kafka.common.network.Selector)
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 1347375956 larger than 104857600)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:105)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:447)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:397)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:678)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:580)
at org.apache.kafka.common.network.Selector.poll(Selector.java:485)
at kafka.network.Processor.poll(SocketServer.scala:913)
at kafka.network.Processor.run(SocketServer.scala:816)
at java.base/java.lang.Thread.run(Thread.java:834)

Does this mean Metricbeat is trying to send a 1.3 GB request?

This would be unexpected :grimacing: Is 10.5.7.136 the IP of a metricbeat pod?

Have you found out what addresses Kafka is advertising?
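As a side note, the "size" in that InvalidReceiveException is just the first four bytes of whatever arrived on the socket, interpreted as a big-endian message length. 1347375956 decodes to the ASCII bytes "POST", i.e. an HTTP request (such as the Jolokia POSTs in the earlier errors) hitting the Kafka binary protocol port, not a genuine 1.3 GB message:

```python
# Kafka reads the first 4 bytes of a connection as a big-endian message
# length; an HTTP request starts with "POST", so those bytes decode to
# an absurdly large "size"
size = 1347375956
first_four_bytes = size.to_bytes(4, "big")
print(first_four_bytes)  # b'POST'
```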

Yes, it's the IP of the Metricbeat pod. As it is using the host network, it's the IP of the k8s worker node.
I will get the Kafka advertising details; maybe the problem lies there.

Here are the listener properties configured for kafka in k8s:

listeners = PLAINTEXT://:9092
advertised.listeners = PLAINTEXT://:9092

We are able to create topics and publish messages from another pod within the cluster. So the listener config might not be the issue.

Still getting the same error (Error fetching data for metricset kafka.consumergroup: error in connect: No advertised broker with address).

metricbeat":{"kafka":{"consumergroup":{"events":3,"failures":3},"partition":{"events":3,"failures":3}}

Any other suggestions?

Thanks @jsoriano and @weltenwort.

Below is the config that was missing (mentioning it here so that it will be useful for others):

advertised.listeners=PLAINTEXT://${MY_POD_IP}:9092

I have set MY_POD_IP as an env variable like below:

        - name: MY_POD_IP
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.podIP
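For completeness, wiring the pod IP into the broker config might look roughly like this. The image name and the KAFKA_ADVERTISED_LISTENERS env var are assumptions based on the common Confluent images; adapt this to however your deployment renders server.properties:

```yaml
containers:
  - name: kafka-broker
    image: confluentinc/cp-kafka        # illustrative image
    env:
      - name: MY_POD_IP
        valueFrom:
          fieldRef:
            apiVersion: v1
            fieldPath: status.podIP
      # Kubernetes expands $(MY_POD_IP) from the env var above, so each
      # broker advertises its own pod IP instead of an empty host
      - name: KAFKA_ADVERTISED_LISTENERS
        value: "PLAINTEXT://$(MY_POD_IP):9092"
```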

And in the metricbeat autodiscovery, I have set the scope to cluster.

metricbeat.autodiscover:
  providers:
    - type: kubernetes
      node: ${NODE_NAME}
      scope: cluster
      templates:
      - condition:
          equals:
            kubernetes.labels.app: kafka-broker
        config:
        - module: kafka
          metricsets: ["partition","consumergroup"]
          hosts: ["${data.host}:9092"]