Detect newly inserted documents in an index

Okay so here is my usecase
I have an index say one with say 200000 documents( these documents are very bulky ) in a source elastic server
Now I am migrating all the data to another server (we'll call it target elastic server)
And everytime I perform some action in my application it gets logged in this index
So the documents keep getting inserted very frequently into this index
So I would have to ensure there is no data loss between the source and the target elastic index which is why I would have to keep doing multiple iterations for migrating this index
And currently I am dropping the index in the target and creating it everytime I run my migration script
But I don't want to do this
I want to just see what documents were inserted newly in the source elastic index and migrate them from the source to target
Also I am not using the @timestamp or any custom field to track the timestamp of inserting the document
I read in the optimistic concurrency control doc that _seq_no is something that can be used to track a document and is unique and is not random like the _id
So is it possible for me to use the _seq_no for this use case

My source elastic server is 7.17 and target is 8.15

Hello! Have you considered using the reindex API with version_type set to external (so that existing docs are updated) and reindexing from a remote?

To do this automatically, you can use cross-cluster replication which requires a license of Platinum or above.

Hey
Can the reindex api be used between two elastic servers?
Here I am trying to migrate from one server to another

Yes, it is supported, it’s called a remote reindex, as I mentioned.

Thank you so much!
I'll try that approach and let you know if it solves the problem

Hey okay so say below is my code using remote reindex

migrateNotifs = await targetElasticClient.reindex({
                      source: {
                        remote: {
                          host: deconstructURL.href,
                          username: deconstructURL.username,
                          password: deconstructURL.password,
                        },
                        index: "notifications",
                      },
                      dest: {
                        index: "notifications",
                      },
                    refresh: true,  // Ensure documents are immediately searchable.
                    wait_for_completion: true,  // Block until the operation completes.
                    slices: 5,  // Optional: Parallelize the process.
                  });

I have followed a structure similar to that in the es docs

But I keep getting this error
backend: =|ConfigurationError: Missing required parameter: body

Below is the exact structure in the doc

const response = await client.reindex({
  source: {
    remote: {
      host: "http://otherhost:9200",
      username: "user",
      password: "pass",
    },
    index: "my-index-000001",
    query: {
      match: {
        test: "data",
      },
    },
  },
  dest: {
    index: "my-new-index-000001",
  },
});

What is wrong in my code?

Hello,

Assuming you're using JavaScript, your code looks correct. I started Elasticsearch with start-local and then used this code:

const { Client } = require('@elastic/elasticsearch')
const client = new Client({
  node: 'http://localhost:9200/',
  auth: {
    apiKey: '...'
  }
})

async function main() {
  r = await client.info()
  console.log(r)

  const response = await client.reindex({
    source: {
      index: "my-index",
    },
    dest: {
      index: "my-dest-index",
    },
    refresh: true,
    wait_for_completion: true,
    slices: 5
  });
  console.log(response);
}

main()

The above prints:

{
  took: 17,
  timed_out: false,
  total: 1,
  updated: 1,
  created: 0,
  deleted: 0,
  batches: 1,
  version_conflicts: 0,
  noops: 0,
  retries: { bulk: 0, search: 0 },
  throttled_millis: 0,
  requests_per_second: -1,
  throttled_until_millis: 0,
  failures: []
}

I did not manage to make version_type: external work yet, however. And I'm not using remote reindex as I only tested with a single cluster, but your error does not seem related to that at all.

Note that you can also try in Kibana directly, and that all examples in the docs can be displayed as Kibana request but also in JavaScript and Python.

Yeah it works if I use the reindex api within the same cluster.

I will be initiating the migration process on the click of a button in my ui which sends a request to the backend where I have my migration script so I don't think kibana can really help me out here

Here is how I am handling it currently

let indexExistsinTarget = await targetElasticClient.indices.exists({index: "notifications"});
            let migrateNotifs = false;
            if(indexExistsinTarget.body){
                              await targetElasticClient.indices.delete({index: "notifications"});
            }
            const {body: sourceMappingNotif} = await sourceElasticClient.indices.getMapping({index: "notifications"});
            let targetBodyNotif = {
                "settings":{
                    "index":{
                        "blocks": {
                            "read_only_allow_delete": "false"
                          }
                    }
                },
                "mappings": sourceMappingNotif["notifications"].mappings
            }
            await targetElasticClient.indices.create({
                index: "notifications",
                body: targetBodyNotif
            }) 
        
            migrateNotifs = await reindexIndexDatainBatches(sourceElasticClient,targetElasticClient,"notifications","notifications",null);


export async function reindexIndexDatainBatches(sourceElasticClient,targetElasticClient,sourceIndex,targetIndex,csvType){
    let scrollId = null;
    let moreRecordsExist = true;
    const batchSize = sourceIndex==="activity-log"?2000:9900;
    let totalIndexed = 0;
    let failedDocuments = [];

    while(moreRecordsExist){
        let body;
        if(!scrollId){
            console.log(`Starting search for index ${sourceIndex} `);
            const searchParams = {
            index: sourceIndex,
            scroll: '20m',
            size: batchSize
            };
            const response = await sourceElasticClient.search(searchParams);
            body = response.body;
            console.log("Initial search successful",body.hits.hits.length);
        }else{
            console.log(`Continuing scroll for index ${sourceIndex}`);
            const scrollResponse = await sourceElasticClient.scroll({
                scroll_id: scrollId,
                scroll: '20m'
            });
            body = scrollResponse.body;
            console.log(`Next batch retrieved ${body.hits.hits.length}`);
        }

        scrollId = body._scroll_id;
        const hits = body.hits.hits;

        if(hits.length === 0){
            console.log("No more records to scroll");
            moreRecordsExist = false;
            break;
        }

        const bulkBody = hits.flatMap(doc => {
            const baseDoc = {index: {_index:targetIndex,_id:doc._id}};
            const docSource = {...doc._source};
            if(docSource.relation_type){
                if(docSource.relation_type.parent){
                    baseDoc.index.routing = docSource.relation_type.parent;
                }
                else{
                    baseDoc.index.routing = doc._id;
                }
            }
            if(csvType){
                docSource.csv_type = csvType;
            }
            return [baseDoc,docSource];
        });    
        
        try{
            const bulkWriteResponse = await targetElasticClient.bulk({body: bulkBody,refresh: true});
            if(bulkWriteResponse.body.errors){
                bulkWriteResponse.body.items.forEach((item,idx)=>{
                    if(item.index && item.index.error){
                        console.error(`Error indexing document ${bulkBody[idx * 2].index._id}: ${JSON.stringify(item.index.error)}`);
                    }
                    return false;
                 })
            }

        }catch(err){
            if(err.meta && err.meta.statusCode === 413){
                console.log("Payload too large, storing it to push it in later");
                bulkBody.forEach((_,idx)=>{
                    if(idx%2===0){
                        failedDocuments.push({
                            metadata: bulkBody[idx],
                            source: bulkBody[idx+1]
                        });
                    }
                });
            }else{
                console.log("Error Indexing documents",err);
                return err;
            }
        }       
 
        totalIndexed  = totalIndexed + hits.length;
        console.log("TOTAL INDEXED: " + totalIndexed);
    };


    if(failedDocuments.length>0){
        console.log("Retrying for failed documents.Number of documents: " + failedDocuments.length);
        for(let i=0; i<failedDocuments.length;i=i+30){
            console.log("============== BATCH ==============",i);
            const batch = failedDocuments.slice(i, i+30);
            const bulkRetryBody = batch.flatMap(doc => [
                doc.metadata,
                doc.source
            ]);

            try{
                const retryResponse = await targetElasticClient.bulk({
                    body: bulkRetryBody,
                    refresh: true
                });
                if(retryResponse.body.errors){
                    retryResponse.body.items.forEach((item,idx)=>{
                        if(item.index && item.index.error){
                            console.error(`Error indexing document ${bulkRetryBody[idx * 2].index._id}: ${JSON.stringify(item.index.error)}`);
                        }
                        return false;
                     })
                }
            }catch(retryErr){
                console.log(`Error retrying documents in batch : ${retryErr.message}`);
                return false;
            }
        }

    }

    return true;
}


Right, I was only suggesting to avoid errors like the one below that cannot happen in Kibana:

Regarding your new code snippet, yes this reimplements the reindex API (which also uses scrolls under the hood). It's more work and can be less efficient if the code runs far away from the two clusters. But this works too. :+1: