Hi! I am faced with a situation where I am not sure whether an enrich processor or a transform should be used.
In my situation, I have sensors sending events in batches to Elastic. Each sensor has a sensor_key, and the events it sends are timestamped. On the server side, we also uniquely identify each sensor with a sensor_id, and so I have an index containing a sensor_key to sensor_id mapping. The same sensor_id may match several sensor_keys.
In terms of indices, here is what the event index (actually, a data stream) looks like:

{
  "sensor_key": "abc",
  "event_type": "something_happened",
  "event_timestamp": "2023-06-19T21:02:03",
  "ingest_timestamp": "2023-06-19T22:02:03"
  ... // other fields and values
}

and my "mapping" index simply contains documents like this:

{
  "sensor_key": "abc",
  "sensor_id": "123"
}
As we receive many events, I want to aggregate my event documents into daily summaries. I already have a transform that continuously looks for new event documents and aggregates based on sensor_key and timestamp:
Transform sample
{
  "id": "my-transform",
  "source": {
    "index": ["sensor-events"]
  },
  "dest": {
    "index": "sensors-aggregated-overview"
  },
  "sync": {
    "time": {
      "field": "ingest_timestamp",
      "delay": "60s"
    }
  },
  "pivot": {
    "group_by": {
      "date": {
        "date_histogram": {
          "field": "event_timestamp",
          "calendar_interval": "1d"
        }
      },
      "sensor_key": { // not ideal
        "terms": {
          "field": "sensor_key"
        }
      }
    },
    "aggregations": {
      "number_of_events": {
        "value_count": {
          "field": "event_type"
        }
      }
    }
  }
}
But the trick here is that I want to aggregate by sensor_id, not by sensor_key. Essentially, the desired destination aggregation document looks like:

{
  "sensor_id": "123",
  "event_type": "some_variable",
  "date": "..."
  ... // other fields and values
}
Thus, I see 2 solutions to obtain the desired result:
- Use the enrich processor in an ingest pipeline for incoming events, and update the enrich index whenever I need to modify the sensor_key to sensor_id mapping.
- Modify my transform to match sensor_key to a sensor_id based on my mapping index, but I am not sure how to do that without messing up my aggregation.
Solution 1 is fairly simple to set up. However, the sensor_key to sensor_id mapping changes frequently, so until this and/or this have been addressed, I'll need some external trigger to periodically execute the enrich policy.
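To illustrate, here is roughly what I have in mind for solution 1 (the sensor-mapping index name, and the policy and pipeline names, are just placeholders I made up):

PUT /_enrich/policy/sensor-mapping-policy
{
  "match": {
    "indices": "sensor-mapping", // the sensor_key -> sensor_id index
    "match_field": "sensor_key",
    "enrich_fields": ["sensor_id"]
  }
}

// this is what would need to be re-triggered externally every time the mapping changes
POST /_enrich/policy/sensor-mapping-policy/_execute

PUT /_ingest/pipeline/resolve-sensor-id
{
  "processors": [
    {
      "enrich": {
        "policy_name": "sensor-mapping-policy",
        "field": "sensor_key",
        "target_field": "sensor",
        "max_matches": 1
      }
    },
    {
      // lift sensor.sensor_id to a top-level field so the transform can group by it
      "set": {
        "field": "sensor_id",
        "copy_from": "sensor.sensor_id",
        "ignore_empty_value": true
      }
    }
  ]
}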
I've seen @Hendrik_Muhs on this forum often mention solution 2, but I am not sure how to "join" two indices with different models. As far as I understand, the 2 indices to join with the transform must share a common field, but the only one they share would be sensor_key, which I don't want to use for aggregation. I feel like I'd have to "chain" 2 transforms to achieve what I want: one to resolve the sensor_id mapping, the other to do the aggregation. It's also not really convenient given that transforms don't support data stream outputs (which means manual lifecycle management), and I am not sure how the sync property of the transform would work if one of the two indices does not have a timestamp field.
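For completeness: assuming the event documents carried a top-level sensor_id (e.g., resolved at ingest time by a pipeline like the one sketched above), changing the pivot itself would be trivial:

"group_by": {
  "date": {
    "date_histogram": {
      "field": "event_timestamp",
      "calendar_interval": "1d"
    }
  },
  "sensor_id": {
    "terms": {
      "field": "sensor_id" // instead of sensor_key
    }
  }
}

so the real question is how to get sensor_id onto the documents (or into the transform) in the first place.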
Any pointers on how to solve that kind of problem?