Treating Logs as Milk, not Wine (the art of playing catchup)

I have a logging pipeline that looks something like the following:

Logstash/Rsyslog/etc (submission) -> Kafka -> Logstash (enrichment) -> Elasticsearch

Metrics tell me that by far the slowest part of my pipeline is the ingestion into Elasticsearch, and I've been trying to optimize that of late; it seems to top out at about 24k documents/sec.

One of the workloads is particularly large. Yesterday we had an issue that meant Elasticsearch was down for 1.5 hours, and it took 9 hours to catch up, which shows how little extra capacity we have.

That means that for 9 hours the data in our hot tier was not the latest data. Obviously, for things like threat-hunting the latest data is more valuable: fresher is better, like milk rather than wine (a contrast you may have heard in discussions of quality-of-service in network delivery).

I would like to implement some sort of priority queue, where the most recent data has the highest priority.

So here is my desired behaviour:

  • when logstash detects that it is about to send a 'fresh' event to Elasticsearch, send it.
  • when logstash detects that it is about to send a delayed event to Elasticsearch (playing catchup), send it when there is free capacity.
  • when logstash detects that it is about to send some data to a date-range that would exist outside of the 'hot' or 'warm' tier, or beyond some threshold, drop it to protect ourselves from pathological 'first shipping' behaviour.
  • ... but there does need to be some notion / awareness of when historical logs are [deliberately] being fed back into Elasticsearch ... but we can ignore that for now.

One of the things I need to do is to migrate to using multiple logstash pipelines. I see that some of what I want to do can be catered for through patterns such as the 'output isolator pattern'; however, I never want the 'fresh' logs to be blocked if the 'delayed' queue happens to get full.
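
As I understand it, the output isolator pattern is built on pipeline-to-pipeline communication, roughly as sketched below (the pipeline IDs and config paths are placeholders, not my actual config). Each downstream pipeline gets its own persistent queue, so a stalled output only backs up its own queue, but once that queue fills the back-pressure still reaches the upstream pipeline, which is exactly the blocking I want to avoid for the 'fresh' path.

# pipelines.yml -- sketch of the output isolator pattern; IDs and paths are placeholders
- pipeline.id: enrichment
  path.config: "/etc/logstash/conf.d/enrichment.conf"
  # enrichment.conf ends with:
  #   output { pipeline { send_to => ["to_es", "to_kafka"] } }
- pipeline.id: to_es
  queue.type: persisted
  path.config: "/etc/logstash/conf.d/to_es.conf"
  # to_es.conf begins with:
  #   input { pipeline { address => "to_es" } }
- pipeline.id: to_kafka
  queue.type: persisted
  path.config: "/etc/logstash/conf.d/to_kafka.conf"
  # to_kafka.conf begins with:
  #   input { pipeline { address => "to_kafka" } }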

What I'm thinking of doing is the following:

  • input for the logstash enrichment tier (that feeds Elasticsearch) comes from Kafka [which is how things currently stand]
  • introduce some filter logic to tag logs as 'delayed' (or 'obsolete') if @timestamp is older than some threshold (see the sketch after this list)
  • if 'obsolete' in [tags] ... then drop it
  • if 'delayed' in [tags] ... then output to a Kafka topic that has size and age retention policies [representing how much effort we are prepared to spend on ingesting delayed logs, which Kafka can drop if the backlog gets too large / too old] ...
  • else deliver to Elasticsearch
  • a separate logstash pipeline would read from the 'delayed' kafka topic and send to Elasticsearch at a lower priority
  • a separate logstash pipeline would read from the 'backfed' kafka topic and send to Elasticsearch at a higher priority than 'delayed' (because its use is related to forensics and would be pre-filtered anyway)
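
A rough sketch of the tagging and routing logic above follows; the thresholds (15 minutes for 'delayed', 7 days for 'obsolete'), the topic name, the Kafka/Elasticsearch hosts and the index pattern are all placeholders rather than my real values:

# Enrichment pipeline (sketch) -- tag by age, drop 'obsolete', divert 'delayed' to Kafka
filter {
  ruby {
    code => '
      age = Time.now.to_f - event.get("@timestamp").to_f
      if age > 7 * 24 * 3600      # beyond the hot/warm window
        event.tag("obsolete")
      elsif age > 15 * 60         # behind real time, i.e. playing catchup
        event.tag("delayed")
      end
    '
  }
  if "obsolete" in [tags] {
    drop { }
  }
}

output {
  if "delayed" in [tags] {
    kafka {
      bootstrap_servers => "kafka01:9092"
      topic_id          => "logs-delayed"
      codec             => json
    }
  } else {
    elasticsearch {
      hosts => ["https://es-hot-01:9200"]
      index => "logs-%{+YYYY.MM.dd}"
    }
  }
}

The 'logs-delayed' topic would then have retention.bytes and retention.ms set to cap how much catch-up work we keep around.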

Similar facilities could be contrived for doing things like rate-limiting (populate a memcache instance or such with rate information and use that to drive circuit-breaking decisions)

This would appear to help resolve the head-of-line blocking issue, at the cost of storage space for the additional kafka topic. But it doesn't do anything in particular to enforce priority: how can I ensure that 'delayed' logs only get fed into Elasticsearch when there is spare capacity?

I had thought (apparently wrongly) that pipelines had some priority mechanism, but maybe I should just have more workers for the fresh pipeline and fewer workers for the delayed pipeline.
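
Something like the following pipelines.yml split is what I have in mind (IDs, paths, worker and batch numbers are placeholders); since there is no explicit priority setting that I know of, the 'priority' is only approximated by giving the fresh pipeline more workers and a larger batch:

# pipelines.yml -- sketch; worker and batch numbers are illustrative only
- pipeline.id: fresh
  path.config: "/etc/logstash/conf.d/fresh.conf"      # kafka (live topics) -> enrich -> elasticsearch
  pipeline.workers: 8
  pipeline.batch.size: 1000
- pipeline.id: delayed
  path.config: "/etc/logstash/conf.d/delayed.conf"    # kafka (logs-delayed topic) -> elasticsearch
  pipeline.workers: 2
  pipeline.batch.size: 125

When Elasticsearch is busy, the delayed pipeline's Kafka consumer simply falls further behind, and the topic's retention policy then acts as the drop mechanism.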

Does this seem like a sensible configuration?

Have you looked at where the limitation is in Elasticsearch?

I have been following:

Specifically, I have:

  • increased the refresh interval (even to as much as 5 minutes, though normally I set it to 30s in the template; see the sketch after this list)
  • replicas = 0
  • I tried setting indices.memory.index_buffer_size to 20%, but that doesn't appear to have made much difference.
  • The mapping for my most challenging workload is fairly aggressively optimized; I can provide it if you like.
  • Each ES JVM has 16GB of heap, though it will be competing for resources, which will impact performance.
  • They are on physical servers with SSD storage.
  • Logstash has been set to use sniffing for its elasticsearch output so as to spread the coordination overhead, although I think I'll change that to use a hard-coded list of just the hot nodes.
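
For completeness, the template settings above amount to roughly the following (the template name and index pattern are placeholders); note that indices.memory.index_buffer_size is a node-level setting in elasticsearch.yml rather than something that goes in the template:

PUT _template/logs-sketch
{
  "index_patterns": ["logs-*"],
  "settings": {
    "index.refresh_interval": "30s",
    "index.number_of_replicas": 0
  }
}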

Hah, although I do see some glaring surprises, namely that the config that was present for mlockall has since disappeared (we changed from CFEngine to Puppet, which is likely the reason), and swap was enabled too, so I'll look forward to patching that up.

Actually, locked memory is set; it's just not set by PAM. It's set by systemd, and I can verify that it is set:

$ systemctl status elasticsearch-hot.service 
● elasticsearch-hot.service - Elasticsearch (hot)
   Loaded: loaded (/etc/systemd/system/elasticsearch-hot.service; enabled; vendor preset: disabled)
   Active: active (running) since Mon 2021-08-02 22:46:32 NZST; 12h ago
     Docs: http://www.elastic.co
 Main PID: 25139 (java)
...

$ cat /proc/25139/limits 
Limit                     Soft Limit           Hard Limit           Units     
Max cpu time              unlimited            unlimited            seconds   
Max file size             unlimited            unlimited            bytes     
Max data size             unlimited            unlimited            bytes     
Max stack size            8388608              unlimited            bytes     
Max core file size        0                    unlimited            bytes     
Max resident set          unlimited            unlimited            bytes     
Max processes             4096                 4096                 processes 
Max open files            65535                65535                files     
Max locked memory         unlimited            unlimited            bytes     
Max address space         unlimited            unlimited            bytes     
Max file locks            unlimited            unlimited            locks     
Max pending signals       514542               514542               signals   
Max msgqueue size         819200               819200               bytes     
Max nice priority         0                    0                    
Max realtime priority     0                    0                    
Max realtime timeout      unlimited            unlimited            us

What is the output from the _cluster/stats?pretty&human API?

Thank you for helping.

Well, the first thing you can do is reduce your shard count. You're looking at an average shard size of around 0.7GB, which is a waste of heap and not doing you any favours.
Are you using ILM?
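
If you do move to ILM, a minimal rollover policy looks something like this (the policy name, sizes and ages are just examples, chosen to aim for fewer, larger shards):

PUT _ilm/policy/logs-sketch
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_size": "50gb",
            "max_age": "7d"
          }
        }
      },
      "delete": {
        "min_age": "30d",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}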

7.11.2 - worth an upgrade to 7.13, there's always improvements being made.

No (not currently using ILM); that and data tiering are both things I need to work on.

I have a number of architectural changes I need to make (and am currently working through), but the focus of this particular question is around the milk/wine scenario.

I appreciate that, but getting the fundamentals right is a better place to start. Otherwise you're building a whole new way of processing data that will still eventually suffer from oversharding.
