EQL detection rule run on indices created by Transforms

Hello all,
I am trying to create an EQL rule that detects port-scanning activity. To do that, I created a transform that runs a cardinality aggregation on the destination port from my NetFlow logs:

{
  "id": "portscan",
  "version": "7.11.2",
  "create_time": 1616444368293,
  "source": {
    "index": [
      "filebeat-*"
    ],
    "query": {
      "match_all": {}
    }
  },
  "dest": {
    "index": "port_scan"
  },
  "frequency": "5m",
  "sync": {
    "time": {
      "field": "@timestamp",
      "delay": "60s"
    }
  },
  "pivot": {
    "group_by": {
      "source.ip": {
        "terms": {
          "field": "source.ip"
        }
      },
      "destination.ip": {
        "terms": {
          "field": "destination.ip"
        }
      }
    },
    "aggregations": {
      "destination.port.cardinality": {
        "cardinality": {
          "field": "destination.port"
        }
      },
      "@timestamp.max": {
        "max": {
          "field": "@timestamp"
        }
      }
    }
  },
  "description": "port scan",
  "settings": {
    "max_page_search_size": 1000
  }
}

I also added a max aggregation on @timestamp, which gives me a field named @timestamp.max.

When I try to create the EQL rule on this index, I get the following error:

Error : Found 1 problem line -1:-1: Cannot use field [@timestamp] type [object] only its subfields

The problem is that the rule does not let me override the date field and change it from @timestamp to @timestamp.max, as I can when I use the EQL search API:

GET /port_scan/_eql/search
{
  "timestamp_field": "@timestamp.max",
  "size": 50, 
  "query": """
    any where destination.port.cardinality >= 5000 and (source.ip != "192.168.0.0/16" and source.ip != "10.0.0.0/8")
  """
}

Is there any way to override this?
Thank you

From a transform perspective:

You can give the output field a different name; @timestamp.max is only the name the UI suggests. If it helps, you can name it @timestamp or, for example, @timestamp_max to avoid nested objects.
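
To illustrate the renaming, here is a minimal sketch of the pivot section from the transform above with the max aggregation renamed. Elasticsearch expands dots in field names into objects, so an output named @timestamp.max makes @timestamp an object with a max subfield, which matches the error reported in the question. The name @timestamp_max is only one of the options mentioned here, and the rest of the transform is assumed to stay unchanged.

```json
{
  "pivot": {
    "group_by": {
      "source.ip": { "terms": { "field": "source.ip" } },
      "destination.ip": { "terms": { "field": "destination.ip" } }
    },
    "aggregations": {
      "destination.port.cardinality": {
        "cardinality": { "field": "destination.port" }
      },
      "@timestamp_max": {
        "max": { "field": "@timestamp" }
      }
    }
  }
}
```

Naming the aggregation @timestamp instead of @timestamp_max should give the destination index a plain @timestamp date field, which is the field the rule expects by default.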

Regarding your query: this should be possible with an ordinary Elasticsearch query. Note that you can also filter out private IP ranges in the transform itself by using a source query.
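
As a rough sketch of the ordinary query, the request body below could be sent to GET /port_scan/_search. It assumes that source.ip is mapped as an ip field in the destination index (ip fields accept CIDR notation in term queries) and it reuses the threshold and the two private ranges from the EQL query in the question.

```json
{
  "size": 50,
  "query": {
    "bool": {
      "filter": [
        { "range": { "destination.port.cardinality": { "gte": 5000 } } }
      ],
      "must_not": [
        { "term": { "source.ip": "192.168.0.0/16" } },
        { "term": { "source.ip": "10.0.0.0/8" } }
      ]
    }
  }
}
```

The same must_not clauses could replace match_all in the transform's source query, so that private source addresses never reach the port_scan index at all.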

Hello @Hendrik_Muhs,

Thank you for your reply. While this solved my problem, I switched my logic to group by a date histogram on the timestamp instead of using an aggregation on the timestamp, since this makes it possible to reset the destination port cardinality value every hour.
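
The final configuration was not posted, so the following is only a guess at what the date histogram variant of the pivot could look like. The group_by name @timestamp and the fixed_interval of 1h are assumptions based on the description above; with this layout each source.ip and destination.ip pair gets one document per hour, so the cardinality starts over in every bucket.

```json
{
  "pivot": {
    "group_by": {
      "@timestamp": {
        "date_histogram": {
          "field": "@timestamp",
          "fixed_interval": "1h"
        }
      },
      "source.ip": { "terms": { "field": "source.ip" } },
      "destination.ip": { "terms": { "field": "destination.ip" } }
    },
    "aggregations": {
      "destination.port.cardinality": {
        "cardinality": { "field": "destination.port" }
      }
    }
  }
}
```

Because the histogram bucket is written to a top-level @timestamp date field, a rule running on the destination index would not need a timestamp override.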

Thank you
