Impossible Travel Detection

Hi all,

Just looking for a bit of advice regarding creating a transform job for detecting Impossible Travel Activity.

We have had some success already however, it appears the Painless script is often using the wrong source.geo.location values to calculate the distance.

It is currently looking at all data coming from o365.audit and azure.signinlogs and within an ES|QL rule we are filtering for successful sign-ins.

If anyone has any ideas, or successful implementations of this already, I would really appreciate your assistance!

Transform Job:

PUT /_transform/impossible_travel
{
  "id": "impossible_travel",
  "source": {
    "index": ["*:logs-azure.signinlogs-*", "*:logs-o365.audit*"],
    "query": {
      "range": {
        "@timestamp": {
          "gte": "now-24h/m"
        }
      }
    }
  },
  "dest": {
    "index": "impossible_travel_logs"
  },
  "frequency": "5m",
  "pivot": {
    "group_by": {
      "user.name": {
        "terms": {
          "field": "user.name"
        }
      }
    },
    "aggregations": {
      "total_distinct_locations_over_time": {
        "cardinality": {
          "field": "source.geo.location"
        }
      },
      "@timestamp": {
        "max": {
          "field": "@timestamp"
        }
      },
      "session_details": {
        "scripted_metric": {
          "init_script": "state.docs = []; state.ips = []; state.actions = []; state.outcomes = []; state.organisations = []; state.namespaces = [];",
          "map_script": """
            Map span;
            if (doc['source.geo.location'].size() != 0 && doc['client.ip'].size() != 0 && doc['source.as.organization.name'].size() != 0) {
              span = [
                '@timestamp': doc['@timestamp'].value,
                'source.geo.location': doc['source.geo.location'].value,
                'client.ip': doc['client.ip'].value,
                'event.action': doc['event.action'].value,
                'event.outcome': doc['event.outcome'].value,
                'source.as.organization.name': doc['source.as.organization.name'].value,
                'data_stream.namespace': doc['data_stream.namespace'].value
              ];
            } else {
              span = [
                '@timestamp': null,
                'source.geo.location': null,
                'client.ip': null,
                'event.action': null,
                'event.outcome': null,
                'source.as.organization.name': null,
                'data_stream.namespace': null
              ];
            }
            state.docs.add(span);
            if (doc['client.ip'].size() != 0) {
              state.ips.add(doc['client.ip'].value);
            } else {
              state.ips.add(null);
            }
            if (doc['source.as.organization.name'].size() != 0) {
              state.organisations.add(doc['source.as.organization.name'].value);
            } else {
              state.organisations.add(null);
            }
            state.actions.add(doc['event.action'].value);
            state.outcomes.add(doc['event.outcome'].value);
            state.namespaces.add(doc['data_stream.namespace'].value);
          """,
          "combine_script": "return ['docs': state.docs, 'ips': state.ips, 'actions': state.actions, 'outcomes': state.outcomes, 'organisations': state.organisations, 'namespaces': state.namespaces];",
          "reduce_script": """
          def all_docs = [];
          def all_ips = [];
          def all_actions = [];
          def all_outcomes = [];
          def all_organisations = [];
          def all_namespaces = [];
          
          for (s in states) {
              all_docs.addAll(s['docs']);
              all_ips.addAll(s['ips']);
              all_actions.addAll(s['actions']);
              all_outcomes.addAll(s['outcomes']);
              all_organisations.addAll(s['organisations']);
              all_namespaces.addAll(s['namespaces']);
          }
          all_docs.sort((o1, o2) -> {
              def ts1 = o1['@timestamp'];
              def ts2 = o2['@timestamp'];
              if (ts1 != null && ts2 != null) {
                  return ts1.toInstant().compareTo(ts2.toInstant());
              } else if (ts1 != null) {
                  return 1;
              } else if (ts2 != null) {
                  return -1;
              } else {
                  return 0;
              }
          });
          def size = all_docs.size();
          def min_time = size >= 2 ? all_docs[size - 2]['@timestamp'] : null;
          def max_time = size >= 1 ? all_docs[size - 1]['@timestamp'] : null;
          def duration = 0;
          if (min_time != null && max_time != null) {
              duration = (max_time.toInstant().toEpochMilli() - min_time.toInstant().toEpochMilli()) / 1000;
          }
          // Calculate distance only if locations are not null
          def distance = 0;
          if (size >= 2) {
              def first_src = all_docs[size - 2]['source.geo.location'];
              def second_src = all_docs[size - 1]['source.geo.location'];
              if (first_src != null && second_src != null) {
                  def lat2 = Math.toRadians(second_src.lat);
                  def lon2 = Math.toRadians(second_src.lon);
                  def lat1 = Math.toRadians(first_src.lat);
                  def lon1 = Math.toRadians(first_src.lon);
                  def dlon = lon2 - lon1;
                  def dlat = lat2 - lat1;
                  def a = Math.pow(Math.sin(dlat / 2), 2) + Math.cos(lat1) * Math.cos(lat2) * Math.pow(Math.sin(dlon / 2), 2);
                  def c = 2 * Math.asin(Math.sqrt(a));
                  def r = 6371;
                  distance = c * r;
              }
          }
          def ret = new HashMap();
          if (distance > 0 && duration > 0) {
              def speed = Math.round(3600 * (distance / duration));
              ret['speed_kmh'] = speed;
          } else {
              def speed = 0;
              ret['speed_kmh'] = speed;
          }
          ret['first_login_ip'] = size >= 2 ? all_ips[size - 2] : null;
          ret['second_login_ip'] = size >= 1 ? all_ips[size - 1] : null;
          ret['first_login_action'] = size >= 2 ? all_actions[size - 2] : null;
          ret['second_login_action'] = size >= 1 ? all_actions[size - 1] : null;
          ret['first_login_outcome'] = size >= 2 ? all_outcomes[size - 2] : null;
          ret['second_login_outcome'] = size >= 1 ? all_outcomes[size - 1] : null;
          ret['one_before_last_login'] = min_time;
          ret['last_login'] = max_time;
          ret['duration_in_seconds_between_last_logins'] = duration;
          ret['total_logins_over_time'] = size;
          ret['distanceInKM_between_last_logins'] = distance;
          ret['first_source_organisation'] = size >= 2 ? all_organisations[size - 2] : null;
          ret['second_source_organisation'] = size >= 1 ? all_organisations[size - 1] : null;
          ret['first_data_stream.namespace'] = size >= 2 ? all_namespaces[size - 2] : null;
          ret['second_data_stream.namespace'] = size >= 1 ? all_namespaces[size - 1] : null;
          return ret;
      """
        }
      }
    }
  },
  "sync": {
    "time": {
      "field": "@timestamp",
      "delay": "60s"
    }
  }
}

ES|QL Rule

from impossible_travel_logs [metadata _id, _version, _index]
  | where (session_details.first_login_action == "Sign-in activity") or (session_details.first_login_action == "UserLoggedIn")
  | where (session_details.second_login_action == "Sign-in activity") or (session_details.second_login_action == "UserLoggedIn")
  | where (session_details.first_login_outcome == "success")
  | where (session_details.second_login_outcome == "success")
  | where (session_details.first_data_stream.namespace == session_details.second_data_stream.namespace)
  | where (session_details.first_login_ip != session_details.second_login_ip)
  | where (session_details.distanceInKM_between_last_logins >= 550)
  | where (session_details.speed_kmh >= 2000)
1 Like