Near Realtime Threat Intel enrichment using custom external sources stored on enrichment indexes

Hi There.

First Ill try to explain my general idea (that I already have implemented and is working) then the problems I have and a few questions.

The general idea is to have a Logstash ingest pipeline, lets call it Pipeline A (with any kind of source that includes an IP).

This Pipeline queries a Elasticsearch Threat intel Index for information on the IP, then:

A – If there is information, then Logstash outputs the event to an Elasticsearch Ingest Pipeline that enriches the data and store it.

B – If there is no information:

B.1 - the pipeline generates a new event and outputs the new event to another Logstash PipeLine, Lets call it Pipeline B.

B.2 – the pipeline outputs the original event to an Elasticsearch Ingest Pipeline, that on this case will have no information to enrich the data, then is stored.

Pipeline B receives the generated event with the IP (pipeline to pipeline communication). Then it queries external services for information on the IP. And then the recovered information from external services its posted on the Threat intel Index.

Some of my problems and questions are:

  • Enrichment indexes require to be “Updated” to acknowledge new data posted on the Threat Intel Index (POST /_enrich/policy//_execute).

    • Is there a way to do this “Execute” command from Logstash?
    • How Expensive is the “Execute” operation? (considering it could get triggered multiple times x minute depending on the workloads)
    • To reduce the cost of the “Execute”, is there a way to determinate the difference between the enrichment index and the Threat intel Index (from Logstash.. The idea is to call the “execute” only if difference bigger than 100 events)?
  • Current model create events with no threat intel (only when the IP is seen for a first time). So I’m considering to send non enrich data (Step B.2) to a temporal index, then execute a re-index to an ingest pipeline that stores enriched data into final index (Same as B.1).

    • Re-index does not delete the existing data. How can I execute maintenance on this index?
    • Can I call a Re-index from Logstash?

I know that I could avoid a lot of problems if instead of doing data enrichment on Elasticsearch I just do it on Logstash. But I think that having this kind of setup simplifies the solution a lot when working with multiple kinds of incoming data sources (avoids using a collector pattern).


What do you think of the logical construct of the model? Do you see any issues or have any comments on how to Improve it?

After further testing I have found another issue that I have no idea how to avoid.

Basically, when multiple events from the same IP arrive really fast (for the first time) all events get forwarded to pipeline B, and this generates multiple queries to External service (consuming API Requests, and adding additional unnecessary lag to the process) and also creates duplicated events into the Threat intel Index.

I know that I could avoid duplicates on the Threat intel Index by using the IP filed as document ID. But the extra http requests seems unavoidable.

The only thing that comes to my mind is to avoid using the generated event for external search. And replace that pipeline with a recurrent search (deduplicated query) on the temporal index for the IPs that have no Threat Intel.

Any ideas?