A panic of dumplicated kafka output

I get this panic as below,it happens when my filebeat kafka output module is reconnecting the kafka server as the Cloud server of kafka connection is not stable or server is restarted(cloud service,out of our control.Sad!):

panic: output type 'kafka' exists already

goroutine 5 [running]:
github.com/elastic/beats/libbeat/outputs.RegisterType(0xe8c100, 0x5, 0xecf2e8)
e:/work_space/Go/src/github.com/elastic/beats/libbeat/outputs/output_reg.go:30 +0x170
e:/work_space/Go/src/github.com/elastic/beats/libbeat/outputs/kafka/kafka.go:118 +0x11d
created by github.com/elastic/beats/libbeat/outputs/kafka.init.0
e:/work_space/Go/src/github.com/elastic/beats/libbeat/outputs/kafka/kafka.go:109 +0x170

I check the code and found the method

func RegisterType(name string, f Factory) {
if outputReg[name] != nil {
panic(fmt.Errorf("output type '%v' exists already", name))
outputReg[name] = f

I found that really strange after this panic happened,the filebeat still get one connection with the kafka server.But It's not working anymore. No message is sent since then.

I use tcp-drop kill this connection,however,it will reconnect and resume service(message will be able to send again)

It happens time to time,any suggestion?

What version of Filebeat are you using? Could you open a Github issue with this? If we get a panic it sounds a lot like a bug.

I build it myself. Git version as below
SHA-1: 6fe02a4dfc2de58490270a15affaa0aec0f5f8df

I may find a problem may lead to my issue... it's in the kafka lib sarama/broker.go
It does not set a readtimeout duraling SASL handshake.
I may block in the io.ReadFull,waiting for response.

yeah,I assume the problem is solve

after I set the timeout, I get the SASL auth timeout error instead of block in the waiting.

2018-08-10T17:02:57.876+0800 INFO kafka/log.go:36 Failed to read response while authenticating with SASL to broker [[xxx:xxx read tcp xxx:xxx->xxx:xxx: i/o timeout]]: %!s(MISSING)

but still u guys should check for the panic thing

The stack trace makes no sense to me. I checked out said commit, but Line 109 is empty and line 118 is a function definition, but no call. The stack-trace also indicates that the second call func1 is using a closure. We don't use a closure in the init function.

Have you made any modifications to kafka.go?

In go init is called before the beat actually starts up. The init function for each package is run exactly once. Unless you have a second output or modified the init function to register the output 'lazily' or whatever, then this panic can not occur.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.