What is the most (time-)efficient way to "join" two indexes in Elasticsearch?

I have the following 2 SQL tables:

Current table A:

ID | Data       | Status
1  | Some Data  | open
2  | Other Data | open

Historical Table B:

ID | Data
1  | Some old Data
1  | Some older Data
2  | Other Data
2  | Other older Data
2  | Other very old Data

Now I want to visualize in Kibana, e.g., how many historical data entries there are per ID with Status = open in Table A.
Both tables are updated regularly in the database, and I want the visualization in Kibana to stay reasonably up to date.
So far I'm loading the data into Elasticsearch using the JDBC input plugin in Logstash.

What is the most efficient way to achieve this?

  1. (What I'm doing right now, though it might take some time for very big tables) Joining both tables in the SQL query in the Logstash config file and outputting the result into an Elasticsearch index
  2. As @Hendrik_Muhs mentioned several times in this forum, e.g. here: Use transforms. What I didn't quite understand yet: How do I do this? How do I merge all data from table A and table B into one destination index without using any aggregation, since I want the aggregations to happen in Kibana and thus only keep the raw data in my index?
  3. Use the enrich setup - as far as I understand this might be feasible, but it is not recommended since both tables are updated regularly (see the sketch after this list for what I mean)
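
For reference, this is roughly the enrich setup I mean (a sketch only; the policy and pipeline names are made up, and I'm assuming the indices are called table_a and historical_table_b): table A becomes the source of an enrich policy, and documents from table B get the matching Status/Data attached at ingest time.

PUT _enrich/policy/table_a_policy
{
  "match": {
    "indices": "table_a",
    "match_field": "ID",
    "enrich_fields": ["Data", "Status"]
  }
}

POST _enrich/policy/table_a_policy/_execute

PUT _ingest/pipeline/enrich_from_table_a
{
  "processors": [
    {
      "enrich": {
        "description": "Attach the matching table A fields under 'table_a'",
        "policy_name": "table_a_policy",
        "field": "ID",
        "target_field": "table_a"
      }
    }
  ]
}

The catch, as far as I understand, is that the enrich index is a snapshot, so the policy would have to be re-executed every time table A changes.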

Those are all the options I have explored so far. Is there any other option to solve my use case?
What is the most (time-)efficient/recommended way to do this?

Hi @Knut_Knackwurst,

You understood well: if both of your indices are regularly getting new data, a transform is a better solution than an enrich setup (which is recommended when at least one of the two indices is static).

To join documents using transforms, you can write a latest, continuous transform that takes table A and historical table B as source indices, so something like:

PUT _transform/test_merge_ab
{
  "source": {
    "index":[
      "table_a",
      "historical_table_b"
    ]
  },
  "latest": {
    "unique_key": [
      "ID"
    ],
    "sort": "@timestamp"
  },
  "dest": {
    "index": "merge_ab_tables"
  },
  "sync": {
    "time": {
      "field": "@timestamp"
    }
  }
}
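
Note that PUT only creates the transform; it still has to be started before it writes anything to the destination index:

POST _transform/test_merge_ab/_start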

Also, and although it is still in tech preview, you might be interested in taking a look at ES|QL, especially the ENRICH command.
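
As a rough sketch only (it assumes an enrich policy, here called table_a_policy as a made-up name, that uses table_a as its source and matches on ID has been created and executed), the original question could then be answered directly at query time:

POST _query
{
  "query": """
    FROM historical_table_b
    | ENRICH table_a_policy ON ID WITH Status
    | WHERE Status == "open"
    | STATS historical_entries = COUNT(*) BY ID
  """
}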

Hope that helps,


Hi @greco,

Thanks for your reply!
The principle of your approach is pretty much exactly what I was looking for.
However, the result of the merged tables is not as expected: the number of documents in the target index matches the number of rows in table A, while the available fields are those of table B.
I would like to achieve a full join, so that the target index contains all fields from both source indices as well as all documents.

To stick with my example:
This is the result I get using your suggested transform:

ID (mutual) | Data (from Table B)
1           | Some old Data
2           | Other Data

This is what I want:

ID (mutual) | Data (from Table A) | Status (from Table A) | Data (from Table B)
1           | Some Data           | open                  | Some old Data
1           | Some Data           | open                  | Some older Data
2           | Other Data          | open                  | Other Data
2           | Other Data          | open                  | Other older Data
2           | Other Data          | open                  | Other very old Data

Can you help me adjust the transform job accordingly?

I think the main problem is that an ID can only appear once in the target index.
In my Logstash config, I have so far solved this by creating a new composite key for each document, which contains the primary key of table A and the primary key of table B:

input {
    jdbc {
    ...
    }
}
filter {
    mutate {
        add_field => {
            "unique_id" => "%{primary_key_of_table_a}_%{primary_key_of_table_b}"
        }
    }
}
output {
    stdout { codec => rubydebug }
    elasticsearch {
        ...
        document_id => "%{unique_id}"
    }
}

Can this be replicated using the Transform?

Hi!
I was able to hack together the solution below using a Painless script.
It produces the results you mentioned (5 rows).
You can treat it as a base for further experimentation.

PUT a
{
  "mappings": {
    "properties": {
      "event.ingested": {
        "type": "date"
      },
      "ID": {
        "type": "long"
      },
      "Data": {
        "type": "keyword"
      },
      "Status": {
        "type": "keyword"
      }
    }
  }
}

PUT b
{
  "mappings": {
    "properties": {
      "event.ingested": {
        "type": "date"
      },
      "ID": {
        "type": "long"
      },
      "Data": {
        "type": "keyword"
      }
    }
  }
}

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "description": "Index the ingest timestamp as 'event.ingested'",
        "field": "event.ingested",
        "value": "{{{_ingest.timestamp}}}"
      }
    }
  ]
}

POST a/_doc?pipeline=my-pipeline
{
  "ID": 1,
  "Data": "Some Data",
  "Status": "open"
}

POST a/_doc?pipeline=my-pipeline
{
  "ID": 2,
  "Data": "Other Data",
  "Status": "open"
}

POST b/_doc?pipeline=my-pipeline
{
  "ID": 1,
  "Data": "Some older Data"
}

POST b/_doc?pipeline=my-pipeline
{
  "ID": 1,
  "Data": "Some old Data"
}

POST b/_doc?pipeline=my-pipeline
{
  "ID": 2,
  "Data": "Other very old Data"
}

POST b/_doc?pipeline=my-pipeline
{
  "ID": 2,
  "Data": "Other older Data"
}

POST b/_doc?pipeline=my-pipeline
{
  "ID": 2,
  "Data": "Other Data"
}

GET _transform/_preview
{
  "source": {
    "index": [ "a", "b" ]
  },
  "dest": {
    "index": "joined"
  },
  "pivot": {
    "group_by": {
      "ID": {
        "terms": {
          "field": "ID"
        }
      }
    },
    "aggs": {
      "buckets": {
        "scripted_metric": {
          "init_script": """
            state.docsA = [];
            state.docsB = [];
          """,
          "map_script": """
            if (doc.containsKey('Status')) {
              state.docsA.add(new HashMap(params['_source']));
            } else {
              state.docsB.add(new HashMap(params['_source']));
            }
          """,
          "combine_script": """
            return state;
          """,
          "reduce_script": """
            def docsA = [];
            def docsB = [];
            for (s in states) {
              docsA.addAll(s.docsA);
              docsB.addAll(s.docsB);
            }
          
            def ret = [];
            for (a in docsA) {
              for (b in docsB) {
                def joined = new HashMap(b);
                joined['StatusA'] = a.Status;
                joined['DataA'] = a.Data;
                ret.add(joined);
              }
            }
            return ret;
          """
        }
      }
    }
  }
}
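
The _preview above only shows what the destination documents would look like; nothing is written to the joined index yet. To actually materialize it and keep it up to date, the same body can be used to create a continuous transform (a sketch; the id join_a_b is made up, and it syncs on the event.ingested field set by the pipeline above):

PUT _transform/join_a_b
{
  "source": {
    "index": [ "a", "b" ]
  },
  "dest": {
    "index": "joined"
  },
  "sync": {
    "time": {
      "field": "event.ingested"
    }
  },
  "pivot": {
    "group_by": {
      "ID": {
        "terms": {
          "field": "ID"
        }
      }
    },
    "aggs": {
      "buckets": {
        "scripted_metric": {
          "init_script": """
            state.docsA = [];
            state.docsB = [];
          """,
          "map_script": """
            if (doc.containsKey('Status')) {
              state.docsA.add(new HashMap(params['_source']));
            } else {
              state.docsB.add(new HashMap(params['_source']));
            }
          """,
          "combine_script": """
            return state;
          """,
          "reduce_script": """
            def docsA = [];
            def docsB = [];
            for (s in states) {
              docsA.addAll(s.docsA);
              docsB.addAll(s.docsB);
            }

            def ret = [];
            for (a in docsA) {
              for (b in docsB) {
                def joined = new HashMap(b);
                joined['StatusA'] = a.Status;
                joined['DataA'] = a.Data;
                ret.add(joined);
              }
            }
            return ret;
          """
        }
      }
    }
  }
}

POST _transform/join_a_b/_start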

Hi @przemekwitek!

Thanks for the solution, that helped.
Is it also possible to find a denormalised solution, where all the data is stored in fields of the document instead of using buckets?

I realise that my example was not sufficient for this use case, so I'll try to describe it in more detail:

Both tables, A and B, come from an Oracle SQL database. The primary key of table A is ID.
In table B there is at least one entry for each ID from table A, but in most cases there are several entries. The primary key for table B is KEY_B.

I would now like to use the transform to create a destination index that basically contains all documents from index B with all their fields, plus, for each of them, the corresponding fields from the document in index A with the same ID.

I have adapted my example tables accordingly:

Table A

ID | Data       | Status
1  | Some Data  | open
2  | Other Data | open

Table B

KEY_B | ID | Data
a     | 1  | Some old Data
b     | 1  | Some older Data
c     | 2  | Other Data
d     | 2  | Other older Data
e     | 2  | Other very old Data

Desired destination index:

Destination_key | ID (Primary Key from Table A) | Data (from Table A) | Status (from Table A) | Data (from Table B)
a_1             | 1                             | Some Data           | open                  | Some old Data
b_1             | 1                             | Some Data           | open                  | Some older Data
c_2             | 2                             | Other Data          | open                  | Other Data
d_2             | 2                             | Other Data          | open                  | Other older Data
e_2             | 2                             | Other Data          | open                  | Other very old Data

I have used a composite field from the matched primary keys of the respective entries from tables A and B as the destination_key.

Unfortunately, I have so far failed to transfer your solution to my desired result. Can you help me again?

Hi @przemekwitek again!

As I noticed, the bucket structure actually works well for my needs. I hadn't originally understood how it works, so I thought the denormalised structure with one document per row, which would correspond to an SQL join, would be necessary.

Thanks again!


No problem, I'm glad I could help!