Hi!
I was able to put together the solution below using a Painless script.
It produces the results you mentioned (5 rows).
You can treat it as a base for further experimentation.
# Create index "a" with an explicit mapping: an ingest timestamp, a numeric ID,
# and two keyword fields (Data and Status).
PUT a
{
  "mappings": {
    "properties": {
      "event.ingested": { "type": "date" },
      "ID": { "type": "long" },
      "Data": { "type": "keyword" },
      "Status": { "type": "keyword" }
    }
  }
}
# Create index "b" with an explicit mapping: an ingest timestamp, a numeric ID,
# and a keyword Data field. No Status field is mapped here.
PUT b
{
  "mappings": {
    "properties": {
      "event.ingested": { "type": "date" },
      "ID": { "type": "long" },
      "Data": { "type": "keyword" }
    }
  }
}
# Define the ingest pipeline "my-pipeline": a single "set" processor that writes
# the ingest timestamp into the "event.ingested" field of each document.
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "description": "Index the ingest timestamp as 'event.ingested'",
        "field": "event.ingested",
        "value": "{{{_ingest.timestamp}}}"
      }
    }
  ]
}
# Sample documents for index "a", one per ID, each sent through "my-pipeline".
POST a/_doc?pipeline=my-pipeline
{
  "ID": 1,
  "Data": "Some Data",
  "Status": "open"
}

POST a/_doc?pipeline=my-pipeline
{
  "ID": 2,
  "Data": "Other Data",
  "Status": "open"
}
# Sample documents for index "b", each sent through "my-pipeline":
# two documents with ID 1, then three documents with ID 2.
POST b/_doc?pipeline=my-pipeline
{
  "ID": 1,
  "Data": "Some older Data"
}

POST b/_doc?pipeline=my-pipeline
{
  "ID": 1,
  "Data": "Some old Data"
}

POST b/_doc?pipeline=my-pipeline
{
  "ID": 2,
  "Data": "Other very old Data"
}

POST b/_doc?pipeline=my-pipeline
{
  "ID": 2,
  "Data": "Other older Data"
}

POST b/_doc?pipeline=my-pipeline
{
  "ID": 2,
  "Data": "Other Data"
}
# Preview a pivot transform that reads indices "a" and "b" together, groups the
# documents by ID, and uses a scripted_metric aggregation to pair documents
# within each ID group. The result of the pairing is emitted under "buckets".
GET _transform/_preview
{
  "source": {
    "index": ["a", "b"]
  },
  "dest": {
    "index": "joined"
  },
  "pivot": {
    "group_by": {
      "ID": {
        "terms": {
          "field": "ID"
        }
      }
    },
    "aggs": {
      "buckets": {
        "scripted_metric": {
          "init_script": """
            // Two collectors: one for each side of the join.
            state.docsA = [];
            state.docsB = [];
          """,
          "map_script": """
            // Route a copy of the document's _source into docsA when the
            // 'Status' key is present in doc, otherwise into docsB.
            // NOTE(review): presumably this separates index "a" from index "b",
            // since only "a" maps Status - confirm against your real data.
            def target = doc.containsKey('Status') ? state.docsA : state.docsB;
            target.add(new HashMap(params._source));
          """,
          "combine_script": """
            // Hand the collected lists over unchanged.
            return state;
          """,
          "reduce_script": """
            // Merge the per-state lists into one list per side.
            def leftDocs = [];
            def rightDocs = [];
            for (partial in states) {
              leftDocs.addAll(partial.docsA);
              rightDocs.addAll(partial.docsB);
            }

            // Pair every left document with every right document. Each output
            // row is a copy of the right document plus the left document's
            // Status and Data under the keys 'StatusA' and 'DataA'.
            def rows = [];
            for (left in leftDocs) {
              for (right in rightDocs) {
                def row = new HashMap(right);
                row.put('StatusA', left.get('Status'));
                row.put('DataA', left.get('Data'));
                rows.add(row);
              }
            }
            return rows;
          """
        }
      }
    }
  }
}