Skip to content

Commit 67098eb

Browse files
authored
Fb/discover cds events (#412)
* discover cds events from env * discover cds events from env * fixes and test improvements * fixes and test improvements * fixes and test improvements * wip * add tests and fixes for delayed events * add tests and fixes for delayed events * wip * wip * wip * wip * wip * wip * wip * wip * fix
1 parent 9bc5fc4 commit 67098eb

25 files changed

+532
-373
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,15 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
77

88
## v2.0.2 - 2025-11-XX
99

10+
### Added
11+
12+
- [CAP Queue] Configuration is initialized by CDS bootstrap instead of lazy initialization on first use.
13+
1014
### Fixed
1115

1216
- [Admin Service] Correctly return open locks after adding namespace to locks
1317
- [Admin Service] Publish events improved error handling
18+
- [Event Processing] In some cases where more delayed events than the `selectMaxChunkSize` existed, the processing of open events was delayed.
1419

1520
## v2.0.1 - 2025-11-24
1621

package-lock.json

Lines changed: 9 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,11 @@
5656
"devDependencies": {
5757
"@actions/core": "^1.11.1",
5858
"@cap-js/cds-test": "^0.4.0",
59+
"@cap-js/db-service": "^2.6.0",
5960
"@cap-js/hana": "^2.3.4",
60-
"@cap-js/sqlite": "^2.0.4",
61+
"@cap-js/sqlite": "^2.1.0",
6162
"@opentelemetry/api": "^1.9.0",
62-
"@sap/cds": "^9.4.4",
63+
"@sap/cds": "^9.4.5",
6364
"@sap/cds-dk": "^9.4.2",
6465
"eslint": "^8.57.0",
6566
"eslint-config-prettier": "^9.1.0",

src/EventQueueError.js

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,16 @@ const ERROR_CODES = {
2727
APP_NAMES_FORMAT: "APP_NAMES_FORMAT",
2828
APP_INSTANCES_FORMAT: "APP_INSTANCES_FORMAT",
2929
MULTI_INSTANCE_PROCESSING_NOT_ALLOWED: "MULTI_INSTANCE_PROCESSING_NOT_ALLOWED",
30+
INVALID_CLUSTER_HANDLER_RESULT: "INVALID_CLUSTER_HANDLER_RESULT",
3031
};
3132

3233
const ERROR_CODES_META = {
3334
[ERROR_CODES.WRONG_TX_USAGE]: {
3435
message: "Usage of this.tx|this.context is not allowed if parallel event processing is enabled",
3536
},
37+
[ERROR_CODES.INVALID_CLUSTER_HANDLER_RESULT]: {
38+
message: "A cluster handler returned an invalid result. Cluster handlers must return the clustered payload data.",
39+
},
3640
[ERROR_CODES.UNKNOWN_EVENT_TYPE]: {
3741
message: "The event type and subType configuration is not configured! Maintain the combination in the config file.",
3842
},
@@ -371,6 +375,17 @@ class EventQueueError extends VError {
371375
);
372376
}
373377

378+
static invalidClusterHandlerResult(clusterKey, propertyName) {
379+
const { message } = ERROR_CODES_META[ERROR_CODES.INVALID_CLUSTER_HANDLER_RESULT];
380+
return new EventQueueError(
381+
{
382+
name: ERROR_CODES.INVALID_CLUSTER_HANDLER_RESULT,
383+
info: { clusterKey, propertyName },
384+
},
385+
message
386+
);
387+
}
388+
374389
static isRedisConnectionFailure(err) {
375390
return err instanceof VError && err.name === ERROR_CODES.REDIS_CREATE_CLIENT;
376391
}

src/EventQueueProcessorBase.js

Lines changed: 55 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ class EventQueueProcessorBase {
6262
this.#eventType = eventType;
6363
this.#eventSubType = eventSubType;
6464
this.#eventConfig = config ?? {};
65+
this.#eventConfig.selectedDelayedEventIds ??= [];
6566
this.__parallelEventProcessing = this.#eventConfig.parallelEventProcessing ?? DEFAULT_PARALLEL_EVENT_PROCESSING;
6667
if (this.__parallelEventProcessing > LIMIT_PARALLEL_EVENT_PROCESSING) {
6768
this.__parallelEventProcessing = LIMIT_PARALLEL_EVENT_PROCESSING;
@@ -664,45 +665,49 @@ class EventQueueProcessorBase {
664665
const baseDate = Date.now();
665666
const refDateStartAfter = new Date(baseDate + this.#config.runInterval * 1.2);
666667
await executeInNewTransaction(this.__baseContext, "eventQueue-getQueueEntriesAndSetToInProgress", async (tx) => {
667-
const entries = await tx.run(
668-
SELECT.from(this.#config.tableNameEventQueue)
669-
.forUpdate({ wait: this.#config.forUpdateTimeout })
670-
.limit(this.selectMaxChunkSize)
671-
.where(
672-
"type =",
673-
this.#eventType,
674-
"AND subType=",
675-
this.#eventSubType,
676-
"AND namespace =",
677-
this.#namespace,
678-
"AND ( startAfter IS NULL OR startAfter <=",
679-
refDateStartAfter.toISOString(),
680-
" ) AND ( status =",
681-
EventProcessingStatus.Open,
682-
"AND ( lastAttemptTimestamp <=",
683-
this.startTime.toISOString(),
684-
...(this.isPeriodicEvent
685-
? [
686-
"OR lastAttemptTimestamp IS NULL ) OR ( status =",
687-
EventProcessingStatus.InProgress,
688-
"AND lastAttemptTimestamp <=",
689-
new Date(baseDate - this.#eventConfig.keepAliveMaxInProgressTime * 1000).toISOString(),
690-
") )",
691-
]
692-
: [
693-
"OR lastAttemptTimestamp IS NULL ) OR ( status =",
694-
EventProcessingStatus.Error,
695-
"AND lastAttemptTimestamp <=",
696-
this.startTime.toISOString(),
697-
") OR ( status =",
698-
EventProcessingStatus.InProgress,
699-
"AND lastAttemptTimestamp <=",
700-
new Date(baseDate - this.#eventConfig.keepAliveMaxInProgressTime * 1000).toISOString(),
701-
") )",
702-
])
703-
)
704-
.orderBy("createdAt", "ID")
705-
);
668+
const cqn = SELECT.from(this.#config.tableNameEventQueue)
669+
.forUpdate({ wait: this.#config.forUpdateTimeout })
670+
.limit(this.selectMaxChunkSize)
671+
.where(
672+
"type =",
673+
this.#eventType,
674+
"AND subType=",
675+
this.#eventSubType,
676+
"AND namespace =",
677+
this.#namespace,
678+
"AND ( startAfter IS NULL OR startAfter <=",
679+
refDateStartAfter.toISOString(),
680+
" ) AND ( status =",
681+
EventProcessingStatus.Open,
682+
"AND ( lastAttemptTimestamp <=",
683+
this.startTime.toISOString(),
684+
...(this.isPeriodicEvent
685+
? [
686+
"OR lastAttemptTimestamp IS NULL ) OR ( status =",
687+
EventProcessingStatus.InProgress,
688+
"AND lastAttemptTimestamp <=",
689+
new Date(baseDate - this.#eventConfig.keepAliveMaxInProgressTime * 1000).toISOString(),
690+
") ) ",
691+
]
692+
: [
693+
"OR lastAttemptTimestamp IS NULL ) OR ( status =",
694+
EventProcessingStatus.Error,
695+
"AND lastAttemptTimestamp <=",
696+
this.startTime.toISOString(),
697+
") OR ( status =",
698+
EventProcessingStatus.InProgress,
699+
"AND lastAttemptTimestamp <=",
700+
new Date(baseDate - this.#eventConfig.keepAliveMaxInProgressTime * 1000).toISOString(),
701+
") )",
702+
])
703+
)
704+
.orderBy("createdAt", "ID");
705+
706+
if (this.#eventConfig.selectedDelayedEventIds) {
707+
cqn.where("ID NOT IN", this.#eventConfig.selectedDelayedEventIds);
708+
}
709+
710+
const entries = await tx.run(cqn);
706711

707712
if (!entries.length) {
708713
this.logger.debug("no entries available for processing", {
@@ -756,7 +761,9 @@ class EventQueueProcessorBase {
756761
}
757762

758763
if (!eventsForProcessing.length) {
759-
this.__emptyChunkSelected = true;
764+
if (!entries.length) {
765+
this.__emptyChunkSelected = true;
766+
}
760767
return;
761768
}
762769

@@ -806,8 +813,11 @@ class EventQueueProcessorBase {
806813
return entry.lastAttemptsTs;
807814
}
808815

809-
#handleDelayedEvents(delayedEvents) {
816+
#handleDelayedEvents(delayedEvents, { skipExcludeDelayedEventIds = false } = {}) {
810817
for (const delayedEvent of delayedEvents) {
818+
if (!skipExcludeDelayedEventIds) {
819+
this.#eventConfig.selectedDelayedEventIds.push(delayedEvent.ID);
820+
}
811821
this.#eventSchedulerInstance.scheduleEvent(
812822
this.__context.tenant,
813823
this.#eventType,
@@ -1107,6 +1117,7 @@ class EventQueueProcessorBase {
11071117
}
11081118

11091119
const newEvent = {
1120+
ID: cds.utils.uuid(),
11101121
type: this.#eventType,
11111122
subType: this.#eventSubType,
11121123
namespace: this.#eventConfig.namespace,
@@ -1131,16 +1142,16 @@ class EventQueueProcessorBase {
11311142
});
11321143
}
11331144

1134-
this.tx._skipEventQueueBroadcase = true;
1145+
this.tx._skipEventQueueBroadcast = true;
11351146
await this.tx.run(
11361147
INSERT.into(this.#config.tableNameEventQueue).entries({
11371148
...newEvent,
11381149
startAfter: newEvent.startAfter.toISOString(),
11391150
})
11401151
);
1141-
this.tx._skipEventQueueBroadcase = false;
1152+
this.tx._skipEventQueueBroadcast = false;
11421153
if (intervalInMs < this.#config.runInterval * 1.5) {
1143-
this.#handleDelayedEvents([newEvent]);
1154+
this.#handleDelayedEvents([newEvent], { skipExcludeDelayedEventIds: true });
11441155
const { relative: relativeAfterSchedule } = this.#eventSchedulerInstance.calculateOffset(
11451156
this.#eventType,
11461157
this.#eventSubType,

0 commit comments

Comments
 (0)