SIEM Event correlation with Elasticsearch and Logstash

I am using Wazuh with ELK as my SIEM tool. I can generate alerts in Kibana from my logs based on the OSSEC ruleset. I now need to find correlations between these events, and I would like to know whether this is possible using queries in Elasticsearch and Logstash. For example, a use case could be:
A user is deleted from Windows Active Directory within 48 hours of its creation.
I get the creation and deletion logs, and alerts for both, in Kibana.
I need to correlate these events based on their timestamps and generate another alert (ideally in Kibana). Any help would be appreciated!
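For reference, I can already pull the deletion alerts on their own with a simple query; it is the pairing with the earlier creation event that I am unsure about. Below is the shape of that query as a Python dict (a sketch only; the field names event.action and @timestamp, and the action value, are placeholders rather than my actual Wazuh fields):

```python
# Sketch: an Elasticsearch bool query for "account deleted in the last
# 48 hours", expressed as a Python dict. Field names (event.action,
# @timestamp) and the "accountDeleted" value are placeholder assumptions.
deletions_last_48h = {
    "query": {
        "bool": {
            "must": [
                # Match only the deletion events...
                {"match": {"event.action": "accountDeleted"}},
                # ...that occurred within the last 48 hours.
                {"range": {"@timestamp": {"gte": "now-48h"}}},
            ]
        }
    }
}
```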

Thank you

(Mark Harwood) #2

Behavioural analysis is hard at scale on an event-centric index. The physical distribution of the event data (typically randomised across shards in time-based indices) makes it hard to assemble related events for each of the entities you want to examine.
You might want to look at entity-centric indexes for spotting some of the behavioural outliers.


Hi Mark,
I looked into your recommendation of entity-centric indexes for behavioural analysis. For this particular use case, though, I only need to act when a user gets deleted and check whether its creation was within the last 48 hours. Would I still need an entity-centric index, or would an event-centric index be good enough?
I would also like to know the general approach for implementing more such event correlations using queries in Elasticsearch, as more use cases are in the pipeline.

Thank you

(Mark Harwood) #4

To perform behavioural analytics at query-time on an event-centric store you need a "yes" to both of these questions:

  1. Can I squeeze all the events of potential interest (grouped by entity) into the aggregation responses from distributed shards without exhausting memory limits?
  2. Can I use an aggregation or a script to filter the combined shard results down to those that match the behaviour I'm looking for?

In your case, 1) is difficult because you can't just query for all users with an "accountDeleted" event; you also need the corresponding "accountCreated" documents for those users. You'd have to first query for all accounts with a deleted event, then use all the IDs from the results in a second query for "accountCreated OR accountDeleted", grouping the results by user ID.
There are limits on how many IDs you can supply in a query and on how many buckets you can get back in results.
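To make that two-pass approach concrete, here is the shape of the logic simulated on in-memory events (a sketch only; the field names user_id, action and timestamp are assumptions, and a real implementation would issue the two passes as Elasticsearch queries):

```python
# Sketch of the two-pass, query-time correlation, simulated in memory.
# Field names (user_id, action, timestamp) are illustrative assumptions.
from datetime import datetime, timedelta

events = [
    {"user_id": "alice", "action": "accountCreated", "timestamp": datetime(2019, 1, 1, 9, 0)},
    {"user_id": "alice", "action": "accountDeleted", "timestamp": datetime(2019, 1, 2, 9, 0)},
    {"user_id": "bob",   "action": "accountCreated", "timestamp": datetime(2018, 1, 1)},
    {"user_id": "bob",   "action": "accountDeleted", "timestamp": datetime(2019, 1, 1)},
]

# Pass 1: collect the IDs of every user with a deletion event.
deleted_ids = {e["user_id"] for e in events if e["action"] == "accountDeleted"}

# Pass 2: fetch created/deleted events for those IDs and group by user.
by_user = {}
for e in events:
    if e["user_id"] in deleted_ids:
        by_user.setdefault(e["user_id"], {})[e["action"]] = e["timestamp"]

# Flag users whose deletion came within 48 hours of creation.
suspicious = [
    uid for uid, t in by_user.items()
    if t["accountDeleted"] - t["accountCreated"] <= timedelta(hours=48)
]
print(suspicious)  # ['alice']
```

The limits mentioned above bite in pass 2: every ID found in pass 1 has to be fed back into the second query.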

Answering question 2) may be tricky: we can use pipeline aggregations to do a level of result trimming, but there are limits on what data structures can be accessed and what logic can be performed.
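For example, a terms aggregation per user with a bucket_selector pipeline aggregation can trim buckets down to short-lived accounts (a sketch expressed as a Python dict; field names are assumptions, and it leans on the shortcut of treating min/max of @timestamp as the creation/deletion times, which only holds if the query already filtered to just those two event types):

```python
# Sketch: query-time trimming with a bucket_selector pipeline aggregation.
# Field names (user_id, @timestamp) are assumptions; 172800000 ms = 48 hours.
short_lived_accounts = {
    "size": 0,
    "aggs": {
        "by_user": {
            # One bucket per user; "size" caps how many buckets come back.
            "terms": {"field": "user_id", "size": 10000},
            "aggs": {
                "created": {"min": {"field": "@timestamp"}},
                "deleted": {"max": {"field": "@timestamp"}},
                # Keep only buckets where deletion followed creation by <= 48h.
                "short_lived": {
                    "bucket_selector": {
                        "buckets_path": {"c": "created", "d": "deleted"},
                        "script": "params.d - params.c <= 172800000"
                    }
                }
            }
        }
    }
}
```

Note the hard cap in the terms aggregation: this is exactly the bucket limit mentioned in point 1).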

In contrast, an entity-centric index can have a simple update script to calculate items of interest, e.g. (pseudocode):

if (newEvent.action == "deleted") {
    accountLifespan = accountDeletedDate - accountCreatedDate;
}
That is simple logic to write, and it scales with data volumes. Because accountLifespan is a concrete field, you can also plot Kibana bar charts on it or run queries against it.
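As a sketch of what that update script does (written here in Python rather than as an Elasticsearch update script, with assumed field and action names):

```python
# Hedged sketch of an entity-centric update: each incoming event updates
# one per-user document, so lifespan becomes a concrete, queryable field.
# Field names and action values are assumptions for illustration.
from datetime import datetime

def update_entity(entity: dict, event: dict) -> dict:
    if event["action"] == "created":
        entity["accountCreatedDate"] = event["timestamp"]
    elif event["action"] == "deleted":
        entity["accountDeletedDate"] = event["timestamp"]
        # The derived field the query-time approach struggles to compute.
        entity["accountLifespanHours"] = (
            event["timestamp"] - entity["accountCreatedDate"]
        ).total_seconds() / 3600
    return entity

user = {}
update_entity(user, {"action": "created", "timestamp": datetime(2019, 1, 1, 9, 0)})
update_entity(user, {"action": "deleted", "timestamp": datetime(2019, 1, 2, 9, 0)})
print(user["accountLifespanHours"])  # 24.0
```

Alerting on "deleted within 48 hours" then reduces to a trivial range query on accountLifespanHours.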

What is hard to derive at query time can be easy at index time.

(system) closed #5

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.