Metricbeat 6.X - Kafka module with SASL auth doesn't work

Hi,

we run a 5-node Kafka cluster and are trying to collect some metrics with Metricbeat.

- module: kafka
  metricsets: ["partition"]
  hosts: ["localhost:20160"]
  enabled: true
  period: 30s

  client_id: metricbeat
  retries: 3
  backoff: 250ms

  tags: ["kafka", "kafka-mdp"]

  # SASL authentication
  username: "user"
  password: "pwd"

I don't get any events out of this configuration, and the debug logs don't show an explicit error:

./metricbeat run -e -v -d "*" -c metricbeat.yml
2018-02-13T08:05:50.016+0100  INFO  instance/beat.go:468  Home path: [/metricbeat-6.2.1-darwin-x86_64] Config path: [/metricbeat-6.2.1-darwin-x86_64] Data path: [/metricbeat-6.2.1-darwin-x86_64/data] Logs path: [/metricbeat-6.2.1-darwin-x86_64/logs]
2018-02-13T08:05:50.016+0100  DEBUG [beat]  instance/beat.go:495  Beat metadata path: /metricbeat-6.2.1-darwin-x86_64/data/meta.json
2018-02-13T08:05:50.017+0100  INFO  instance/beat.go:475  Beat UUID: fc21b1fe-a858-417b-b105-1341d6982de2
2018-02-13T08:05:50.017+0100  INFO  instance/beat.go:213  Setup Beat: metricbeat; Version: 6.2.1
2018-02-13T08:05:50.017+0100  DEBUG [beat]  instance/beat.go:230  Initializing output plugins
2018-02-13T08:05:50.018+0100  DEBUG [processors]  processors/processor.go:49  Processors:
2018-02-13T08:05:50.018+0100  INFO  pipeline/module.go:76 Beat name: s0070004.bonprix.de
2018-02-13T08:05:50.018+0100  INFO  instance/beat.go:301  metricbeat start running.
2018-02-13T08:05:50.018+0100  INFO  [monitoring]  log/log.go:97 Starting metrics logging every 30s
2018-02-13T08:05:50.019+0100  DEBUG [cfgfile] cfgfile/reload.go:95  Checking module configs from: /metricbeat-6.2.1-darwin-x86_64/modules.d/*.yml
2018-02-13T08:05:50.019+0100  DEBUG [cfgfile] cfgfile/cfgfile.go:143  Load config from file: /metricbeat-6.2.1-darwin-x86_64/modules.d/kafka-mdp.yml
2018-02-13T08:05:50.020+0100  DEBUG [cfgfile] cfgfile/reload.go:109 Number of module configs found: 1
2018-02-13T08:05:50.020+0100  DEBUG [processors]  processors/processor.go:49  Processors:
2018-02-13T08:05:50.020+0100  WARN  [cfgwarn] partition/partition.go:41 BETA: The kafka partition metricset is beta
2018-02-13T08:05:50.021+0100  INFO  cfgfile/reload.go:127 Config reloader started
2018-02-13T08:05:50.021+0100  DEBUG [cfgfile] cfgfile/reload.go:151 Scan for new config files
2018-02-13T08:05:50.021+0100  DEBUG [cfgfile] cfgfile/cfgfile.go:143  Load config from file: /metricbeat-6.2.1-darwin-x86_64/modules.d/kafka-mdp.yml
2018-02-13T08:05:50.022+0100  DEBUG [cfgfile] cfgfile/reload.go:170 Number of module configs found: 1
2018-02-13T08:05:50.022+0100  DEBUG [cfgfile] cfgfile/reload.go:198 Remove module from stoplist: 5619847301432343577
2018-02-13T08:05:50.022+0100  DEBUG [cfgfile] cfgfile/reload.go:203 Add module to startlist: 5619847301432343577
2018-02-13T08:05:50.022+0100  DEBUG [processors]  processors/processor.go:49  Processors:
2018-02-13T08:05:50.022+0100  WARN  [cfgwarn] partition/partition.go:41 BETA: The kafka partition metricset is beta
2018-02-13T08:05:50.022+0100  INFO  cfgfile/reload.go:258 Starting 1 runners ...
2018-02-13T08:05:50.022+0100  DEBUG [module]  module/wrapper.go:100 Starting Wrapper[name=kafka, len(metricSetWrappers)=1]
2018-02-13T08:05:50.022+0100  DEBUG [cfgfile] cfgfile/reload.go:265 New runner started: 5619847301432343577
2018-02-13T08:05:50.022+0100  INFO  cfgfile/reload.go:219 Loading of config files completed.
2018-02-13T08:05:50.022+0100  DEBUG [module]  module/wrapper.go:154 Starting metricSetWrapper[module=kafka, name=partition, host=localhost:20160]
2018-02-13T08:05:50.078+0100  INFO  kafka/log.go:36 kafka message: [[Successful SASL handshake]]
2018-02-13T08:05:50.105+0100  INFO  kafka/log.go:36 SASL authentication successful with broker [[localhost:20160 %!s(int=4) ]]:%!v(MISSING) - %!v(MISSING)

2018-02-13T08:05:50.105+0100  INFO  kafka/log.go:36 Connected to broker at [[localhost:20160]] (unregistered)

2018-02-13T08:05:50.134+0100  DEBUG [kafka] kafka/broker.go:348 Try to match broker to: localhost:20160
2018-02-13T08:05:50.134+0100  DEBUG [kafka] kafka/broker.go:109 found matching broker localhost:20160 with id 4
2018-02-13T08:05:50.161+0100  INFO  kafka/log.go:36 Closed connection to broker [[localhost:20160]]

2018-02-13T08:06:20.028+0100  INFO  [monitoring]  log/log.go:124  Non-zero metrics in the last 30s  {"monitoring": {"metrics": {"beat":{"cpu":{"system":{"ticks":35,"time":35},"total":{"ticks":71,"time":72,"value":71},"user":{"ticks":36,"time":37}},"info":{"ephemeral_id":"504b890e-895f-4025-a669-7a0ab78a80a1","uptime":{"ms":30038}},"memstats":{"gc_next":4194304,"memory_alloc":2495264,"memory_total":4009744,"rss":21917696}},"libbeat":{"config":{"module":{"running":1,"starts":1},"reloads":1},"output":{"type":"console"},"pipeline":{"clients":2,"events":{"active":0}}},"system":{"cpu":{"cores":4},"load":{"1":1.6968,"15":1.7427,"5":1.6274,"norm":{"1":0.4242,"15":0.4357,"5":0.4069}}}}}}
2018-02-13T08:06:20.223+0100  INFO  kafka/log.go:36 kafka message: [[Successful SASL handshake]]
2018-02-13T08:06:20.249+0100  INFO  kafka/log.go:36 SASL authentication successful with broker [[localhost:20160 %!s(int=4) ]]:%!v(MISSING) - %!v(MISSING)

2018-02-13T08:06:20.249+0100  INFO  kafka/log.go:36 Connected to broker at [[localhost:20160]] (unregistered)

2018-02-13T08:06:20.279+0100  INFO  kafka/log.go:36 Closed connection to broker [[localhost:20160]]



^C2018-02-13T08:06:40.781+0100  DEBUG [service] service/service.go:33 Received sigterm/sigint, stopping

The user is marked as an admin in Kafka, so that shouldn't be the problem. Does anyone have a working setup with Kafka metrics and SASL auth?

These logs indicate the authentication did succeed:

2018-02-13T08:05:50.078+0100  INFO  kafka/log.go:36 kafka message: [[Successful SASL handshake]]
2018-02-13T08:05:50.105+0100  INFO  kafka/log.go:36 SASL authentication successful with broker

On first connect, Metricbeat tries to match the broker id to the actual hostname and port. For this, Metricbeat uses the Kafka GetMetadata API call to query the cluster state.

These messages indicate the query was successful (so SASL authentication worked):

2018-02-13T08:05:50.134+0100  DEBUG [kafka] kafka/broker.go:348 Try to match broker to: localhost:20160
2018-02-13T08:05:50.134+0100  DEBUG [kafka] kafka/broker.go:109 found matching broker localhost:20160 with id 4
2018-02-13T08:05:50.161+0100  INFO  kafka/log.go:36 Closed connection to broker [[localhost:20160]]

The Metricbeat kafka module does not perform the normal connection bootstrapping. Instead, you should run one Metricbeat instance per Kafka host (or configure all hosts in the module). Kafka's state is distributed across the nodes, so you need to connect to the correct broker in order to collect its metrics.
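For example, a single module entry can list all brokers instead of just localhost (a sketch based on the config above; the hostnames are illustrative):

```yaml
- module: kafka
  metricsets: ["partition"]
  enabled: true
  period: 30s
  # List every broker of the cluster so the metricset can talk to the
  # node that actually holds the partitions it queries (example hostnames):
  hosts:
    - "kafka01.example.com:20160"
    - "kafka02.example.com:20160"
    - "kafka03.example.com:20160"
    - "kafka04.example.com:20160"
    - "kafka05.example.com:20160"

  # SASL authentication
  username: "user"
  password: "pwd"
```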

At the moment we have one Metricbeat per Kafka node and connect via localhost to the "local" instance. I can try configuring all cluster nodes and see if it makes any difference.

I should add that our configuration worked until we activated SASL auth.

I changed my configuration so that the Metricbeat instance accesses all cluster nodes. The result is the same: I don't get any metrics back. The log looks as reported previously, except that there is an entry per server.

It looks like the broker isn't registered, or Metricbeat can't determine the id correctly:

		if b.id >= 0 {
			Logger.Printf("Connected to broker at %s (registered as #%d)\n", b.addr, b.id)
		} else {
			Logger.Printf("Connected to broker at %s (unregistered)\n", b.addr)
		}
		go withRecover(b.responseReceiver)

https://github.com/elastic/beats/blob/6.1/vendor/github.com/Shopify/sarama/broker.go lines 143+

But in the debug log it looks like Metricbeat can gather this information:

2018-02-15T07:25:43.813+0100  DEBUG [kafka] kafka/broker.go:348 Try to match broker to: 10.107.176.67:20160
2018-02-15T07:25:43.813+0100  DEBUG [kafka] kafka/broker.go:109 found matching broker 10.107.176.67:20160 with id 2
2018-02-15T07:25:43.813+0100  DEBUG [kafka] kafka/broker.go:348 Try to match broker to: 10.107.176.70:20160
2018-02-15T07:25:43.813+0100  DEBUG [kafka] kafka/broker.go:109 found matching broker 10.107.176.70:20160 with id 5
2018-02-15T07:25:43.813+0100  DEBUG [kafka] kafka/broker.go:348 Try to match broker to: 10.107.176.68:20160
2018-02-15T07:25:43.813+0100  DEBUG [kafka] kafka/broker.go:109 found matching broker 10.107.176.68:20160 with id 3
2018-02-15T07:25:43.817+0100  DEBUG [kafka] kafka/broker.go:348 Try to match broker to: 10.107.176.66:20160
2018-02-15T07:25:43.817+0100  DEBUG [kafka] kafka/broker.go:348 Try to match broker to: 10.107.176.69:20160
2018-02-15T07:25:43.817+0100  DEBUG [kafka] kafka/broker.go:109 found matching broker 10.107.176.69:20160 with id 4
2018-02-15T07:25:43.817+0100  DEBUG [kafka] kafka/broker.go:109 found matching broker 10.107.176.66:20160 with id 1

Logger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header) gives a "strange" result in the logs:

2018-02-15T07:28:44.001+0100 INFO kafka/log.go:36 SASL authentication successful with broker [[10.107.176.67:20160 %!s(int=4) ]]:%!v(MISSING) - %!v(MISSING)

https://github.com/elastic/beats/blob/6.1/vendor/github.com/Shopify/sarama/broker.go line 642

the "strange result" is an error in the printf. See here: https://github.com/elastic/beats/pull/6371

With proper formatting the log should say:

2018-02-15T07:28:44.001+0100	INFO	kafka/log.go:36	SASL authentication successful with broker: 10.107.176.67:20160 - 4

Are the IP and broker ID correct?

The sarama library tries to register the broker for use by the producer/consumer clients. Metricbeat doesn't make use of these.

The found matching broker debug log is emitted here: https://github.com/elastic/beats/blob/master/metricbeat/module/kafka/broker.go#L109

The "fetch events for topic:" debug message is missing: https://github.com/elastic/beats/blob/master/metricbeat/module/kafka/partition/partition.go#L102

This indicates that either the topics list is empty, or an error occurred when reading the topics list from the broker. In case of an error when fetching the topics, you will have an 'error' event in Elasticsearch.
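The reason the message can silently disappear is that it is only emitted inside the per-topic loop. A toy sketch of that control flow (my simplification, not the actual partition.go code):

```go
package main

import "fmt"

// fetchPartitionEvents loosely mimics the metricset's per-topic loop: the
// "fetch events for topic" debug line is only printed inside the loop, so
// an empty topic list produces no log output and no events at all.
func fetchPartitionEvents(topics []string) int {
	events := 0
	for _, topic := range topics {
		fmt.Printf("fetch events for topic: %s\n", topic)
		events++ // the real metricset emits one event per partition
	}
	return events
}

func main() {
	fmt.Println(fetchPartitionEvents([]string{}))             // empty cluster: silent, prints 0
	fmt.Println(fetchPartitionEvents([]string{"test-topic"})) // debug line appears, prints 1
}
```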

Which kafka version are you testing with? How many topics do you have in your cluster and how are the topics partitions distributed between brokers?

Oh, nice hint. The topic list is indeed empty. I tested my changes on an empty cluster to avoid several rolling restarts of it. I'll create some topics and test again, and report back once I've had the chance to do so.

The IP and ID are correct.

Sometimes (after a restart) I get some events with the following content:

"error": {
  "message": "EOF"
}

We use the following versions:

  • metricbeat 6.1.1 and for some tests also 6.2.1
  • kafka 0.11.0.2
  • OS Oracle Enterprise Linux 7
  • docker 17.06.2-ol

Yes, your point was very valid. The problem exists between keyboard and chair: I tested my changes on an empty cluster. After applying my changes to the production cluster, I get metrics again.

Thanks for your support, and sorry for wasting your time.

Best regards
Sebastian

EOF == end of file. We normally get this error if the connection has been closed by the remote end while/before reading a response.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.