A possible bug in the Kafka output: ErrBreakerOpen handling

I found an issue that is possibly a bug in the Kafka output of Filebeat 8.7:

libbeat/outputs/kafka/client.go:

func (r *msgRef) fail(msg *message, err error) {
	switch err {
	case sarama.ErrInvalidMessage:
		...
	case sarama.ErrMessageSizeTooLarge, sarama.ErrInvalidMessageSize:
		...
	case breaker.ErrBreakerOpen:
		// Add this message to the failed list, but don't overwrite r.err since
		// all the breaker error means is "there were a lot of other errors".
		r.failed = append(r.failed, msg.data)

	default:
		r.failed = append(r.failed, msg.data)
		if r.err == nil {
		// Don't overwrite an existing error. This way at the end of the batch
			// we report the first error that we saw, rather than the last one.
			r.err = err
		}
	}
	r.dec()
}

func (r *msgRef) dec() {
	...
	err := r.err
	if err != nil {
		failed := len(r.failed)
		success := r.total - failed
		r.batch.RetryEvents(r.failed)

		stats.Failed(failed)
		if success > 0 {
			stats.Acked(success)
		}

		r.client.log.Debugf("Kafka publish failed with: %+v", err)
	} else {
		r.batch.ACK()
		stats.Acked(r.total)
	}
}

This means that when an ErrBreakerOpen error occurs, msgRef.err is never set, so r.batch.ACK() is executed. As a result, the log records are not delivered to Kafka, but the file offset is still advanced.

Here is an example:
Firstly, prepare a working Kafka environment on localhost and configure filebeat.yml:

output.kafka:
  hosts: ["localhost:9093"]
  username: "xxxx"
  password: "yyyy"
  topic: 'test'
  sasl.mechanism: SCRAM-SHA-512
  # ssl.verification_mode: none

Filebeat works as expected, and the log records are delivered to Kafka.

Secondly, change the config:

output.kafka:
  hosts: ["localhost:9093"]
  username: "xxxx"
  password: "yyyy"
  topic: 'test'
  sasl.mechanism: SCRAM-SHA-512
  ssl.verification_mode: none

My Kafka broker on port 9093 does not support the SSL protocol, and I configured an SSL setting on purpose to make the output fail. As expected, it no longer works: the new log records are not delivered to Kafka.
However, data/registry/filebeat/log.json shows that the offset has still moved forward... So I think this is probably a bug.

I think msgRef.err should be set when the error is ErrBreakerOpen. A possible solution:

func (r *msgRef) fail(msg *message, err error) {
	switch err {
	case sarama.ErrInvalidMessage:
		...
	case sarama.ErrMessageSizeTooLarge, sarama.ErrInvalidMessageSize:
		...
	case breaker.ErrBreakerOpen:
		// Record the failure and, as in the default case, keep the first
		// error seen so the batch is retried instead of ACKed.
		r.failed = append(r.failed, msg.data)
		if r.err == nil {
			r.err = err
		}

	default:
		r.failed = append(r.failed, msg.data)
		if r.err == nil {
		// Don't overwrite an existing error. This way at the end of the batch
			// we report the first error that we saw, rather than the last one.
			r.err = err
		}
	}
	r.dec()
}

Welcome to our community, and thanks for reporting this in such detail! :smiley:

It'd be great if you could please raise this at Issues · elastic/beats · GitHub so it gets in front of the right people.
