Implement Dynamic Hash Table?

Hi Logstash Gurus,

I think I need to use the Memcached Plugin for a design problem… but I’m not sure.

Here’s my issue: I have a Logstash instance running in a Docker container (v7.4.0, yes I need to upgrade), currently processing several million data records every second. Let’s say the data records look like this:

“Alice”, …more data…
“Beth”, …more data…
“Alice”, …more data…
“Alice”, …more data…

…and so on. The first field is the “Person” field, essentially identifying the source of the data record. Right now, everything is working great.

But I am about to implement some changes in my filter{} config that would mean a lot more processing time per data record, involving calls to DNS servers, SQL servers, HTTP servers, perhaps more. With all those network calls, I’m worried that the processing time per data record could be considerable indeed. Besides, I don’t necessarily need to do all this processing with every record. I need to find a way for Logstash to process some records, but skip others. If a person sends more data within, say, 5 seconds of their last seen data record, Logstash can skip all that extra processing.

If this was a C program, I’d want to implement a dynamic hash table, where hash entries will time out and be removed after a TTL of 5 seconds.

Imagining that Logstash could do such a hash table, I’d like LS to do the following every time a new data record arrives:

  • Check to see if the sending person is in the hash table:
  • If yes, simply update that person’s TTL in the hash table
  • If no, run all those network lookups, modify the data record accordingly, then insert that person into the hash table
  • And of course, the hash table should automatically remove all entries with an expired TTL.

I’ve been combing through the Logstash documentation, and I think the tool I need is the Memcached plugin...? Not sure. But if so, I imagine it would look something like this in the config file:

filter {
  memcached {
    set => {
      "Person" => "memcached-key-1"
      ttl => 5
    If ( new record ) {
      # Run all those network lookups, modify data record accordingly

Obviously, this is badly sketched out. But it’s as far as I got.

Any thoughts or advice? I’d appreciate any and all help you can offer.

Firstly, both dns and jdbc_streaming filters can implement time-based caching, so the cost of these may not be as bad as you think.
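As a rough, untested illustration of that caching, the sketch below turns on the cache options of both filters. The field names (client_host, person_info), the SQL statement, the driver path and the connection details are all made up for the example; only the cache-related option names come from the plugins themselves.

```
filter {
  dns {
    resolve => ["[client_host]"]   # hypothetical field holding a hostname
    action => "replace"
    hit_cache_size => 10000        # cache successful lookups (caching is off when this is 0)
    hit_cache_ttl => 60            # seconds to keep a successful lookup
    failed_cache_size => 1000      # cache failed lookups too
    failed_cache_ttl => 5          # seconds to keep a failed lookup
  }

  jdbc_streaming {
    # Connection details below are placeholders, not real values.
    jdbc_driver_library => "/path/to/driver.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_connection_string => "jdbc:postgresql://dbhost:5432/mydb"
    jdbc_user => "logstash"
    jdbc_password => "changeme"
    statement => "SELECT department FROM people WHERE name = :name"
    parameters => { "name" => "Person" }
    target => "person_info"        # hypothetical target field
    use_cache => true              # cache results per parameter set
    cache_expiration => 5.0        # seconds before a cached result expires
    cache_size => 500              # maximum number of cached entries
  }
}
```

With settings like these, repeated records from the same person within the cache window should be answered from memory rather than by another network call.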

Secondly, a memcached filter requires a memcached instance running somewhere external to Logstash. You could do it, but that means you are adding a network call for each event. Furthermore, there are numerous race conditions that mean multiple threads may 'set' the memcached entry for the same person at about the same time.
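For comparison, a memcached-based version might look roughly like the following. This is an untested sketch: the host, the namespace and the metadata field name are assumptions chosen for the example.

```
filter {
  # Look the person up first; the metadata field is only set on a cache hit.
  memcached {
    hosts => ["localhost:11211"]   # assumed memcached location
    namespace => "seen"            # assumed key prefix
    get => { "%{Person}" => "[@metadata][justSawThisPerson]" }
  }

  if ![@metadata][justSawThisPerson] {
    # ... expensive dns / jdbc / http lookups would go here ...

    # Remember this person for 5 seconds.
    memcached {
      hosts => ["localhost:11211"]
      namespace => "seen"
      set => { "[Person]" => "%{Person}" }
      ttl => 5
    }
  }
}
```

Note the gap between the get and the set: two workers handling events for the same person can both miss the cache and both run the expensive lookups, which is the race described above.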

I would at least try implementing this using an aggregate filter with push_map_as_event_on_timeout. Make sure you disable java_execution and set pipeline.workers to 1. Yes, that will limit throughput.

Configure the aggregate filter with [Person] as the task_id. Set the timeout_tags to [ "deleteMe" ] and add

if "deleteMe" in [tags] { drop {} }

to drop the events created as entries are purged from the cache of recently seen names.

Then in the code option do something like this (I have not tested it)

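code => '
    # Flag the event if this person already has an entry in the map
    if map["justSawThisPerson"]
        event.set("[@metadata][justSawThisPerson]", true)
    end
    # Record that we have now seen this person
    map["justSawThisPerson"] = true
'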

then finally make applying all the expensive filters conditional upon

if ! [@metadata][justSawThisPerson] {
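Putting the pieces above together, the whole filter section might look something like this. Again, I have not tested it, and the 5 second timeout simply mirrors the TTL from the question.

```
# In logstash.yml, per the advice above:
#   pipeline.workers: 1
#   pipeline.java_execution: false

filter {
  aggregate {
    task_id => "%{Person}"
    code => '
      if map["justSawThisPerson"]
        event.set("[@metadata][justSawThisPerson]", true)
      end
      map["justSawThisPerson"] = true
    '
    push_map_as_event_on_timeout => true
    timeout => 5                     # seconds before the entry is purged
    timeout_tags => ["deleteMe"]
  }

  # Discard the events generated when entries are purged from the map.
  if "deleteMe" in [tags] {
    drop {}
  }

  if ![@metadata][justSawThisPerson] {
    # ... expensive dns / jdbc_streaming / http filters go here ...
  }
}
```

One caveat: timeout counts from the first event seen for a person, so the entry is not refreshed by later events. If the TTL should restart every time the person is seen, the aggregate filter's inactivity_timeout option may be a closer fit, though I have not tried it for this.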

Thanks Badger, This is specific and detailed. I love it. I now know where to focus my research. You're the best!
