Records missing during ES updates with over 250 requests

I'm building a small service to archive contact records from Hubspot into an ES index, and records consistently fail to make it into the index, with a different number missing on each attempt. I'm using the latest ES Node client, and ES is hosted on Elastic Cloud, version 6.4.

1 zone, 4GB memory, highio instance.

Small runs of 250 or fewer records insert fine, with an exact count match, but larger inserts (sequential requests) consistently lose records, with no errors in the logs. Tests with 5K, 7K and 22K records all fail to load every record into ES.

Design:

  • Hubspot webhook hits REST API endpoint
  • REST API (Docker/Express) publishes to PubSub topic
    • I confirmed with an in-memory counter that 7,227 requests were processed
  • A Cloud Function triggered by the PubSub topic then POSTs the update to ES in Elastic Cloud
  • On Friday I even spun up a new 3-zone cluster, and it didn't help (still no errors)
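The pipeline depends on a JSON envelope surviving the trip through Pub/Sub. A minimal sketch of that round trip, assuming the Express API publishes the {request, index, keyFields, doc} object (the structure the Cloud Function destructures) as JSON. encodeMessage and decodeMessage are hypothetical helper names; the decode half mirrors what the function does with event.data.

```javascript
// Hypothetical helpers: encodeMessage/decodeMessage are illustrative names.
// Pub/Sub message data arrives base64-encoded; the Cloud Function decodes it
// with Buffer.from(data, 'base64') and JSON.parse.

function encodeMessage(message) {
  // Serialize the { request, index, keyFields, doc } envelope to base64 text.
  return Buffer.from(JSON.stringify(message)).toString('base64');
}

function decodeMessage(data) {
  // Same logic as the function: missing data yields an empty object.
  const messageStr = data ? Buffer.from(data, 'base64').toString() : null;
  return messageStr ? JSON.parse(messageStr) : {};
}

// Example envelope (values are made up for illustration).
const envelope = {
  request: { id: 'req-123' },
  index: 'hs-contacts',
  keyFields: ['email'],
  doc: { email: 'jane@example.com', firstname: 'Jane' }
};

const roundTripped = decodeMessage(encodeMessage(envelope));
console.log(roundTripped.index);
```

Checking this round trip in isolation is one way to rule out the publish/decode step before looking at ES.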

Troubleshooting

I changed client.update to client.bulk and added a scripted update that increments a counter in another index, to confirm the requests were actually reaching ES. Oddly, of 7,227 attempts, the counter reads 2,906 and the doc count in the archive index is only 2,667. Logging the ES response object shows no errors.

Cloud Function Code

const elasticsearch = require('elasticsearch');
const moment = require('moment');
const crypto = require('crypto');

// snipped const params ...

const client = new elasticsearch.Client({
  host: [{
    host: ES_HOST,
    auth: `${ES_USER}:${ES_PASS}`,
    protocol: ES_PROTOCOL,
    port: ES_PORT
  }],
  // log is a client-level option, not a per-host setting
  log: ES_LOG_LEVEL
});


/**
 * Triggered from a message on a Cloud Pub/Sub topic.
 *
 * @param {!Object} event The Cloud Functions event.
 * @param {!Function} callback The callback function.
 */
exports.upsertToElasticsearch = (event, callback) => {
  // The Cloud Pub/Sub Message object.
  const pubsubMessage = event.data;
  const messageStr = pubsubMessage.data ? Buffer.from(pubsubMessage.data, 'base64').toString() : null;
  const message = messageStr ? JSON.parse(messageStr) : {};
  const {request, index, keyFields, doc} = message; // expected message structure

  // build traceable ID from request (request may be missing in a malformed message)
  const LOG_PREFIX = `[${(request && request.id) || 'unknown'}]`;
  console.log(`${LOG_PREFIX} Processing new ES upsert`);

  if (index && doc) {
    // set published date if not present
    if (!doc.published) {
      doc.published = moment.utc().toISOString();
    }

    // helper function: deterministic _id built from the doc's key field values
    const _getIdHash = (keyFields, doc) => {
      const SEPARATOR = ':';
      // normalize doc values for key fields (trim, lowercase)
      const fieldVals = keyFields.map((key) => `${doc[key]}`.trim().toLowerCase());
      // return crypto hash of values delimited with colon
      return crypto.createHash('md5').update(fieldVals.join(SEPARATOR)).digest('hex');
    };

    if (keyFields && keyFields.length > 0 && !doc.id) {
      doc.id = _getIdHash(keyFields, doc);
    }

    console.log(`${LOG_PREFIX} Updating index ${index} with doc ${doc.id}`);
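    // try/catch only catches synchronous throws; request failures and per-item
    // errors are reported through the callback's error and response arguments
    try {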
      client.bulk({
        body: [
          { update: { _index: index, _type: ES_TYPE, _id: doc.id} },
          { doc: doc, doc_as_upsert: true },
          { update: { _index: 'metrics', _type: ES_TYPE, _id: 'archive'} },
          { script: { source: 'ctx._source.count += params.count', lang: 'painless', params: {count: 1} } }
        ]
      }, (error, response) => {
        console.log(`${LOG_PREFIX} Database update returned`);
        console.log(JSON.stringify(response));

        if (!error) {
          // do something perhaps
        }

        callback();
      });
    } catch (error) {
      console.log(`${LOG_PREFIX} Error updating database`);
      console.error(error.message);
      callback();
    }
  } else {
    console.error(`${LOG_PREFIX} Missing index or doc. Nothing to do`);
    callback();
  }
};
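One property of the _getIdHash helper worth keeping in mind when comparing counts: the _id depends only on the normalized keyFields values, so several webhook events for the same contact resolve to the same _id, and with doc_as_upsert the later ones update the existing document instead of creating a new one. A standalone sketch of the same logic (getIdHash is a hypothetical name; keyFields ['email'] is an assumed example):

```javascript
const crypto = require('crypto');

// Same logic as _getIdHash in the function above.
function getIdHash(keyFields, doc) {
  const SEPARATOR = ':';
  // Normalize each key field value: stringify, trim, lowercase.
  const fieldVals = keyFields.map((key) => `${doc[key]}`.trim().toLowerCase());
  // MD5 of the colon-delimited values yields a stable 32-character hex _id.
  return crypto.createHash('md5').update(fieldVals.join(SEPARATOR)).digest('hex');
}

// Two events for the same contact: different casing, whitespace and extra fields.
const first = getIdHash(['email'], { email: 'Jane@Example.com ' });
const second = getIdHash(['email'], { email: 'jane@example.com', firstname: 'Jane' });
console.log(first === second); // true: both events target the same document
```

So a doc count below the number of requests is not by itself proof of loss; the per-item result field ("created" vs "updated") in the bulk response distinguishes the two cases.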

How are you getting the document count after indexing? Are you manually issuing a refresh request or waiting long enough to ensure one has occurred before getting the document count?

When bulk indexing, are you checking the response to verify that all documents were successful?

After indexing, I run GET /{name}/_search and check the hit count.

Also try using GET /_cat/indices

It’s strange, because I’ve pushed over 100M docs/day to my own 3-node cluster without issue (just with more RAM). It seems the cloud instance is rate-limiting or dropping requests even at a load of 30 ops/sec.

You need to run a refresh call before you get the document count.

What size cluster are you indexing into?

1 node, 1 zone, 4GB, high-I/O classic. Only 45% memory pressure and plenty of disk. The admin console won't let me change the instance count for some reason: it says I can add up to 5, but the input box is disabled. Last Friday I spun up a new cluster with a dedicated master and 3 zones, with the same result.

I ran POST /hs-contacts/_refresh and the count did not change.

{
  "_shards": {
    "total": 3,
    "successful": 3,
    "failed": 0
  }
}

Oddly, when I run GET /hs-contacts/_search I get:

{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 3,
    "successful": 3,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 2667,
    "max_score": 1,
    ...

And when I run _cat/indices it shows 5,334 docs, which still doesn't match the 7K attempts. I have 0 replicas on that index.

health status index       uuid                   pri rep docs.count docs.deleted store.size pri.store.size
green  open   hs-contacts aHx4-qPCT0ijbyQGsNfZLw 3   0   5334       0            15.2mb     15.2mb

Nodes on Elastic Cloud get CPU allocated in proportion to their size, so if you look at monitoring I suspect you will see that you are hitting CPU limits, as indexing can be CPU-intensive.

When you are using the bulk interface you need to check in the response whether all documents were successful and retry ones that were not. If you are not doing so, this could explain the difference in numbers.
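A minimal sketch of that check, assuming the bulk response shape shown in the log later in this thread ({ took, errors, items: [ { <action>: { _index, _id, status, error? } } ] }). getFailedBulkItems is a hypothetical helper; it only collects failures so they can be logged or resent.

```javascript
// Hypothetical helper: returns the bulk items that did not succeed.
function getFailedBulkItems(response) {
  if (!response || !response.errors) {
    return []; // errors === false means every item succeeded
  }
  const failed = [];
  response.items.forEach((item, position) => {
    // Each item has a single key: the action name (index/create/update/delete).
    const action = Object.keys(item)[0];
    const result = item[action];
    if (result.error) {
      failed.push({
        position, // position in the items array, i.e. the order actions were sent
        action,
        index: result._index,
        id: result._id,
        status: result.status,
        type: result.error.type
      });
    }
  });
  return failed;
}

// Trimmed copy of the response logged later in this thread.
const sample = {
  took: 1652,
  errors: true,
  items: [
    { update: { _index: 'hs-contacts', _id: '61374f7e9fc502d796109ac4dd3a7f8a', status: 201, result: 'created' } },
    { update: { _index: 'metrics', _id: 'archive', status: 409,
      error: { type: 'version_conflict_engine_exception' } } }
  ]
};

console.log(getFailedBulkItems(sample));
```

Note that the top-level errors flag is the only hint at the response level; the HTTP call itself succeeds, which is why nothing shows up as an error unless the items are inspected.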

Thanks for the insights. I'll keep testing with more resources, but that didn't work when I tried it on Friday either. Any idea why I cannot increase the number of nodes from the admin console?


Why is the input box to increase the number of nodes (instances?) disabled and grayed out?

This is monitoring

On Elastic Cloud you always scale up by increasing node size until you reach the largest size, 58GB/64GB. This is more efficient than running lots of small nodes, as there is less overhead. From that point on, you scale out by adding nodes.

OMG, that was not intuitive at all. Please suggest to the product team that they add a tooltip or popup on that disabled instances input field explaining that you must first increase to 64GB before you can add a second instance.

So is the suggestion to just add another zone for redundancy instead of multiple nodes? I cannot justify the expense of a 64GB node just to be able to add another node for HA.

You can add availability zones at any size, which will give you up to 3 nodes. This will give added resiliency but also potentially better performance.

Thanks for the info. I've always managed my own clusters since Shay began, but I love the concept of Cloud and one-click upgrades. A few things are not intuitive on that Edit screen, so please nudge the team. :wink:

One silly question (I hope): if I upgrade the cluster to run this batch job, can I downgrade it afterwards, or once you upgrade can you only go up? (Google Cloud SQL works this way: once you increase disk, you can never decrease it.)

You can increase and decrease the size of nodes, and also change the number of availability zones when required. If you are reducing size, though, you do need to check that you don't have more data than the smaller cluster can hold.


I just upgraded to a 16GB 2-zone cluster, and it added a 1GB "tiebreaker" node. That's 33GB of RAM and the same issue: it cannot even index 7,128 records.

Only 4,815 docs in index after _refresh

The incremented counter reads 6,176

Relevant code

I'm using the Elasticsearch npm module and a client.bulk call to perform the record upsert and increment the count in a separate metrics index. Neither number matches, and thousands of docs are missing.

      client.bulk({
        body: [
          { update: { _index: index, _type: ES_TYPE, _id: doc.id} },
          { doc: doc, doc_as_upsert: true },
          { update: { _index: 'metrics', _type: ES_TYPE, _id: 'archive'} },
          { script: { source: 'ctx._source.count += params.count', lang: 'painless', params: {count: 1} } }
        ]
      }, (error, response) => {
        console.log(`${LOG_PREFIX} Database update returned`);
        console.log(JSON.stringify(response));
        // TODO: consider raising error in callback to trigger retry (depending on error code)

        if (!error) {
          // do something perhaps
        }

        callback();
      });
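The TODO above can be sketched as a small policy function. This assumes the legacy (event, callback) background-function signature used here, where passing an Error to callback reports a failure and, only if "retry on failure" is enabled for the function, causes Pub/Sub to redeliver the message. isRetriableStatus and callbackErrorFor are hypothetical names, and the status list is an assumed policy, not an official one.

```javascript
// Hypothetical policy: 429 = rejected (e.g. write queue full),
// 409 = version conflict, 5xx = server-side failure. Other 4xx errors
// (mapping/parse problems) will not succeed on retry.
function isRetriableStatus(status) {
  return status === 429 || status === 409 || status >= 500;
}

// Returns the value to pass to the Cloud Function callback: an Error when a
// retry is wanted, otherwise undefined so the message is acknowledged.
function callbackErrorFor(error, response) {
  if (error) {
    return error; // transport-level failure: let the platform retry
  }
  const results = (response && response.errors ? response.items : [])
    .map((item) => item[Object.keys(item)[0]]);
  const retriable = results.filter((r) => r.error && isRetriableStatus(r.status));
  // Non-retriable item errors are acknowledged here, so they should be logged.
  if (retriable.length > 0) {
    return new Error(`${retriable.length} bulk item(s) failed with a retriable status`);
  }
  return undefined;
}
```

Inside the bulk callback this would replace the bare call with callback(callbackErrorFor(error, response)). Re-sending the whole message is safe for the doc upsert because its _id is deterministic.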

I've had smaller clusters handle millions of docs with no issues, so I find it hard to believe ES in the cloud cannot reliably store 7K. I originally used just client.update for the hs-contacts index, and then, while troubleshooting, added the client.bulk call with the counter to see whether there was a discrepancy and rule out the cloud function.

Here is cluster size/info...

Here are the performance metrics

I will scour logs to try and find errors but it's puzzling.

I created a support ticket: #00258474

I also zoomed in on the latest test in the performance screen, and it doesn't look resource-constrained. The request metrics show over 7K requests, so I don't think records are being lost between PubSub and Cloud Functions; it's something with the ES cloud instances.

My log filter was wrong; with it fixed, I found version conflict errors:

{"took":1652,"errors":true,"items":[{"update":{"_index":"hs-contacts","_type":"_doc","_id":"61374f7e9fc502d796109ac4dd3a7f8a","_version":1,"result":"created","_shards":{"total":1,"successful":1,"failed":0},"_seq_no":1215,"_primary_term":1,"status":201}},{"update":{"_index":"metrics","_type":"_doc","_id":"archive","status":409,"error":{"type":"version_conflict_engine_exception","reason":"[_doc][archive]: version conflict, current version [9092] is different than the one provided [9091]","index_uuid":"2SxRhJtzTc2bQW9mH190DQ","shard":"0","index":"metrics"}}}]}
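The 409 is on the shared metrics/archive counter, which every function invocation updates concurrently. The bulk API accepts retry_on_conflict in an update action's metadata, telling Elasticsearch to re-read the document and re-run the update that many times before reporting a conflict (the legacy client's client.update takes a retryOnConflict parameter for the same purpose). A sketch of the same bulk body with that option; buildBulkBody and the default of 5 retries are hypothetical choices, and applying it to the contact upsert too is an assumption, since concurrent events for one contact share an _id.

```javascript
// Hypothetical builder for the two-action bulk body used in this thread,
// with retry_on_conflict added to both update actions.
function buildBulkBody(index, type, doc, conflictRetries = 5) {
  return [
    // Contact upsert: events for the same contact share one _id.
    { update: { _index: index, _type: type, _id: doc.id, retry_on_conflict: conflictRetries } },
    { doc: doc, doc_as_upsert: true },
    // Shared counter: the main source of version conflicts under concurrency.
    { update: { _index: 'metrics', _type: type, _id: 'archive', retry_on_conflict: conflictRetries } },
    { script: { source: 'ctx._source.count += params.count', lang: 'painless', params: { count: 1 } } }
  ];
}

const body = buildBulkBody('hs-contacts', '_doc', {
  id: '61374f7e9fc502d796109ac4dd3a7f8a',
  email: 'jane@example.com'
});
console.log(JSON.stringify(body[2]));
```

The result would be passed as client.bulk({ body }, ...). Under heavy contention even retries can be exhausted, so the per-item check on the response still matters.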

This appears to be just a concurrency (async) issue with the incremented counter. I will remove the client.bulk call, revert to my client.update attempt, and report back. I believe this is independent of indexing the docs in the hs-contacts index, however, so it still doesn't explain why those are failing.