Testers wanted: *beat integration with Apache NiFi


(Trixpan) #1

Hi there,

For those using beats but also using Hadoop or a similar stack, I have baked an experimental listener that allows Apache NiFi (which happens to be the base of Hortonworks DataFlow) to accept connections from libbeat agents.

The rationale is to allow for a drop-in replacement in simpler pipelines (e.g. filebeat -> pipeline -> HDFS / Kafka / whatever) while providing strong delivery guarantees, persistent queues, data provenance, etc. (NiFi's documentation provides insight into its data provenance and delivery guarantees).

The code is still experimental and I would strongly appreciate if people could assist by testing it against a more diverse set of environments.

Code is available here:


Beat protocol
(Steffen Siering) #2

Didn't read the code, but this makes me wonder about NiFi internal queues blocking:

> the processor performs partial ACKs, more precisely it replies to EVERY sequence number with an ACK, completely ignoring the window size. This may change prior to merging.

Partial ACKs are fine. The effect on beats is: if the connection is dropped due to a timeout, the subset of events that have already been ACKed will not be resent. This not-resending only applies to the sending pipeline, though. As filebeat operates in batches, the ACK will not be forwarded to the filebeat registry. That is, if filebeat is restarted in the middle of a failing connection, the complete batch is retransferred.
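As a minimal sketch of what the partial ACK buys the sending pipeline (a hypothetical helper, not actual libbeat code), the sender only needs to retransmit the suffix of the batch past the last ACKed sequence number:

```python
def events_to_resend(batch, last_acked_seq, first_seq=1):
    """Given a batch whose events were numbered first_seq..first_seq+len(batch)-1,
    return the suffix that must be retransmitted after a dropped connection:
    everything past the last partially-ACKed sequence number."""
    already_acked = last_acked_seq - first_seq + 1
    return batch[max(already_acked, 0):]

# Batch of 5 events with seqs 1..5; ACK 3 was seen before the drop,
# so only events 4 and 5 go out again.
print(events_to_resend(["e1", "e2", "e3", "e4", "e5"], last_acked_seq=3))
```

Note this only models the in-flight sending pipeline; as described above, the filebeat registry works at batch granularity, so a restart still replays the whole batch.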

A partial ACK (even ACK 0) has a second function, too: it is a heartbeat signal as well. Beats have a default connection idle-timeout of 30 seconds. In case the Logstash pipeline is blocked, the beats-input-plugin sends an ACK every 5 seconds. This prevents beats from reconnecting and sending the batch once again.
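In the Lumberjack v2 framing that beats speak, such an ACK is a six-byte frame: the version byte `'2'`, the frame type `'A'`, and a big-endian uint32 sequence number. A sketch of encoding one (illustrative, not the plugin's actual code):

```python
import struct

def encode_ack(seq: int) -> bytes:
    """Encode a Lumberjack v2 ACK frame: b'2' (version) + b'A' (type)
    followed by a big-endian uint32 sequence number.
    seq=0 acts as a pure heartbeat / keep-alive."""
    return b"2A" + struct.pack(">I", seq)

print(encode_ack(0))  # ACK 0: heartbeat only, no progress signalled
```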

e.g. what happens if a NiFi queue is currently stopped? Without an ACK, beats will eventually time out the connection, since no ACK has been received, and resend the complete batch (unnecessarily using network resources and potentially duplicating events due to resends).


(Trixpan) #3

> Didn't read the code, but this makes me wonder about NiFi internal queues blocking:

Thank goodness otherwise you may not have replied. :blush:

> Partial ACKs are fine. The effect on beats is: if the connection is dropped due to a timeout, the subset of events that have already been ACKed will not be resent. This not-resending only applies to the sending pipeline, though. As filebeat operates in batches, the ACK will not be forwarded to the filebeat registry. That is, if filebeat is restarted in the middle of a failing connection, the complete batch is retransferred.

Interesting. I wasn't aware of the ACKing being restricted to the sending side only, but I guess I'd rather receive events at-least-once than miss data.

Also, to a certain extent I suspect this can be described as beyond the scope of the protocol, being a particular design choice of filebeat. Would you agree?

> A partial ACK (even ACK 0) has a second function, too: it is a heartbeat signal as well. Beats have a default connection idle-timeout of 30 seconds. In case the Logstash pipeline is blocked, the beats-input-plugin sends an ACK every 5 seconds. This prevents beats from reconnecting and sending the batch once again.

I will have to double check, but it shouldn't be impossible to mimic the beats-input-plugin behaviour.

> e.g. what happens if a NiFi queue is currently stopped? Without an ACK, beats will eventually time out the connection, since no ACK has been received, and resend the complete batch (unnecessarily using network resources and potentially duplicating events due to resends).

If I understand your example correctly, the impact should in theory be limited.

The processor (the NiFi terminology for this "plugin") should ACK the event as soon as the data is persisted to disk (i.e. a new flowfile is created).

In theory, the downstream pipeline could grind to a halt without prejudice to the sending agent; instead, the processor should still make sure events are queued in accordance with the "queue" settings (i.e. kept until they expire due to age or until they exceed the queue capacity).

So, in the hypothetical situation where a NiFi downstream flow is totally stopped - e.g. the HDFS cluster is offline - the queue (or "connection" in NiFi lingo) should continue to accept new events until it grows too large (# of bytes) or holds too many events. Only then should events no longer be ACKed.
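The intended behaviour can be sketched as a toy model (this is not NiFi code; `BoundedReceiver` and `offer` are hypothetical names): events are ACKed only once persisted into a bounded queue, and once the queue hits its threshold, ACKs stop and the sender eventually times out and retries.

```python
from collections import deque
from typing import Optional

class BoundedReceiver:
    """Toy model of the back-pressure behaviour described above:
    ACK only what has been persisted; stop ACKing when the queue is full."""
    def __init__(self, max_events: int):
        self.queue = deque()
        self.max_events = max_events

    def offer(self, seq: int, event) -> Optional[int]:
        if len(self.queue) >= self.max_events:
            return None           # back-pressure: no ACK, sender will retry
        self.queue.append(event)  # "flowfile persisted to disk"
        return seq                # safe to ACK this sequence number
```

A real processor would also bound the queue by total bytes and event age, per the connection settings mentioned above.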

Did that make sense?


(Steffen Siering) #4

> Interesting. I wasn't aware of the ACKing being restricted to the sending side only, but I guess I'd rather receive events at-least-once than miss data.

An ACK serves multiple purposes: (1) heartbeat signal; (2) partial ACK, so only a subset is resent in case of failure; (3) signal that a batch of events has been fully processed. Points (1) and (2) are optional. E.g. you can send an ACK 0 as a heartbeat signal only, or an ACK n whenever you want to signal progress. Point (3) is required. It's up to the client to make use of (2), but beats try to do so in order to reduce the amount of events to be resent in case of errors (e.g. a network failure in the middle of processing). That is, the input MUST guarantee an event to be processed when returning a partial ACK, as partially ACKed events MUST NOT be resent. While NiFi + your implementation seems to follow these protocol requirements, it's a good thing to be aware of it.

(See, for example, the Go-based implementation, or the logstash-input-beats Java implementation.)
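The three roles of the ACK as seen from the sender can be sketched like this (an illustrative model with hypothetical names, not the actual libbeat state machine):

```python
def handle_ack(state: dict, ack_seq: int, batch_size: int) -> dict:
    """Sender-side handling of the three ACK roles:
      (1) any ACK, even ACK 0, resets the 30s idle timeout (heartbeat);
      (2) a partial ACK records progress so only the tail is resent;
      (3) ACK == batch_size marks the batch fully processed."""
    state["idle_deadline"] = state["now"] + 30                # (1) heartbeat
    state["last_acked"] = max(state["last_acked"], ack_seq)   # (2) progress
    state["batch_done"] = state["last_acked"] >= batch_size   # (3) completion
    return state

state = {"now": 100, "last_acked": 0, "batch_done": False}
handle_ack(state, 0, 5)   # pure heartbeat: timeout pushed out, no progress
handle_ack(state, 5, 5)   # full ACK: batch complete
```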

The function of the heartbeat signal is to guarantee we do not start overloading an unresponsive system (which had been a problem in logstash-forwarder, getting caught in a timeout-resend-overload-sink-even-more-timeout-again cycle). But the heartbeat is 'optional', and older Ruby-based Logstash implementations don't support the time-based heartbeat signal either (one of the reasons for the Java - Netty-based - rewrite).

NiFi storing events persistently in flowfiles should give you a pretty large queue, so to say, but the queue is not infinite.

While the heartbeat is optional, I'd recommend having it enabled, as this might save resources (in beats, plus network as well as disk) in case the input queue grows too large; in the end, queues will always be limited by the availability of physical resources.

For the beat itself it doesn't make a big difference, though. With or without the heartbeat signal, in case of failure the beat will have to deal with back-pressure (the beat itself is decoupled from the output by an internal event publisher queue). Some beats do so by dropping events, and other beats will block on back-pressure. The heartbeat signal is only useful to signal back-pressure occurring in a healthy sub-system (no need to trigger failure recovery).
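The drop-vs-block choice at that internal publisher queue can be sketched as follows (a toy model; `publish` and `drop_on_full` are hypothetical names, not beats configuration keys):

```python
import queue

def publish(q: "queue.Queue", event, drop_on_full: bool) -> bool:
    """Toy model of a beat's internal publisher queue under back-pressure:
    either drop the event when the queue is full, or block until the
    output drains it. Returns True if the event was enqueued."""
    if drop_on_full:
        try:
            q.put_nowait(event)
            return True
        except queue.Full:
            return False       # event dropped under back-pressure
    q.put(event)               # blocks until space is available
    return True
```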


(system) #5

This topic was automatically closed after 21 days. New replies are no longer allowed.