I noticed the following in my Filebeat log when it could not connect to Logstash:
```
INFO pipeline/output.go:93 Attempting to reconnect to backoff(async()) with 3 reconnect attempt(s)
INFO [publisher] pipeline/retry.go:173 retryer: send wait signal to consumer
INFO [publisher] pipeline/retry.go:175 done
ERROR pipeline/output.go:100 Failed to connect to backoff(async()): dial tcp connect: connection refused
INFO pipeline/output.go:93 Attempting to reconnect to backoff(async()) with 4 reconnect attempt(s)
INFO [publisher] pipeline/retry.go:196 retryer: send unwait-signal to consumer
INFO [publisher] pipeline/retry.go:198 done
INFO [publisher] pipeline/retry.go:173 retryer: send wait signal to consumer
INFO [publisher] pipeline/retry.go:175 done
ERROR pipeline/output.go:100 Failed to connect to backoff(async()): dial connect: connection refused
INFO pipeline/output.go:93 Attempting to reconnect to backoff(async()) with 5 reconnect attempt(s)
INFO [publisher] pipeline/retry.go:196 retryer: send unwait-signal to consumer
INFO [publisher] pipeline/retry.go:198 done
INFO [publisher] pipeline/retry.go:173 retryer: send wait signal to consumer
INFO [publisher] pipeline/retry.go:175 done
ERROR pipeline/output.go:100 Failed to connect to backoff(async()): dial tcp connect: connection refused
INFO pipeline/output.go:93 Attempting to reconnect to backoff(async()) with 6 reconnect attempt(s)
INFO [publisher] pipeline/retry.go:196 retryer: send unwait-signal to consumer
INFO [publisher] pipeline/retry.go:198 done
INFO [publisher] pipeline/retry.go:173 retryer: send wait signal to consumer
INFO [publisher] pipeline/retry.go:175 done
```
What confuses me is that a wait signal is sent immediately after every unwait signal. Why do the unwait and wait signals happen?
I don't know Go, so the source code is not entirely clear to me.
I examined it and saw that both the wait and the unwait signals call sigHint():
```go
func (c *eventConsumer) sigWait() {
	c.wait.Store(true)
	c.sigHint()
}

func (c *eventConsumer) sigUnWait() {
	c.wait.Store(false)
	c.sigHint()
}

func (c *eventConsumer) sigHint() {
	// send signal to unblock a consumer trying to publish events.
	// With flags being set atomically, multiple signals can be compressed into one
	// signal -> drop if queue is not empty
	select {
	case c.sig <- consumerSignal{tag: sigConsumerCheck}:
	default:
	}
}
```
I am not sure what sigConsumerCheck contains in my situation.
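To make the "multiple signals can be compressed into one" comment concrete, here is a minimal standalone sketch of that pattern. It is not Filebeat's code: the types consumer, signalTag and consumerSignal, the helper drain(), and the channel buffer size of one are hypothetical stand-ins chosen for illustration. In the quoted literal the signal carries only a tag, so in this sketch the actual state lives in the atomic flag and the signal merely tells the receiver to re-check it (sync/atomic.Bool needs Go 1.19 or later).

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Hypothetical stand-ins for the libbeat types: only a tag is sent.
type signalTag int

const sigConsumerCheck signalTag = iota

type consumerSignal struct {
	tag signalTag
}

// consumer is a hypothetical, simplified version of eventConsumer.
type consumer struct {
	wait atomic.Bool         // desired state: true means "pause publishing"
	sig  chan consumerSignal // buffer of one (an assumption for this sketch)
}

func newConsumer() *consumer {
	return &consumer{sig: make(chan consumerSignal, 1)}
}

func (c *consumer) sigWait() {
	c.wait.Store(true)
	c.sigHint()
}

func (c *consumer) sigUnWait() {
	c.wait.Store(false)
	c.sigHint()
}

// sigHint does a non-blocking send: if a signal is already pending,
// the new one is dropped, because the receiver re-reads the flag anyway.
func (c *consumer) sigHint() {
	select {
	case c.sig <- consumerSignal{tag: sigConsumerCheck}:
	default:
	}
}

// drain receives all pending signals without blocking and returns how
// many arrived plus the flag value the receiver would act on.
func (c *consumer) drain() (received int, paused bool) {
	for {
		select {
		case <-c.sig:
			received++
		default:
			return received, c.wait.Load()
		}
	}
}

func main() {
	c := newConsumer()
	// Wait, unwait, wait in quick succession, as in the log above.
	c.sigWait()
	c.sigUnWait()
	c.sigWait()

	received, paused := c.drain()
	fmt.Printf("signals received: %d, paused: %v\n", received, paused)
}
```

Running it prints signals received: 1, paused: true. In this sketch three state changes produce a single pending signal, and the receiver only acts on the latest flag value, so a quick unwait/wait pair can leave no visible effect on the consumer.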
I debugged Filebeat and saw that the queue holds about 2 × the max bulk size setting (bulk_max_size, default 2048) events; in my case it was "events":{"active":4117}. No further events are added after that, which is fine. The events in the queue were kept until the connection was established.
So I don't understand what happens when the unwait/wait signals are sent.
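The queue behaviour described above (a bounded buffer that stops accepting events while the output is down and keeps them until a connection exists) can be sketched with a buffered Go channel. This is a minimal illustration under stated assumptions, not Filebeat's queue: the capacity of exactly 2 × bulkMaxSize, the event type and the helpers tryPublish() and sendBatch() are hypothetical, and the fixed capacity only approximates the ~4117 active events observed. A refused event is simply counted here; a real producer might instead block and try again later.

```go
package main

import "fmt"

// Hypothetical numbers: bulkMaxSize mirrors the default mentioned above,
// and the queue capacity is assumed to be twice that.
const (
	bulkMaxSize = 2048
	queueSize   = 2 * bulkMaxSize
)

type event struct{ id int }

// tryPublish adds an event only if the queue has room; it never blocks.
func tryPublish(q chan event, e event) bool {
	select {
	case q <- e:
		return true
	default:
		return false
	}
}

// sendBatch removes up to limit events, standing in for one successful
// bulk request once the output is connected.
func sendBatch(q chan event, limit int) int {
	n := 0
	for n < limit {
		select {
		case <-q:
			n++
		default:
			return n
		}
	}
	return n
}

func main() {
	q := make(chan event, queueSize)

	// Output is down: keep producing until the queue refuses events.
	accepted, rejected := 0, 0
	for i := 0; i < 5000; i++ {
		if tryPublish(q, event{id: i}) {
			accepted++
		} else {
			rejected++
		}
	}
	fmt.Println("accepted:", accepted, "rejected:", rejected, "active:", len(q))

	// Connection established: one bulk request drains one batch.
	sent := sendBatch(q, bulkMaxSize)
	fmt.Println("sent:", sent, "active:", len(q))
}
```

With these assumptions it reports 4096 accepted and 904 refused events while the output is down, then one batch of 2048 sent once the connection is back, leaving 2048 events still queued.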