So far I've seen a built-in job called rare source ip for user, however, it is using auditbeat. Is there a way to detect rare source ips for a user using a different datasource?
ML will find unusual locations (usually in NYC, but now in London), but it does not take natural travel into account, so it gives "false positives" when a user legitimately travels. As a result, you need a different approach: calculate the speed required to travel between the geo-locations.
If the same credentials are used within a short period of time, it is likely impossible for the actual owner of the credentials to travel from Location A to Location B in that time. A violation of a speed (say greater than 1000km/hr) would likely indicate that the user’s credentials were stolen/compromised/shared. The speed is calculated by using the geo-location of Location A and Location B, then applying the Haversine formula to calculate the distance between two points on the Globe.
Approach:
Easiest to implement the calculation as a Transform. This will allow the raw data to be pivoted on the username and for the distance to be calculated using the latest two geo-locations of login events for each user. The answer can be expressed as a velocity (distance over time) because the time delta between login events is also known/calculated by the Transform. The Transform will use a scripted_metric and the Painless scripting language to implement the Haversine formula and return the speed and the distance.
Implementation:
Haversine distance formula written in Painless (can be pasted into the Painless Lab in Dev Tools):
def first_time=1621382400000L;
def second_time=1621386000000L;
def duration=(second_time - first_time)/1000;
def lat2 = Math.toRadians(42.36114);
def lon2 = Math.toRadians(-71.057083);
def lat1 = Math.toRadians(35.8762);
def lon1 = Math.toRadians(-84.1746);
// Haversine formula
def dlon = lon2 - lon1;
def dlat = lat2 - lat1;
def a = Math.pow(Math.sin(dlat/2), 2) + Math.cos(lat1) * Math.cos(lat2) * Math.pow(Math.sin(dlon/2),2);
def c = 2 * Math.asin(Math.sqrt(a));
// define radius of earth. For km use 6371, for mi use 3956
def r = 6371;
// calculate the distance
def distance=(c * r);
// calculate speed
def speed=Math.round(3600*(distance/duration));
return speed;
An example using Transforms is seen here
Result:
Screenshot of investigation timeline showing the distance detected was 1300+km and the time between logins was 30 mins, so the speed detected was 2679 km/h
Knoxville, TN and Boston, MA are indeed about 1300km apart.
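As a quick sanity check outside the cluster, the same calculation can be sketched in Python. This only mirrors the Painless snippet above; the function names are made up for illustration, the coordinates are the ones from the snippet, and the 30-minute gap is taken from the screenshot description.

```python
import math

EARTH_RADIUS_KM = 6371.0  # same radius as the Painless snippet


def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in km between two (lat, lon) points given in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlam = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))


def speed_kmh(distance_km, duration_s):
    """Speed implied by covering distance_km in duration_s seconds."""
    return 3600.0 * distance_km / duration_s


# Coordinates from the Painless snippet (near Knoxville, TN and Boston, MA)
distance = haversine_km(35.8762, -84.1746, 42.36114, -71.057083)
speed = speed_kmh(distance, 30 * 60)  # 30 minutes between logins
print(round(distance), round(speed))
```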
This is awesome - thank you for sharing this! I have a few questions I was hoping you could provide some input on.
The sample transform you provided in the gdrive link actually works perfectly in the dev tools console for my use case with a little filter tweaking (Office 365 sign in monitoring). This is my first time working with Transforms and I'm attempting to recreate your setup. It looks like the transform populates a new index with the results based on some sort of timeframe (in my testing it searched every week in the current month).
- How are you triggering a rule? Are you running a rule that triggers any time your "impossible_travel" index contains an event, or only when it contains an event over 1300km?
- Would it be possible to combine this with a ML rule to filter out some false positives? Say you have a user that travels, leaves their iPad at home which generates a sign in and then they sign in on mobile in a different location at the same time? Perhaps a ML rule would know that iPad at home shows up often and isn't a concern?
- Can you elaborate on the times in the output? Like I mentioned before it looks like something is happening weekly, but it doesn't seem like the timestamps in the results align with the timestamps in an event.
- To implement this fully, do I just need to create an index and change the POST _transform/_preview to something like PUT _transform/impossible_travel as well as add a destination index and sync interval to the example transform JSON?
It doesn't look like I can create this from Kibana since cardinality of source.geo.location isn't supported, so I'm just trying to wrap my head around the transform documentation and work off your example - any help is greatly appreciated!
- How are you triggering a rule? Are you running a rule that triggers any time your "impossible_travel" index contains an event, or only when it contains an event over 1300km?
No, not the distance, rather the speed. (session_details.speed_kmh > 1000)
- Would it be possible to combine this with a ML rule to filter out some false positives? Say you have a user that travels, leaves their iPad at home which generates a sign in and then they sign in on mobile in a different location at the same time? Perhaps a ML rule would know that iPad at home shows up often and isn't a concern?
I wouldn't bother with that for now. I would expect the level of false positives to be quite low using this approach.
- Can you elaborate on the times in the output? Like I mentioned before it looks like something is happening weekly, but it doesn't seem like the timestamps in the results align with the timestamps in an event.
The transform can run continuously, but the part you're thinking about:
"@timestamp": {
"date_histogram": {
"field": "@timestamp",
"calendar_interval": "1w"
}
}
},
is meant to break out these calculations (i.e. calculate for every user the time/distance/speed between the last two login events) every week. This value can be changed (perhaps increased) but I arbitrarily chose a week so that you're not trying to make these calculations for every user over "all-time". The downside to choosing a week is that if the two logins for a user happen to span a week boundary, then they would not be considered in the calculation. Maybe even experiment with taking this out altogether (??) and see if it still performs reasonably. It might!
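To make the week-boundary downside concrete, here is a small Python sketch. It assumes weekly buckets that start on Monday in UTC (which is how ISO weeks work, and my understanding of a "1w" calendar interval); the two timestamps and the helper name are invented for illustration.

```python
from datetime import datetime, timezone


def week_bucket(ts):
    """Return the (ISO year, ISO week) a timestamp falls into; weeks start on Monday."""
    iso = ts.isocalendar()
    return (iso[0], iso[1])


# Two hypothetical logins for the same user, 20 minutes apart
sunday_late = datetime(2021, 5, 16, 23, 50, tzinfo=timezone.utc)
monday_early = datetime(2021, 5, 17, 0, 10, tzinfo=timezone.utc)

gap_minutes = (monday_early - sunday_late).total_seconds() / 60
same_bucket = week_bucket(sunday_late) == week_bucket(monday_early)
print(gap_minutes, same_bucket)
```

Because the two logins land in different weekly buckets, no single bucket contains both of them, so the pair is never compared even though the gap is short.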
- To implement this fully, do I just need to create an index and change the POST _transform/_preview to something like PUT _transform/impossible_travel as well as add a destination index and sync interval to the example transform JSON?
Correct - you need to PUT the transform once you're done playing with it using the _preview endpoint. You don't need to manually create the output index; the transform will create it for you. And yes, if you want it to run continuously, you will need to define a sync section.
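For orientation, the overall shape of the request body can be sketched as a Python dict and dumped to JSON. Treat this as a rough outline rather than the transform from the example: the index names, the `user.name` field, and the `frequency` and `delay` values are all assumptions, and `aggregations` is left empty where the scripted_metric from the example would go.

```python
import json


def build_transform_body(source_index, dest_index, pivot):
    """Assemble a continuous-transform body; every value here is illustrative."""
    return {
        "source": {"index": source_index},
        "dest": {"index": dest_index},  # created by the transform if missing
        "frequency": "5m",              # assumed check interval
        "sync": {"time": {"field": "@timestamp", "delay": "60s"}},
        "pivot": pivot,
    }


pivot = {
    "group_by": {
        "user": {"terms": {"field": "user.name"}},  # hypothetical field name
        "@timestamp": {
            "date_histogram": {"field": "@timestamp", "calendar_interval": "1w"}
        },
    },
    "aggregations": {},  # the scripted_metric from the example goes here
}

body = build_transform_body("o365-signins", "impossible_travel", pivot)
print(json.dumps(body, indent=2))
```

The printed JSON is what you would send with PUT _transform/impossible_travel once the aggregations section is filled in.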
This is so helpful, thank you again! A few more questions:
- With a continuous transform are events recreated continuously or once they're written it doesn't write them again?
- If I take out the time constraint, is the transform always looking at the "all-time" results or is it rolling in that once it's evaluated a past event, it doesn't evaluate it again? I'm just trying to understand the performance impact, like you mentioned. As of today I think my signins are ok, so do I need to do something different to only evaluate signins from today forward?
Lastly, as far as the rule triggering on speed, do you see any issue with evaluating both speed and distance? In my testing I have a local sign-in and a sign-in in my datacenter. Both sign-ins are geographically close (4.6km), but since I use both sessions at the same time, my speed was super fast (1850 km/h). I'm wondering if something like IF speed > 1000 km/h AND distance > 30 km THEN trigger alert could address this? I'm going to play with all 3 variants as rules and see what kind of results I get, but was curious what you thought. I can see this being an issue for anyone who uses a hosted VDI: getting email on your phone and then logging into a virtual desktop in a different geo-location will generate lots of alerts.
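The combined condition proposed above can be sketched in Python as follows. The function name is hypothetical, the thresholds are the ones floated in this thread, and in practice the check would live in the detection rule's query rather than in code.

```python
def should_alert(speed_kmh, distance_km, min_speed_kmh=1000.0, min_distance_km=30.0):
    """Alert only when the implied speed AND the distance both exceed their thresholds."""
    return speed_kmh > min_speed_kmh and distance_km > min_distance_km


# Local sign-in plus datacenter sign-in: fast but very close together
print(should_alert(1850.0, 4.6))
# Two sign-ins far apart within a short time
print(should_alert(2679.0, 1300.0))
```

The distance floor suppresses the VDI-style case where two nearly simultaneous sessions a few kilometres apart produce a huge implied speed.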
See the transform docs, "How transform checkpoints work" (Elasticsearch Guide 7.15). It explains the logic the transform uses to evaluate/update the output given changes to the input (source index).
If you do take out the time constraint, it does indeed look over "all-time"; however, the script in the transform:
all_docs.sort((HashMap o1, HashMap o2)->o1['@timestamp'].millis.compareTo(o2['@timestamp'].millis));
def size = all_docs.size();
def min_time = all_docs[size-2]['@timestamp'];
def max_time = all_docs[size-1]['@timestamp'];
def duration=(max_time.millis - min_time.millis)/1000;
always takes the latest 2 login event timestamps (since it sorts them) - regardless of how many are in the overall time interval. In other words - as time goes on, and more login events are added per user, I wonder if those HashMaps will keep growing and growing in size (they probably will but I'm not as detail-oriented of a Java programmer as say @BenTrent is). This might cause degradation in performance over time.
Now thinking about this, having a 1-week or something boundary to constrain this might actually be a good idea. The scripting code might also need better error checking for edge cases like there being only one login for a particular user and thus calculating the delta might throw an error. I only created this transform as a proof-of-concept and it is not hardened, production-worthy code I'm sure.
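The kind of edge-case guard mentioned above might look like this, sketched in Python rather than Painless. The function name is made up; it returns None instead of raising when there are fewer than two logins, or when the two latest logins share a timestamp (which would otherwise cause a division by zero in the speed calculation).

```python
def latest_pair_duration_s(timestamps_ms):
    """Seconds between the two most recent logins, or None if that is undefined."""
    if len(timestamps_ms) < 2:
        return None  # a single login has no "previous" event to compare with
    ordered = sorted(timestamps_ms)
    duration_s = (ordered[-1] - ordered[-2]) / 1000
    if duration_s <= 0:
        return None  # identical timestamps would make the speed infinite
    return duration_s


print(latest_pair_duration_s([1621386000000, 1621382400000]))
print(latest_pair_duration_s([1621382400000]))
```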
regardless of how many are in the overall time interval. In other words - as time goes on, and more login events are added per user, I wonder if those HashMaps will keep growing and growing in size
They definitely would and it would be critical to time limit the search for this reason. In general, anything which gathers stats for all documents in an aggregation is dangerous and could OOM the nodes it runs on.
I would propose a small tweak to your excellent answer @richcollier which hard limits the aggregation memory consumption for exactly that reason.
The idea is you maintain only the bounding box of the points so need to maintain only the corner points and so the memory usage is always fixed. The key observation is this allows you to get a lower bound on the maximum speed since the shortest possible path the points could have taken in their bounding box is a straight line between its corners. The speed calculation then carries over unaffected using the bounding box corners as the two points and the duration of the time window over which points are gathered. Inside the map script you would do something like:
UpdateBB
// Grow the bounding box (and the time range) to include this document.
// Timestamps are compared as epoch milliseconds so Math.min/Math.max apply.
def t = doc['@timestamp'].value.toInstant().toEpochMilli();
def lat = doc['source.geo.location'].value.lat;
def lon = doc['source.geo.location'].value.lon;
if (state.first_data_point) {
  state.min_time = t;
  state.max_time = t;
  state.min_lat = lat;
  state.max_lat = lat;
  state.min_lon = lon;
  state.max_lon = lon;
  state.first_data_point = false;
} else {
  state.min_time = Math.min(state.min_time, t);
  state.max_time = Math.max(state.max_time, t);
  state.min_lat = Math.min(state.min_lat, lat);
  state.max_lat = Math.max(state.max_lat, lat);
  state.min_lon = Math.min(state.min_lon, lon);
  state.max_lon = Math.max(state.max_lon, lon);
}
When merging bounding boxes from different state objects in the reduce script you just use exactly the same logic: taking the min of all min values and the max of all max values.
The distance between the two corners would be calculated using the same approach as before. With this approach you do really want to constrain the time interval you search so you're checking for the case that two logins happen close enough in time that they generate a high implied speed. But the best value is data dependent so you would have to experiment with what makes sense in your environment.
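Here is a minimal Python sketch of the bounding-box bookkeeping, the merge step, and the speed between the corners. It is only an illustration of the idea: the helper names are invented, timestamps are in seconds, and I've read "duration" as the span between the earliest and latest timestamps seen in the box.

```python
import math


def haversine_km(lat1, lon1, lat2, lon2, radius_km=6371.0):
    """Great-circle distance in km between two (lat, lon) points given in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlam = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
    return 2 * radius_km * math.asin(math.sqrt(a))


def new_box(t_s, lat, lon):
    """Bounding box containing a single point; memory use is fixed at six numbers."""
    return {"min_time": t_s, "max_time": t_s,
            "min_lat": lat, "max_lat": lat,
            "min_lon": lon, "max_lon": lon}


def merge(a, b):
    """Min of all min_* values and max of all max_* values (same logic as the reduce step)."""
    return {key: (min if key.startswith("min_") else max)(a[key], b[key]) for key in a}


def update(box, t_s, lat, lon):
    """Adding a point is the same as merging with that point's one-point box."""
    return merge(box, new_box(t_s, lat, lon))


def implied_speed_kmh(box):
    """Speed between the box corners over the box's time span, or None if the span is zero."""
    duration_s = box["max_time"] - box["min_time"]
    if duration_s <= 0:
        return None
    distance = haversine_km(box["min_lat"], box["min_lon"], box["max_lat"], box["max_lon"])
    return 3600.0 * distance / duration_s


box = update(new_box(0, 35.8762, -84.1746), 1800, 42.36114, -71.057083)
print(box, implied_speed_kmh(box))
```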
This still suffers from the problem Rich raised, that two far-apart points could fall in separate time buckets, but we can fix that by maintaining two bounding boxes for partially overlapping time buckets.
Let's say you choose a time bucket duration of 6h as appropriate. You would then do something like the following in the map script:
def now = System.currentTimeMillis() / 1000;
// Age of this document in seconds
def age = now - doc['@timestamp'].value.toEpochSecond();
// Bounding box 1 covers documents between 3 and 9 hours old
if (age >= 3 * 3600 && age < 9 * 3600) {
  // UpdateBB 1
}
// Bounding box 2 covers documents from the last 6 hours
if (age < 6 * 3600) {
  // UpdateBB 2
}
Here, UpdateBB 1 and 2 refer to the Painless snippet above, and you would maintain two bounding boxes: one for 9 to 3 hrs ago and one for the last 6 hrs. It is easy to verify that if two far-apart points arrive within 3 hrs of one another, they will both fall within at least one of the two bounding boxes. From the reduce script I would return the max implied speed from either bounding box, since this is still a lower bound for the actual speed. You would need to schedule this to run every 6 hrs (or whatever your time bucket duration is).
With this approach it really only makes sense to search the last 9hrs of data (or whatever is suitable for the bucketing interval you choose), since all other documents are discarded anyway. I hope this is sufficiently clear to be able to modify this example. If you can't get something like this working then let us know.
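The "within 3 hrs" claim can be checked by brute force with a short Python sketch. The window edges follow the description above (3 to 9 hours old for box 1, under 6 hours old for box 2); the function names are made up for illustration.

```python
def windows_for_age(age_s, bucket_s=6 * 3600):
    """Return which bounding boxes (1 and/or 2) a document of the given age updates."""
    half = bucket_s // 2
    windows = set()
    if half <= age_s < bucket_s + half:  # box 1: between 3h and 9h old
        windows.add(1)
    if 0 <= age_s < bucket_s:            # box 2: the last 6h
        windows.add(2)
    return windows


def share_a_window(age_a_s, age_b_s):
    """True if both documents land in at least one common bounding box."""
    return bool(windows_for_age(age_a_s) & windows_for_age(age_b_s))


# Check every pair of ages (in 15-minute steps) that are at most 3h apart
step = 15 * 60
all_covered = all(
    share_a_window(a, b)
    for a in range(0, 9 * 3600, step)
    for b in range(0, a + 1, step)
    if a - b <= 3 * 3600
)
print(all_covered)
```

Pairs further apart than 3 hrs are not guaranteed a shared box; for example, documents 8 hrs and 1 hr old fall into box 1 and box 2 respectively.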