Update an event's fields based on another event

Hi, I have some logs indexed in Elasticsearch by Logstash that provide two types of events:

    {
        "@timestamp": "Nov 23, 2023 @ 15:24:33.064",
        "Detection ID": "ldt:87654321",
        "logSource": "CS Detection",
        "src": "0.0.0.0",
        "usrName": "user.of.detection.log",
        "New State": "new"
    }
    {
        "@timestamp": "Nov 23, 2023 @ 15:28:55.953",
        "Detection ID": "ldt:87654321",
        "New State": "closed",
        "Operation Name": "detection_update",
        "logSource": "CS Audit",
        "src": "0.0.0.0",
        "usrName": "user.from.audit.log"
    }

The first log (CS Detection) would be the main event and the second log (CS Audit) would be an update to the main event, although they do not carry the same data and information.

Once the second log is indexed, I would like to check whether the "Detection ID" field has the same value in both logs, and also whether the "Operation Name" field in the second log has the value "detection_update".

If these conditions are met, I would like the "New State" field of the Detection log to be updated with the value of the "New State" field from the Audit log, and the value of the "usrName" field from the Audit log to be added to the Detection log as an "Assigned To" field.

Expected outcome:

    {
        "@timestamp": "Nov 23, 2023 @ 15:24:33.064",
        "Assigned To": "user.from.audit.log",
        "Detection ID": "ldt:87654321",
        "logSource": "CS Detection",
        "src": "0.0.0.0",
        "usrName": "user.of.detection.log",
        "New State": "closed"
    }

Is this possible?

It is possible, but it may not be so simple, depending on the end goal.

Logstash is event-based; every event is independent of the others, and it works best that way.

A quick question: why can't you keep the two events in Elasticsearch as they are? Why do you need to update the first event instead of using the data from the most recent one?

For example, is there any issue in having those two events?

    {
        "@timestamp": "Nov 23, 2023 @ 15:24:33.064",
        "Detection ID": "ldt:87654321",
        "logSource": "CS Detection",
        "src": "0.0.0.0",
        "usrName": "user.of.detection.log",
        "New State": "new"
    }

    {
        "@timestamp": "Nov 23, 2023 @ 15:28:55.953",
        "Assigned To": "user.from.audit.log",
        "Detection ID": "ldt:87654321",
        "New State": "closed",
        "Operation Name": "detection_update",
        "logSource": "CS Audit",
        "src": "0.0.0.0",
        "usrName": "user.from.audit.log"
    }

Today we are already indexing these two types of events and both are important for our analysis, as they are EDR events.

We have a dashboard where we monitor detections throughout the day, but when a detection event is indexed, within Kibana we cannot tell whether it has been handled by someone or what the status of the detection is; that information only arrives in the Audit events, which belong to another logSource.

The objective would be to have a metric of detections whose status is "new" (meaning they have not been handled yet), and also to update each detection with who is or was handling it and its current status.

This can be done, but I don't think Logstash is the best tool for it.

You would basically need to do the following steps:

  • use a custom id for your detection index; this can be done using the fingerprint filter to create a hash based on the value of some field, like the Detection ID field.
  • create the same fingerprint in the audit pipeline configuration and use an elasticsearch filter to query the detection index.
  • if you get a match, you can then edit the document, adding the new state and the assigned user, and index it into the detection index, replacing the original document with the updated one.
  • if you also need to store the original audit event, you will need to duplicate the event and use two pipelines: one to store the original audit event and another to query the detection index and update it (see the sketch after this list).
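
To make those steps concrete, here is a rough, untested sketch of what the audit side could look like. It assumes the detection pipeline already writes to a single index using the fingerprint as the document_id (the first step); the index name cs-detections, the SHA256 method, and the metadata field names are all made up for the example:

    filter {
      if [logSource] == "CS Audit" and [Operation Name] == "detection_update" {
        # Recreate the id the detection pipeline used as its document_id
        fingerprint {
          source => ["Detection ID"]
          target => "[@metadata][fp]"
          method => "SHA256"
        }

        # Keep the original audit event and work on a copy (the last step)
        clone {
          clones => ["detection_update"]
        }
      }

      # Depending on ECS compatibility, clones are marked in [type] or [tags]
      if [type] == "detection_update" or "detection_update" in [tags] {
        # Fetch the stored detection document by its custom id
        elasticsearch {
          hosts  => ["es-server:9200"]        # plus user/password/TLS as needed
          index  => "cs-detections"           # made-up detection index name
          query  => "_id:%{[@metadata][fp]}"
          # Copy the detection fields that must survive the update
          fields => {
            "@timestamp" => "[@metadata][detection_ts]"
            "usrName"    => "[@metadata][detection_user]"
          }
        }

        # Only rewrite the event when the query returned a hit
        if [@metadata][detection_user] {
          mutate {
            rename       => { "usrName" => "Assigned To" }    # audit user
            replace      => { "logSource" => "CS Detection" }
            remove_field => ["Operation Name"]
          }
          mutate {
            add_field => { "usrName" => "%{[@metadata][detection_user]}" }
          }
          # Restore the detection's original timestamp
          date {
            match => ["[@metadata][detection_ts]", "ISO8601"]
          }
        }
      }
    }

    output {
      if [type] == "detection_update" or "detection_update" in [tags] {
        elasticsearch {
          index       => "cs-detections"
          document_id => "%{[@metadata][fp]}"   # replaces the original document
          # hosts and credentials as in your usual output
        }
      }
      # the original detection and audit outputs stay as they are
    }

The fingerprint filter here just gives a deterministic id; if the Detection ID is already unique, you could even use its value directly as the document_id.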

How you do that depends entirely on what your pipeline configuration looks like. For example, do you have one pipeline for the detection data and another for the audit data? What do they look like?

If you share your pipelines I may be able to provide more feedback.

Also, keep in mind that if the detection event and the audit update event happen at the same time, or very close together, this may not work, because the Elasticsearch query may not return any hits yet.

I have a couple of similar use cases, but I use Python to query a couple of indices and update them accordingly.

Well, it doesn't necessarily have to be done within Logstash, but at least 95% of the internal use cases here I've managed to solve with it, so it's a natural starting point, though not the only one.

The pipeline at the moment is very simple:

    input {
      tcp {
        port => 5142
        codec => "json"
      }
    }

    filter {
      # NOTE: the json codec on the tcp input already parses the payload, so
      # this filter only applies if a nested "message" field is present
      json {
        source => "message"
        target => "parsedjson"
      }

      mutate {
        remove_field => ["type", "version", "name", "@version", "isoTimeFormat"]
      }
    }

    output {
      elasticsearch {
        hosts => "${ES_IP}"
        ssl_certificate_authorities => "${ES_SSL}"
        ssl_verification_mode => "full"
        # lowercase yyyy: uppercase YYYY is week-year and misnames indices
        # around the turn of the year
        index => "cs-%{+dd.MM.yyyy}"
        user => "${ES_USER}"
        password => "${ES_PASSWORD}"
      }
    }

Both event types are indexed into the same index, and in the dashboards I separate them by logSource. As there is no automation involved in the EDR, there is no risk of the Detection and Audit logs arriving at the same time.

I forgot this plugin:

    if [logSource] == "CS Detection" {
        mutate {
            rename => { "Detect ID" => "Detection ID" }
            add_field => { "New State" => "new" }
        }
    }
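
For completeness, the first step of the approach above could slot into this pipeline roughly as in the sketch below. One caveat: the detections would need a dedicated, non-dated index (cs-detections is a made-up name here), because with the daily cs-%{+dd.MM.yyyy} pattern the original detection and its later update could land in different indices. Untested:

    filter {
      # ... existing json, mutate, and rename filters ...

      if [logSource] == "CS Detection" {
        # Made-up: stable id derived from the field both logSources share
        fingerprint {
          source => ["Detection ID"]
          target => "[@metadata][fp]"
          method => "SHA256"
        }
      }
    }

    output {
      if [logSource] == "CS Detection" {
        elasticsearch {
          hosts => "${ES_IP}"
          ssl_certificate_authorities => "${ES_SSL}"
          ssl_verification_mode => "full"
          index => "cs-detections"              # single index, not date-based
          document_id => "%{[@metadata][fp]}"   # lets the audit side replace it
          user => "${ES_USER}"
          password => "${ES_PASSWORD}"
        }
      } else {
        elasticsearch {
          hosts => "${ES_IP}"
          ssl_certificate_authorities => "${ES_SSL}"
          ssl_verification_mode => "full"
          index => "cs-%{+dd.MM.yyyy}"
          user => "${ES_USER}"
          password => "${ES_PASSWORD}"
        }
      }
    }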
