I'm building a small service to archive contact records from Hubspot into an ES index. Records are consistently failing to make it into the index, and the number that goes missing varies with each attempt. I'm using the latest ES Node SDK client, and ES is hosted in Elastic Cloud on version 6.4 (1 zone, 4GB memory, highio instance).
Small requests with 250 or fewer records insert fine with an exact count match, but larger inserts (sent as sequential requests) continually lose records, without any errors in the logs. Tests with 5K, 7K and 22K records all failed to load every record into ES.
Design:
- Hubspot webhook hits REST API endpoint
- REST API (Docker/Express) publishes to PubSub topic
- I confirmed with an in-memory counter that 7,227 requests were processed
- Cloud function triggered by PubSub topic then POSTs the update to ES in Elastic Cloud
- Friday I even spun up a new 3-zone cluster and it didn't help (and no errors)
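To make the hand-off between the REST API and the Cloud Function concrete, here is a minimal sketch of the message round trip. It assumes the `{request, index, keyFields, doc}` structure that the function further down destructures. The helper names, the sample values, and the index name are hypothetical, and `encodeArchiveMessage` only simulates the base64 shape a Pub/Sub-triggered function receives rather than calling the Pub/Sub client.

```javascript
// Hypothetical helpers illustrating the message shape; the names are
// not taken from the original service.

// Simulates the shape a Pub/Sub-triggered function receives: the JSON
// payload arrives base64-encoded in the `data` property.
function encodeArchiveMessage(payload) {
  return { data: Buffer.from(JSON.stringify(payload)).toString('base64') };
}

// Mirror of the decoding done at the top of the Cloud Function.
function decodeArchiveMessage(pubsubMessage) {
  const messageStr = pubsubMessage && pubsubMessage.data
    ? Buffer.from(pubsubMessage.data, 'base64').toString()
    : null;
  return messageStr ? JSON.parse(messageStr) : {};
}

const sample = {
  request: { id: 'req-1' },            // hypothetical trace ID
  index: 'archive',                    // hypothetical index name
  keyFields: ['email'],                // fields used to derive the document ID
  doc: { email: 'Jane@Example.com' }   // made-up contact record
};

const roundTripped = decodeArchiveMessage(encodeArchiveMessage(sample));
console.log(roundTripped.index, roundTripped.doc.email);
```

Logging `request.id` on both sides of this hand-off is one way to compare what was published against what the function actually received.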
Troubleshooting
I changed the client.update call to client.bulk and added a script to increment a counter in another index to confirm the requests were occurring. Oddly, of 7,227 attempts, the counter reads 2,906 and the doc count in the archive index is only 2,667. There are no errors in the logs when logging the ES response object.
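One thing that may be worth checking here: a bulk request can succeed as a whole while individual actions inside it fail. The response body carries a top-level `errors` flag and an `items` array with a `status` (and, on failure, an `error` object) per action. The sketch below pulls out the failed items. The helper name `collectBulkFailures` and the sample response are made up for illustration, not taken from the actual logs.

```javascript
// Hypothetical helper (not part of the original function): list the
// per-item failures reported inside a bulk response body.
function collectBulkFailures(response) {
  const failures = [];
  if (!response || !Array.isArray(response.items)) {
    return failures;
  }
  response.items.forEach((item, position) => {
    // Each item is keyed by its action: index, create, update or delete.
    const action = Object.keys(item)[0];
    const result = item[action] || {};
    if (result.error || result.status >= 400) {
      failures.push({
        position,
        action,
        index: result._index,
        id: result._id,
        status: result.status,
        type: result.error && result.error.type,
        reason: result.error && result.error.reason
      });
    }
  });
  return failures;
}

// Made-up response for illustration: the upsert succeeds, the counter update fails.
const sampleResponse = {
  took: 12,
  errors: true,
  items: [
    { update: { _index: 'archive', _id: 'abc', status: 201, result: 'created' } },
    { update: { _index: 'metrics', _id: 'archive', status: 409,
      error: { type: 'version_conflict_engine_exception', reason: 'version conflict (sample text)' } } }
  ]
};

console.log(collectBulkFailures(sampleResponse));
```

Calling something like this inside the `client.bulk` callback and logging the result would show whether any actions are being rejected without the callback's `error` argument ever being set.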
Cloud Function Code
const elasticsearch = require('elasticsearch');
const moment = require('moment');
const crypto = require('crypto');
// snipped const params ...
const client = new elasticsearch.Client({
  host: [{
    host: ES_HOST,
    auth: `${ES_USER}:${ES_PASS}`,
    protocol: ES_PROTOCOL,
    port: ES_PORT
  }],
  // log is a client-level option, not a per-host one
  log: ES_LOG_LEVEL
});
/**
 * Triggered from a message on a Cloud Pub/Sub topic.
 *
 * @param {!Object} event The Cloud Functions event.
 * @param {!Function} callback The callback function.
 */
exports.upsertToElasticsearch = (event, callback) => {
  // The Cloud Pub/Sub Message object.
  const pubsubMessage = event.data;
  const messageStr = pubsubMessage.data ? Buffer.from(pubsubMessage.data, 'base64').toString() : null;
  const message = messageStr ? JSON.parse(messageStr) : {};
  const {request, index, keyFields, doc} = message; // expected message structure
  // build traceable ID from request (request is undefined for an empty message)
  const LOG_PREFIX = `[${(request && request.id) || 'unknown'}]`;
  console.log(`${LOG_PREFIX} Processing new ES upsert`);
  if (index && doc) {
    // set published date if not present
    if (!doc.published) {
      doc.published = moment.utc().toISOString();
    }
    // helper function
    const _getIdHash = (keyFields, doc) => {
      const SEPARATOR = ':';
      // normalise the doc values for the key fields
      const fieldVals = keyFields.map((key) => `${doc[key]}`.trim().toLowerCase());
      // return crypto hash of values delimited with colon
      return crypto.createHash('md5').update(fieldVals.join(SEPARATOR)).digest('hex');
    };
    if (keyFields && keyFields.length > 0 && !doc.id) {
      doc.id = _getIdHash(keyFields, doc);
    }
    console.log(`${LOG_PREFIX} Updating index ${index} with doc ${doc.id}`);
    try {
      client.bulk({
        body: [
          // upsert the archived record
          { update: { _index: index, _type: ES_TYPE, _id: doc.id} },
          { doc: doc, doc_as_upsert: true },
          // increment the diagnostic counter document
          { update: { _index: 'metrics', _type: ES_TYPE, _id: 'archive'} },
          { script: { source: 'ctx._source.count += params.count', lang: 'painless', params: {count: 1} } }
        ]
      }, (error, response) => {
        console.log(`${LOG_PREFIX} Database update returned`);
        console.log(JSON.stringify(response));
        if (error) {
          // transport-level or request-level failure
          console.error(`${LOG_PREFIX} ${error.message}`);
        } else if (response && response.errors) {
          // bulk reports per-item failures in the response body, not via `error`
          console.error(`${LOG_PREFIX} Bulk response contains item errors`);
        }
        callback();
      });
    } catch (error) {
      // only synchronous errors from client.bulk end up here
      console.log(`${LOG_PREFIX} Error updating database`);
      console.error(error.message);
      callback();
    }
  } else {
    console.error(`${LOG_PREFIX} Missing index or doc. Nothing to do`);
    callback();
  }
};
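Since the document `_id` is derived from an MD5 hash of the key fields, it may also be worth ruling out ID collisions: any two messages that produce the same hash update the same document, so the index doc count can be lower than the request count without any error. The sketch below reuses the hashing scheme from `_getIdHash` above. The `countDistinctIds` helper and the sample records are hypothetical. It would not explain the low counter value on its own.

```javascript
const crypto = require('crypto');

// Same hashing scheme as _getIdHash in the Cloud Function.
function getIdHash(keyFields, doc) {
  const fieldVals = keyFields.map((key) => `${doc[key]}`.trim().toLowerCase());
  return crypto.createHash('md5').update(fieldVals.join(':')).digest('hex');
}

// Hypothetical helper: count how many distinct _id values a batch of docs yields.
function countDistinctIds(docs, keyFields) {
  const counts = new Map();
  docs.forEach((doc) => {
    const id = doc.id || getIdHash(keyFields, doc);
    counts.set(id, (counts.get(id) || 0) + 1);
  });
  return { total: docs.length, distinct: counts.size };
}

// Made-up records: the first two differ only in case and whitespace,
// so they normalise to the same key and therefore the same _id.
const sampleDocs = [
  { email: 'jane@example.com' },
  { email: ' Jane@Example.com ' },
  { email: 'joe@example.com' }
];

console.log(countDistinctIds(sampleDocs, ['email'])); // { total: 3, distinct: 2 }
```

Note that a doc missing one of the key fields hashes the literal string `undefined` for that field, so all such docs would share one `_id`. Running a check like this over the payloads from a failing test run would show how many distinct documents to expect in the index.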