Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ jobs:
override-ci-command: npm install
- run:
name: Audit Dependencies
command: npm audit --production --audit-level=high
command: npm run audit
- run:
name: Running Mocha Tests
command: npm test
Expand Down
15 changes: 10 additions & 5 deletions .nsprc
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
{
"GHSA-27h2-hvpr-p74q": {
"active": true,
"notes": "We don't use verify function from jsonwebtoken, so not affected"
}
}
"GHSA-fjxv-7rqg-78g4": {
"active": true
},
"GHSA-jr5f-v2jv-69x6": {
"active": true
},
"GHSA-4hjh-wcwx-xvwj": {
"active": true
}
}
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
## 2.9.0 (October 10, 2025)
* **Major Improvement** to `Get Updated Objects Polling` trigger:
* Re-architected the polling mechanism to use keyset pagination (`LastModifiedDate` and `Id`).
* **Fixed a critical bug** that caused silent data loss when a large number of records had the identical `LastModifiedDate`.
* Fixed the underlying cause of a potential infinite loop when a full page of records shared the same timestamp.
* Updated and improved the `README.md` documentation for clarity, consistency, and accuracy, including:
* Adding a `Required Permissions` section for the polling trigger.
* Removing obsolete limitations that were resolved by the bug fixes.
* General language and formatting enhancements.

## 2.8.6 (January 31, 2025)
* Upgrade Sailor version to 2.7.4
* Enhanced error message text in the `Raw Request` action
Expand Down
529 changes: 202 additions & 327 deletions README.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion component.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"description": "Customer relationship management (CRM) software & cloud computing from the leader in CRM solutions for businesses large & small.",
"docsUrl": "https://github.com/elasticio/salesforce-component-v2",
"url": "https://www.salesforce.com/",
"version": "2.8.6",
"version": "2.9.0-dev.5",
"authClientTypes": [
"oauth2"
],
Expand Down
8 changes: 6 additions & 2 deletions lib/salesForceClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ class SalesForceClient {
return this.getSObjectList('event sobject', (object) => object.name.endsWith('__e'));
}

async simpleQuery(query) {
return this.connection.query(query);
}

async queryEmitAll(query) {
const result = [];
await new Promise((resolve, reject) => {
Expand Down Expand Up @@ -214,7 +218,7 @@ class SalesForceClient {
async pollingSelectQuery(options) {
const sobject = options.sobject || this.configuration.sobject;
const {
selectedObjects, linkedObjects, whereCondition, maxFetch,
selectedObjects, linkedObjects, whereCondition, maxFetch, orderBy,
} = options;
let query = this.connection.sobject(sobject)
.select(selectedObjects);
Expand All @@ -229,7 +233,7 @@ class SalesForceClient {
return newQuery;
}, query);
return query.where(whereCondition)
.sort({ LastModifiedDate: 1 })
.sort(orderBy || { LastModifiedDate: 1 })
.execute({ autoFetch: true, maxFetch });
}

Expand Down
90 changes: 67 additions & 23 deletions lib/triggers/getUpdatedObjectsPolling.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,45 @@ function getSelectedFields(cfg) {
if (!selectedFields.includes('LastModifiedDate')) {
selectedFields.push('LastModifiedDate');
}
if (!selectedFields.includes('Id')) {
selectedFields.push('Id');
}
return selectedFields.toString();
}

exports.process = async function processTrigger(_msg, cfg, snapshot) {
function buildWhereClause(from, to, lastModificationTime, lastSeenId) {
const whereParts = [];
if (lastModificationTime) {
const lastDate = timeToString(lastModificationTime);
if (lastSeenId) {
whereParts.push(`(LastModifiedDate = ${lastDate} AND Id > '${lastSeenId}')`);
whereParts.push(`(LastModifiedDate > ${lastDate})`);
} else {
whereParts.push(`LastModifiedDate >= ${lastDate}`);
}
} else {
whereParts.push(`LastModifiedDate >= ${timeToString(from)}`);
}
whereParts.push(`LastModifiedDate < ${timeToString(to)}`);

if (whereParts.length > 2) {
return `((${whereParts.slice(0, 2).join(' OR ')}) AND ${whereParts[2]})`;
}
return whereParts.join(' AND ');
}

exports.process = async function processTrigger(_msg, cfg, snapshot = {}) {
this.logger.info('Start processing "Get Updated Objects Polling" trigger');
const currentTime = new Date();

// eslint-disable-next-line prefer-const
let { lastModificationTime, lastSeenId } = snapshot;

if (snapshot.nextStartTime) {
this.logger.warn('Found a snapshot with the old format (nextStartTime), converting to the new format.');
lastModificationTime = snapshot.nextStartTime;
}

const {
sobject,
linkedObjects = [],
Expand All @@ -36,15 +68,15 @@ exports.process = async function processTrigger(_msg, cfg, snapshot) {

let { startTime, endTime, pageSize } = cfg;
if (!pageSize) pageSize = MAX_FETCH;
if (!startTime) startTime = 0;
if (!startTime) startTime = '1970-01-01T00:00:00.000Z';
if (!endTime) endTime = currentTime;
if (!isDateValid(startTime)) throw new Error('invalid "Start Time" date format, use ISO 8601 Date time utc format - YYYY-MM-DDThh:mm:ssZ');
if (!isDateValid(endTime)) throw new Error('invalid "End Time" date format, use ISO 8601 Date time utc format - YYYY-MM-DDThh:mm:ssZ');
if (pageSize > MAX_FETCH || isNumberNaN(pageSize) || Number(pageSize) < 0) throw new Error(`"Size of Polling Page" must be valid number between 0 and ${MAX_FETCH}`);
pageSize = Number(pageSize);
let from = snapshot?.nextStartTime || startTime;

const from = lastModificationTime || startTime;
const to = endTime || currentTime;
let nextStartTime = currentTime;

if (timestamp(from) > timestamp(to)) {
this.logger.info('"Start Time" is higher than "End Time", finishing trigger process');
Expand All @@ -54,12 +86,18 @@ exports.process = async function processTrigger(_msg, cfg, snapshot) {

this.logger.info(`Filter "${sobject}" updated from ${timeToString(from)} to ${timeToString(to)}, page limit ${pageSize}`);

const preliminaryWhere = buildWhereClause(from, to, lastModificationTime, lastSeenId);
const countQuery = `SELECT COUNT() FROM ${sobject} WHERE ${preliminaryWhere}`;
const countResult = await callJSForceMethod.call(this, cfg, 'simpleQuery', countQuery);
this.logger.info(`Found ${countResult.totalSize} objects to process for this run.`);

const selectedFields = getSelectedFields(cfg);
const options = {
sobject,
selectedObjects: linkedObjects.reduce((query, obj) => (obj.startsWith('!') ? query : `${query}, ${obj}.*`), selectedFields),
linkedObjects,
maxFetch: pageSize,
orderBy: 'LastModifiedDate, Id',
};

if (isDebugFlow) {
Expand All @@ -69,27 +107,27 @@ exports.process = async function processTrigger(_msg, cfg, snapshot) {
}

let proceed = true;
let emitted;
let emitted = false;
let iteration = 1;
let results;
let nextLastModificationTime = lastModificationTime;
let nextLastSeenId = lastSeenId;

try {
do {
options.whereCondition = `LastModifiedDate >= ${timeToString(from)} AND LastModifiedDate < ${timeToString(to)}`;
options.whereCondition = buildWhereClause(from, to, nextLastModificationTime, nextLastSeenId);

this.logger.debug('Start poll object with options: %j', options);
results = await callJSForceMethod.call(this, cfg, 'pollingSelectQuery', options);
this.logger.info(`Polling iteration ${iteration} - ${results.length} results found`);
iteration++;
if (results.length !== 0) {

if (results.length > 0) {
emitted = true;
nextStartTime = results[results.length - 1].LastModifiedDate;
if (results.length === pageSize) {
this.logger.warn('All entries that have the same LastModifiedDate as the last entry will be deleted from the resulting array to prevent emitting duplicates');
results = results.filter((item) => item.LastModifiedDate !== nextStartTime);
this.logger.warn('Entries that have the same LastModifiedDate as the last entry deleted. Current size of the resulting array is %s', results.length);
} else {
nextStartTime = timeToString(timestamp(nextStartTime) + 1000);
proceed = false;
}
const lastRecord = results[results.length - 1];
nextLastModificationTime = lastRecord.LastModifiedDate;
nextLastSeenId = lastRecord.Id;

if (emitBehavior === 'fetchPage') {
this.logger.debug('Emit Behavior set as Fetch Page, going to emit one page...');
await this.emit('data', messages.newMessageWithBody({ results }));
Expand All @@ -99,15 +137,23 @@ exports.process = async function processTrigger(_msg, cfg, snapshot) {
await this.emit('data', messages.newMessageWithBody(record));
}
}
if (singlePagePerInterval) proceed = false;
from = nextStartTime;

if (results.length < pageSize) {
proceed = false;
}

if (singlePagePerInterval) {
proceed = false;
}
} else {
proceed = false;
}
} while (proceed);

this.logger.info('Processing Polling trigger finished successfully');
this.logger.debug('Going to emit snapshot: %j', { nextStartTime });
await this.emit('snapshot', { nextStartTime });
const newSnapshot = { lastModificationTime: nextLastModificationTime || from, lastSeenId: nextLastSeenId };
this.logger.debug('Going to emit snapshot: %j', newSnapshot);
await this.emit('snapshot', newSnapshot);
} catch (e) {
if (e.statusCode) {
throw new Error(`Got error - ${e.name}\n message: \n${e.message}\n statusCode: \n${e.statusCode}\n body: \n${JSON.stringify(e.body)}`);
Expand All @@ -116,9 +162,7 @@ exports.process = async function processTrigger(_msg, cfg, snapshot) {
}

if (isDebugFlow && !emitted) {
throw new Error(`No object found. Execution stopped.
This error is only applicable to the Retrieve Sample.
In flow executions there will be no error, just an execution skip.`);
throw new Error('No object found. Execution stopped.\n This error is only applicable to the Retrieve Sample.\n In flow executions there will be no error, just an execution skip.');
}
};

Expand Down
Loading