Enrich vs Transform with 2 source indices

Hi! I am faced with a situation where I am not sure whether an enrich processor or a transform should be used.
In my situation, I have sensors sending events in batch to Elastic. Each sensor has a sensor_key and the events it sends are timestamped. On the server side, we also uniquely identify each sensor with a sensor_id, and so I have an index containing a sensor_key to sensor_id mapping.
The same sensor_id may match several sensor_key values.

In terms of indices, here is what the event index (actually a data stream) looks like:

"sensor_key": "abc",
"event_type": "something_happened"
"event_timestamp": "2023-06-19:21:02:03"
"ingest_timestamp": "2023-06-19:22:02:03"
... # other fields and values

and my "mapping" index is simply containing documents like this:

"sensor_key": "abc",
"sensor_id": "123"

As we receive many events, I want to aggregate the event documents into daily summaries. I already have a transform that continuously looks for new event documents and aggregates them based on sensor_key and timestamp:

Transform sample
{
    "id": "my-transform",
    "source": {
        "index": ["sensor-events"]
    },
    "dest": {
        "index": "sensors-aggregated-overview"
    },
    "sync": {
        "time": {
            "field": "ingest_timestamp",
            "delay": "60s"
        }
    },
    "pivot": {
        "group_by": {
            "date": {
                "date_histogram": {
                    "field": "event_timestamp",
                    "calendar_interval": "1d"
                }
            },
            "sensor_key": { // not ideal
                "terms": {
                    "field": "sensor_key"
                }
            }
        },
        "aggregations": {
            "number_of_events": {
                "value_count": {
                    "field": "event_type"
                }
            }
        }
    }
}

The catch here is that I want to aggregate by sensor_id, not by sensor_key. Essentially, the desired destination aggregation document looks like:

"sensor_id": "123"
"event_type": "some_variable"
"date": "..."
... # other fields and values

Thus, I see 2 solutions to obtain the desired result:

  1. Use the enrich processor in an ingest pipeline for incoming events, and update the enrich index whenever I need to modify the sensor_key to sensor_id mapping.
  2. Modify my transform to match sensor_key to a sensor_id based on my mapping index, but I am not sure how to do that without messing up my aggregation.

Solution 1 is fairly simple to set up; however, the sensor_key to sensor_id mapping changes frequently, so until this and/or this have been addressed, I'll need some external trigger to periodically execute the enrich policy.
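For reference, solution 1 would look roughly like this. The index, policy and pipeline names (sensor-mapping, sensor-key-to-id, sensor-events-enrich) are placeholders, and the set/remove steps are just one way of flattening the looked-up document, so treat this as a sketch rather than the final setup:

PUT _enrich/policy/sensor-key-to-id
{
    "match": {
        "indices": "sensor-mapping",
        "match_field": "sensor_key",
        "enrich_fields": ["sensor_id"]
    }
}

POST _enrich/policy/sensor-key-to-id/_execute

PUT _ingest/pipeline/sensor-events-enrich
{
    "processors": [
        {
            "enrich": {
                "policy_name": "sensor-key-to-id",
                "field": "sensor_key",
                "target_field": "sensor"
            }
        },
        {
            "set": {
                "field": "sensor_id",
                "copy_from": "sensor.sensor_id",
                "ignore_empty_value": true
            }
        },
        {
            "remove": {
                "field": "sensor",
                "ignore_missing": true
            }
        }
    ]
}

The _execute call is the part I would have to re-run from some scheduler every time the mapping index changes, since the enrich index is only a snapshot of the source at execution time.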

I've seen @Hendrik_Muhs often mention solution 2 on this forum, but I am not sure how to "join" two indices with different models. As far as I understand, the 2 indices joined by the transform must share a common field, but the only one they share is sensor_key, which I don't want to use for aggregation. I feel like I'd have to "chain" 2 transforms to achieve what I want: one to resolve the sensor_id mapping, the other to do the aggregation :thinking:. It's also not really convenient, given that transforms don't support data stream outputs (which means manual lifecycle management), and I am not sure how the sync property of the transform would work if one of the two indices does not have a timestamp field.

Any pointers on how to solve that kind of problem?

There are many aspects to this problem. Overall I think using a transform to join, in order to replace sensor_key with sensor_id, is possible, but not the ideal solution. I agree that enrich would be the more natural choice. How do you ingest the data into Elasticsearch? Maybe it is possible to do the mapping even before the data is ingested.

If you want to use transforms, you have to join with one transform and aggregate with a second one. Because many keys collapse to one id, it is also required to join first before aggregating.

If you go the transform way:

  • write transform 1 to join the data; you can get rid of sensor_key by dropping the field in an ingest pipeline that runs afterwards
  • it is possible to use a continuous transform despite the lack of a timestamp field in one source; however, changes in the source without a timestamp won't trigger the transform (a benefit of transform vs. enrich: enrich only runs once at ingest time, while a continuous transform can update/fix the destination index in case you realize that an existing key-to-id entry was wrong)
  • write transform 2 to do the date histogram aggregation; this can run on the output of the 1st transform (see the sketch below). Ensure that delay is set correctly, e.g. if you use the original timestamp field, take into account the worst-case extra time it takes until a document appears in the destination index of transform 1. Alternatively, write an ingest timestamp as part of transform 1.
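To make the last point a bit more concrete, a rough sketch of transform 2 could look like the following. This assumes transform 1 writes one document per event into a (made up) sensor-events-joined index, with sensor_id already attached and a join_timestamp field set by its ingest pipeline; all names and the 120s delay are only examples:

PUT _transform/sensor-daily-by-id
{
    "source": {
        "index": ["sensor-events-joined"]
    },
    "dest": {
        "index": "sensors-aggregated-by-id"
    },
    "sync": {
        "time": {
            "field": "join_timestamp",
            "delay": "120s"
        }
    },
    "pivot": {
        "group_by": {
            "date": {
                "date_histogram": {
                    "field": "event_timestamp",
                    "calendar_interval": "1d"
                }
            },
            "sensor_id": {
                "terms": {
                    "field": "sensor_id"
                }
            }
        },
        "aggregations": {
            "number_of_events": {
                "value_count": {
                    "field": "event_type"
                }
            }
        }
    }
}

If you sync on the original event_timestamp instead of a timestamp written by transform 1, the delay has to cover the worst-case time for a document to travel through transform 1.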

Thanks a lot @Hendrik_Muhs for your detailed answer!
In my case, data comes from an Azure Event Hub (similar to AWS Kinesis or Apache Kafka) and is ingested via Filebeat, using the dedicated integration plugin.

I think I'll settle for the enrich version, because the 2-transform chain is more intricate to understand and set up, although it does sound viable, especially if, as you say, I were to timestamp both sources.

For now I'll trade the complexity for the risk of not having an up-to-date enrich index.
