I found an issue that is possibly a bug in the Kafka output of Filebeat 8.7, in libbeat/outputs/kafka/client.go:
func (r *msgRef) fail(msg *message, err error) {
    switch err {
    case sarama.ErrInvalidMessage:
        ...
    case sarama.ErrMessageSizeTooLarge, sarama.ErrInvalidMessageSize:
        ...
    case breaker.ErrBreakerOpen:
        // Add this message to the failed list, but don't overwrite r.err since
        // all the breaker error means is "there were a lot of other errors".
        r.failed = append(r.failed, msg.data)
    default:
        r.failed = append(r.failed, msg.data)
        if r.err == nil {
            // Don't overwrite an existing error. This way at tne end of the batch
            // we report the first error that we saw, rather than the last one.
            r.err = err
        }
    }
    r.dec()
}

func (r *msgRef) dec() {
    ...
    err := r.err
    if err != nil {
        failed := len(r.failed)
        success := r.total - failed
        r.batch.RetryEvents(r.failed)
        stats.Failed(failed)
        if success > 0 {
            stats.Acked(success)
        }
        r.client.log.Debugf("Kafka publish failed with: %+v", err)
    } else {
        r.batch.ACK()
        stats.Acked(r.total)
    }
}
This means that an ErrBreakerOpen error never sets msgRef.err. If every failure in a batch is ErrBreakerOpen, msgRef.err stays nil and r.batch.ACK() is executed, even though r.failed is not empty. As a result, the log records are never delivered to Kafka, but the file offset is still advanced.
Here is an example.
First, set up a working Kafka environment on localhost and configure filebeat.yml:
output.kafka:
  hosts: ["localhost:9093"]
  username: "xxxx"
  password: "yyyy"
  topic: 'test'
  sasl.mechanism: SCRAM-SHA-512
  # ssl.verification_mode: none
Filebeat works well, and the log records are delivered to Kafka.
Second, change the config:
output.kafka:
  hosts: ["localhost:9093"]
  username: "xxxx"
  password: "yyyy"
  topic: 'test'
  sasl.mechanism: SCRAM-SHA-512
  ssl.verification_mode: none
My Kafka port 9093 does not support SSL, and I added an SSL setting on purpose to make the output fail. As expected, it stops working: new log records are no longer delivered to Kafka.
However, data/registry/filebeat/log.json shows that the offset has still moved forward, so I think this is possibly a bug.
I think msgRef.err should also be set when the error is ErrBreakerOpen. A possible solution:
func (r *msgRef) fail(msg *message, err error) {
    switch err {
    case sarama.ErrInvalidMessage:
        ...
    case sarama.ErrMessageSizeTooLarge, sarama.ErrInvalidMessageSize:
        ...
    case breaker.ErrBreakerOpen:
        // Add this message to the failed list, but don't overwrite r.err since
        // all the breaker error means is "there were a lot of other errors".
        r.failed = append(r.failed, msg.data)
        if r.err == nil {
            // Don't overwrite an existing error. This way at tne end of the batch
            // we report the first error that we saw, rather than the last one.
            r.err = err
        }
    default:
        r.failed = append(r.failed, msg.data)
        if r.err == nil {
            // Don't overwrite an existing error. This way at tne end of the batch
            // we report the first error that we saw, rather than the last one.
            r.err = err
        }
    }
    r.dec()
}