I have a logging pipeline that looks something like the following:
Logstash/Rsyslog/etc (submission) -> Kafka -> Logstash (enrichment) -> Elasticsearch
Metrics tell me that by far the slowest part of my pipeline is the ingestion into Elasticsearch, and I've been trying to optimize that of late; it seems to top out at about 24k documents/sec.
One of the workloads is particularly large; yesterday we had an issue that meant Elasticsearch was down for 1.5 hours, and it took 9 hours to catch up, which shows how little spare capacity we have.
That meant that for 9 hours the data in our hot tier was not the latest data. Obviously, for things like threat hunting, the latest data is the most valuable (fresher is better: logs are like milk, not wine) [you may have heard this milk/wine contrast in discussions of quality of service in network delivery].
I would like to implement some sort of priority queue, where the most recent data has the highest priority.
So here is my desired behaviour (a rough tagging sketch follows this list):
- when logstash detects that it is about to send a 'fresh' event to Elasticsearch, send it.
- when logstash detects that it is about to send a delayed event to Elasticsearch (playing catchup), send it when there is free capacity.
- when logstash detects that it is about to send data whose date range falls outside the 'hot' or 'warm' tier, or beyond some threshold, drop it, to protect ourselves from pathological 'first shipping' behaviour.
- ... there does also need to be some notion / awareness of when historical logs are [deliberately] being fed back into Elasticsearch, but we can ignore that for now.
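For the tagging itself, I'm imagining something like this in the enrichment pipeline's filter block (the thresholds are invented for illustration, and a ruby filter is just one way to do it; I believe there is also a community age filter plugin that computes the subtraction for you):

```
filter {
  ruby {
    code => "
      # event age in seconds, relative to wall-clock time
      age = Time.now.to_f - event.get('@timestamp').to_f

      tag = if age > (7 * 86400)   # hypothetical: older than our warm tier
              'obsolete'
            elsif age > 300        # hypothetical: more than 5 minutes behind
              'delayed'
            end

      if tag
        tags = event.get('tags') || []
        tags = [tags] unless tags.is_a?(Array)
        event.set('tags', tags << tag)
      end
    "
  }
}
```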
One of the things I need to do is migrate to multiple Logstash pipelines. Some of what I want could be catered for by patterns such as the 'output isolator pattern'; however, I never want the 'fresh' logs to be blocked if the 'delayed' queue happens to fill up.
What I'm thinking of doing is the following (rough config sketches follow the list):
- input for the logstash enrichment tier (that feeds Elasticsearch) comes from Kafka [which is how things currently stand]
- introduce some filter logic to tag logs as 'delayed' (or 'obsolete') if @timestamp is older than some threshold
- if 'obsolete' in [tags] ... then drop it
- if 'delayed' in [tags] ... then output to a Kafka topic that has size- and age-based retention policies [representing how much effort we are willing to spend on ingesting delayed logs, which Kafka can drop if the backlog gets too large / too old] ...
- else deliver to Elasticsearch
- a separate logstash pipeline would read from the 'delayed' kafka topic and send to Elasticsearch at a lower priority
- a separate logstash pipeline would read from the 'backfed' kafka topic and send to Elasticsearch at a higher priority than 'delayed' (because it is used for forensics and would be pre-filtered anyway)
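Concretely, I'm imagining the enrichment pipeline ending with something like this (topic, host, and index names are placeholders):

```
filter {
  # drop anything tagged obsolete; protects us from pathological 'first shipping'
  if "obsolete" in [tags] {
    drop {}
  }
}

output {
  if "delayed" in [tags] {
    # hypothetical topic with size- and age-based retention
    kafka {
      bootstrap_servers => "kafka.internal:9092"
      topic_id          => "logs-delayed"
      codec             => json
    }
  } else {
    elasticsearch {
      hosts => ["https://es.internal:9200"]
      index => "logs-%{+YYYY.MM.dd}"
    }
  }
}
```

... and the separate 'delayed' pipeline would be more or less the mirror image:

```
input {
  kafka {
    bootstrap_servers => "kafka.internal:9092"
    topics            => ["logs-delayed"]
    group_id          => "logstash-delayed"
    codec             => json
  }
}

output {
  elasticsearch {
    hosts => ["https://es.internal:9200"]
    index => "logs-%{+YYYY.MM.dd}"
  }
}
```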
Similar facilities could be contrived for things like rate-limiting (populate a memcached instance or similar with rate information and use that to drive circuit-breaking decisions).
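As a very hand-wavy sketch of that idea, assuming the community memcached filter plugin and a hypothetical es_circuit_open key maintained by some external process watching Elasticsearch ingest metrics:

```
filter {
  # fetch the circuit-breaker flag set by an external watcher (hypothetical key)
  memcached {
    hosts => ["memcache.internal:11211"]
    get   => { "es_circuit_open" => "[@metadata][circuit_open]" }
  }
}

output {
  if [@metadata][circuit_open] == "true" {
    # shed load to the delayed topic instead of hammering Elasticsearch
    kafka {
      bootstrap_servers => "kafka.internal:9092"
      topic_id          => "logs-delayed"
      codec             => json
    }
  } else {
    elasticsearch {
      hosts => ["https://es.internal:9200"]
    }
  }
}
```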
This would appear to resolve the head-of-line blocking issue, at the cost of storage space for the additional Kafka topic. But it says nothing in particular about how priority is enforced: how can I ensure that 'delayed' logs only get fed into Elasticsearch when there is spare capacity?
I had thought (apparently wrongly) that pipelines had some priority mechanism; maybe I should just give the fresh pipeline more workers and the delayed pipeline fewer.
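In pipelines.yml terms, I'm picturing something like this (the worker and batch numbers are purely illustrative; as I understand it, pipeline.workers defaults to the number of CPU cores):

```
# hypothetical worker split between the two ES-facing pipelines
- pipeline.id: fresh
  path.config: "/etc/logstash/conf.d/fresh.conf"
  pipeline.workers: 8
  pipeline.batch.size: 1000
- pipeline.id: delayed
  path.config: "/etc/logstash/conf.d/delayed.conf"
  pipeline.workers: 2
  pipeline.batch.size: 250
```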
Does this seem like a sensible configuration?