What is the most (time-)efficient way to "join" two indexes in elasticsearch?

Hi!
I was able to hack together the solution below using a Painless script.
It produces the results you mentioned (5 rows).
You can treat it as a base for further experimentation.

# Create index "a" with explicit mappings: an ingest timestamp (date),
# a numeric ID (long), and two keyword fields, "Data" and "Status".
PUT a
{
  "mappings": {
    "properties": {
      "event.ingested": { "type": "date" },
      "ID": { "type": "long" },
      "Data": { "type": "keyword" },
      "Status": { "type": "keyword" }
    }
  }
}

# Create index "b" with explicit mappings: an ingest timestamp (date),
# a numeric ID (long), and a keyword "Data" field. No "Status" field is mapped here.
PUT b
{
  "mappings": {
    "properties": {
      "event.ingested": { "type": "date" },
      "ID": { "type": "long" },
      "Data": { "type": "keyword" }
    }
  }
}

# Ingest pipeline with a single "set" processor that writes the ingest
# timestamp into the "event.ingested" field of every document passing through it.
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "description": "Index the ingest timestamp as 'event.ingested'",
        "field": "event.ingested",
        "value": "{{{_ingest.timestamp}}}"
      }
    }
  ]
}

# Sample documents for index "a" (one per ID), each sent through my-pipeline.
POST a/_doc?pipeline=my-pipeline
{ "ID": 1, "Data": "Some Data", "Status": "open" }

POST a/_doc?pipeline=my-pipeline
{ "ID": 2, "Data": "Other Data", "Status": "open" }

# Sample documents for index "b": two with ID 1 and three with ID 2,
# each sent through my-pipeline as a separate request.
POST b/_doc?pipeline=my-pipeline
{ "ID": 1, "Data": "Some older Data" }

POST b/_doc?pipeline=my-pipeline
{ "ID": 1, "Data": "Some old Data" }

POST b/_doc?pipeline=my-pipeline
{ "ID": 2, "Data": "Other very old Data" }

POST b/_doc?pipeline=my-pipeline
{ "ID": 2, "Data": "Other older Data" }

POST b/_doc?pipeline=my-pipeline
{ "ID": 2, "Data": "Other Data" }

# Preview a pivot transform that reads from both "a" and "b" and groups by "ID".
# Per group, a scripted_metric aggregation performs the join:
#   - init:    start two empty lists in the per-shard state, docsA and docsB.
#   - map:     copy each document's _source into docsA when doc.containsKey('Status')
#              is true, otherwise into docsB.
#   - combine: hand the per-shard state back unchanged.
#   - reduce:  merge the lists from all shard states, then for every (A, B) pair emit
#              a copy of the B document with the A document's Status and Data added
#              as "StatusA" and "DataA".
GET _transform/_preview
{
  "source": { "index": ["a", "b"] },
  "dest": { "index": "joined" },
  "pivot": {
    "group_by": {
      "ID": {
        "terms": { "field": "ID" }
      }
    },
    "aggs": {
      "buckets": {
        "scripted_metric": {
          "init_script": """
            state.docsA = new ArrayList();
            state.docsB = new ArrayList();
          """,
          "map_script": """
            def target = doc.containsKey('Status') ? state.docsA : state.docsB;
            target.add(new HashMap(params['_source']));
          """,
          "combine_script": "return state;",
          "reduce_script": """
            List left = new ArrayList();
            List right = new ArrayList();
            for (def shardState : states) {
              left.addAll(shardState.docsA);
              right.addAll(shardState.docsB);
            }

            List rows = new ArrayList();
            for (def docA : left) {
              for (def docB : right) {
                Map row = new HashMap(docB);
                row.put('StatusA', docA.get('Status'));
                row.put('DataA', docA.get('Data'));
                rows.add(row);
              }
            }
            return rows;
          """
        }
      }
    }
  }
}
1 Like