To me, the aggregate filter is the most natural solution to this problem. You basically want to implement a (time-window-based) join operation on the ID. The problem with the ID is that you also need some parsing in order to extract it.
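As a minimal sketch of such a join, assuming the ID has already been parsed into a field called `correlation_id` (an illustrative name) and that related events arrive within roughly ten seconds of each other:

filter {
  aggregate {
    task_id => "%{correlation_id}"            # the join key
    code => "
      map['events'] ||= []
      map['events'] << event.get('message')   # collect all messages sharing the ID
    "
    push_map_as_event_on_timeout => true      # emit one joined event when the window closes
    timeout => 10                             # time window in seconds (assumption)
    timeout_task_id_field => "correlation_id"
  }
}

Note that the aggregate filter only works correctly with a single pipeline worker (`pipeline.workers: 1`), which is one more reason to dedicate a layer to it, as described below.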
That said, do you really need to correlate/join the data before indexing at all? Sometimes it is good enough to just parse the contents in order to build dashboards, and you can still use the ID for filtering in Elasticsearch/Kibana.
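For example, a grok filter can pull the ID out during parsing so it becomes a searchable field (the pattern below assumes the ID appears in square brackets, which is purely illustrative):

filter {
  grok {
    # extract the ID into its own field so it can be used for filtering in Kibana
    match => { "message" => "\[%{DATA:correlation_id}\]" }
  }
}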
For implementing a distributed join, you need stable partitioning based on the ID. With the Elastic Stack, this might require two layers of Logstash: the first layer parses the messages (and extracts the ID), and the second layer does the correlation (using the aggregate filter). The first layer needs to choose its output based on a hash of the ID; unfortunately, no Logstash output supports stable partitioning out of the box, so this requires quite a bit of configuration.
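A ruby filter can compute that hash. The sketch below assumes two second-layer instances and uses CRC32 rather than Ruby's built-in `hash`, since the latter is randomized per process and therefore not stable across Logstash instances:

filter {
  ruby {
    init => "require 'zlib'"
    # deterministic hash of the ID, modulo the number of second-layer instances (2 here)
    code => "event.set('[@metadata][hash]', Zlib.crc32(event.get('correlation_id').to_s) % 2)"
  }
}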
output {
  if [@metadata][hash] == 0 {
    # route to the first second-layer Logstash instance (LS 0)
  } else if [@metadata][hash] == 1 {
    # route to the second second-layer Logstash instance (LS 1)
  }
  # ... one branch per second-layer instance
}
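As a sketch of what each branch might contain: events could be forwarded to the corresponding second-layer instance over TCP (host names and ports below are placeholders), with the second layer picking them up via a matching tcp input and running the aggregate filter shown above:

output {
  if [@metadata][hash] == 0 {
    tcp {
      host => "logstash-0.internal"   # placeholder: address of second-layer instance 0
      port => 5044
      codec => json_lines             # preserve the event structure across the hop
    }
  }
  # ... analogous branches for the remaining instances
}

# on each second-layer instance:
input {
  tcp {
    port => 5044
    codec => json_lines
  }
}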