Continuously merging documents from multiple indexes

Hi all,
I'm fairly new to Elasticsearch and have a question about merging documents and removing duplicates.
I have the following data structures:

topic1:
{
  "_id": "a12345",
  "userId": "x123",
  "externalId": "0987654",
  "firstName": "first name",
  "lastName": "last name",
  "birthDate": -157770000000,
  "vipFlag": "1",
  "email": "user@domain.org",
  "mobile": "+12345678",
  "zipCode": "12345",
  "city": "City name"
}

topic2:
{
  "_id": "b12345",
  "userId": "x123",
  "externalId": "0987654",
  "firstName": "first name",
  "lastName": "last name",
  "birthDate": -157770000000,
  "email": "user@domain.org",
  "mobile": "+12345678",
  "zipCode": "12345",
  "city": "City name"
}

topic3:
{
  "_id": "c12345",
  "externalId": "0987654",
  "firstName": "first name",
  "lastName": "last name",
  "email": "user@domain.org",
  "mobile": "+12345678",
  "zipCode": "12345"
}

topic4:
{
  "_id": "b12345",
  "userId": "x123",
  "firstName": "first name",
  "lastName": "last name",
  "birthDate": -157770000000,
  "email": "user@domain.org",
  "mobile": "+12345678",
  "zipCode": "12345",
  "city": "City name"
}

The data is pushed continuously into Elasticsearch from the Kafka topics.
I'm considering using the Kafka Elasticsearch Sink Connector to store the data in a separate index per topic, and then merging the values, according to the merging rules below, into a new index that will be used for querying.
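Roughly, the sink connector config I have in mind looks like this (connector name, URL, and the ignore settings are placeholders, not a tested setup; by default each topic lands in an index of the same name):

```json
{
  "name": "es-sink-per-topic",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "topic1,topic2,topic3,topic4",
    "connection.url": "http://localhost:9200",
    "key.ignore": "false",
    "schema.ignore": "true"
  }
}
```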

The following fields should be used to merge the documents from the topics:

  • topic1 with topic2: userId
  • topic1 with topic4: userId
  • topic1_2_4 with topic3: externalId

As a result I'm expecting a single index with either a parent-child relation or merged fields (whichever is easier).

Is this doable in Elasticsearch, or should I write an external Kafka consumer + Elasticsearch indexer application to merge and insert the data?
If it's doable in elasticsearch, what would be the right way to solve this?

You can use a continuous transform for this; however, you need two: one that groups by userId, and a second one that merges the output of the first transform with topic3 by grouping on externalId.

Note that for a continuous transform, all your source indexes require a timestamp field. If you don't have one, you can add one using an ingest timestamp. You also need this timestamp to chain transform 1 with transform 2.
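To illustrate (index, pipeline, and field names here are assumptions based on this thread, not a tested config): an ingest pipeline can stamp each document at ingest time, and the first transform can then group by userId and sync on that timestamp:

```json
PUT _ingest/pipeline/add-ingest-timestamp
{
  "processors": [
    { "set": { "field": "ingest_timestamp", "value": "{{_ingest.timestamp}}" } }
  ]
}

PUT _transform/merge-topic1-2-4
{
  "source": { "index": ["topic1", "topic2", "topic4"] },
  "dest": { "index": "topic1_2_4" },
  "frequency": "1m",
  "sync": { "time": { "field": "ingest_timestamp", "delay": "60s" } },
  "pivot": {
    "group_by": {
      "userId": { "terms": { "field": "userId" } }
    },
    "aggregations": {
      "last_seen": { "max": { "field": "ingest_timestamp" } }
    }
  }
}
```

The second transform would read from topic1_2_4 and topic3 and group on externalId in the same way; the last_seen field carries the timestamp into the first transform's output so the second transform can sync on it. For terms grouping, userId and externalId need to be mapped as keyword.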

For picking the right values for the output, e.g. email, you can use a script via scripted_metric (spoiler: this will become easier in a future release). Have a look at the docs; in the subsections you will find some Painless examples, e.g. the one for top_hits.
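For example, a scripted_metric that keeps the email from the most recently ingested document could look roughly like this (an untested sketch; the ingest_timestamp field is assumed from the ingest pipeline above, and accessing doc values on a text field requires the .keyword sub-field):

```json
"aggregations": {
  "email": {
    "scripted_metric": {
      "init_script": "state.ts = -1L; state.email = null",
      "map_script": """
        long ts = doc['ingest_timestamp'].value.toInstant().toEpochMilli();
        if (ts > state.ts && doc['email.keyword'].size() > 0) {
          state.ts = ts;
          state.email = doc['email.keyword'].value;
        }
      """,
      "combine_script": "return state",
      "reduce_script": """
        def best = null;
        long max = -1L;
        for (s in states) {
          if (s.ts > max) { max = s.ts; best = s.email; }
        }
        return best
      """
    }
  }
}
```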

Thanks for your help @Hendrik_Muhs!

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.