Skip to content

Commit 487da95

Browse files
committed
refactor and add tests
1 parent a37a1cd commit 487da95

File tree

13 files changed

+448
-134
lines changed

13 files changed

+448
-134
lines changed

docs/setup/index.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ already provides a robust foundation.
5050

5151
```json
5252
{
53-
"cds": { "eventQueue": { "useAsCAPOutbox": true } }
53+
"cds": { "eventQueue": { "useAsCAPQueue": true } }
5454
}
5555
```
5656

@@ -62,7 +62,7 @@ such as the configuration file path, event processing behavior, load balancing,
6262
The table includes the parameter name, a description of its purpose, and the default value if not specified.
6363

6464
| Name | Description | Default | Can be changed at runtime |
65-
| :----------------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | :------------- | :------------------------ |
65+
|:-------------------------------------| :------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | :------------- | :------------------------ |
6666
| configFilePath | Path to the configuration file. | null | no |
6767
| events | Options to allow events in the configuration. E.g. via cds-env. | {} | no |
6868
| periodicEvents | Options to allow periodicEvents in the configuration. E.g. via cds-env | {} | no |
@@ -72,7 +72,7 @@ The table includes the parameter name, a description of its purpose, and the def
7272
| runInterval [ms] | The interval in milliseconds at which the runner runs. | 25 _ 60 _ 1000 | yes |
7373
| updatePeriodicEvents | Whether or not to update periodic events. | true | no |
7474
| thresholdLoggingEventProcessing [ms] | Threshold after how many milliseconds the processing of a event or periodic event is logged for observability. | 50 | yes |
75-
| useAsCAPOutbox | Uses the event-queue as the [outbox](https://cap.cloud.sap/docs/node.js/outbox) of CAP. Outbox calls are stored and processed in the event-queue instead of the outbox of CAP. | false | no |
75+
| useAsCAPQueue/useAsCAPOutbox | Uses the event-queue as the [outbox](https://cap.cloud.sap/docs/node.js/outbox) of CAP. Outbox calls are stored and processed in the event-queue instead of the outbox of CAP. | false | no |
7676
| userId | User id for all created cds contexts. This influences the value for updated managed database fields like createdBy and modifiedBy. | false | yes |
7777
| cleanupLocksAndEventsForDev | Deletes all semantic locks and sets all events that are in progress to error during server start. This is used to clean up leftovers from server crashes or restarts during processing. | true | no |
7878
| insertEventsBeforeCommit | If enabled, this feature allows events (including those for outboxed services) to be inserted in bulk using the before commit handler. This is performed to improve performance by mass inserting events instead of single insert operations. This can be disabled by the parameter `skipInsertEventsBeforeCommit` in the function publishEvent. | true | yes |

docs/use-as-cap-outbox/index.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,14 @@ transaction modes, load balancing, and others, with outboxed CDS services.
2424

2525
## How to enable the event-queue as outbox mechanism for CAP
2626

27-
The initialization parameter `useAsCAPOutbox` enables the event-queue to act as a CAP outbox. To set this parameter,
27+
The initialization parameter `useAsCAPQueue` enables the event-queue to act as a CAP outbox. To set this parameter,
2828
refer to the [setup](/event-queue/setup/#initialization-parameters) part of the documentation. This is the only
2929
configuration needed to enable the event-queue as a CAP outbox.
3030

3131
```json
3232
{
3333
"cds": {
34-
"eventQueue": { "useAsCAPOutbox": true }
34+
"eventQueue": { "useAsCAPQueue": true }
3535
}
3636
}
3737
```

example-cap-server/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
"eventQueue": {
3333
"plugin": true,
3434
"runInterval": 300000,
35-
"useAsCAPOutbox": true,
35+
"useAsCAPQueue": true,
3636
"thresholdLoggingEventProcessing": 5000,
3737
"cleanupLocksAndEventsForDev": true,
3838
"events": {

src/EventQueueProcessorBase.js

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ const SELECT_LIMIT_EVENTS_PER_TICK = 100;
2424
const TRIES_FOR_EXCEEDED_EVENTS = 3;
2525
const EVENT_START_AFTER_HEADROOM = 3 * 1000;
2626
const SUFFIX_PERIODIC = "_PERIODIC";
27-
const DEFAULT_RETRY_AFTER = 5 * 60 * 1000;
2827

2928
const ALLOWED_FIELDS_FOR_UPDATE = ["status", "startAfter"];
3029

@@ -40,8 +39,6 @@ class EventQueueProcessorBase {
4039
#eventConfig;
4140
#isPeriodic;
4241
#lastSuccessfulRunTimestamp;
43-
#retryFailedAfter;
44-
#retryOpenAfter;
4542
#keepAliveRunner;
4643
#currentKeepAlivePromise = Promise.resolve();
4744
#etagMap;
@@ -69,8 +66,6 @@ class EventQueueProcessorBase {
6966
if (this.__parallelEventProcessing > LIMIT_PARALLEL_EVENT_PROCESSING) {
7067
this.__parallelEventProcessing = LIMIT_PARALLEL_EVENT_PROCESSING;
7168
}
72-
this.#retryFailedAfter = this.#eventConfig.retryFailedAfter ?? DEFAULT_RETRY_AFTER;
73-
this.#retryOpenAfter = this.#eventConfig.retryOpenAfter ?? DEFAULT_RETRY_AFTER;
7469
this.__concurrentEventProcessing = this.#eventConfig.multiInstanceProcessing;
7570
this.__retryAttempts = this.#isPeriodic ? 1 : this.#eventConfig.retryAttempts ?? DEFAULT_RETRY_ATTEMPTS;
7671
this.__selectMaxChunkSize = this.#eventConfig.selectMaxChunkSize ?? SELECT_LIMIT_EVENTS_PER_TICK;
@@ -456,10 +451,7 @@ class EventQueueProcessorBase {
456451
return result;
457452
}, {});
458453

459-
// TODO: schedule open events???
460454
for (const { ids, data } of Object.values(updateData)) {
461-
let startAfter;
462-
463455
if (!("status" in data)) {
464456
this.logger.error("can't find status value in return value of event-processing. Setting event to done", {
465457
ids,
@@ -474,14 +466,29 @@ class EventQueueProcessorBase {
474466
delete this.__notCommitedStatusMap[id];
475467
}
476468

477-
if ([EventProcessingStatus.Error, EventProcessingStatus.Open].includes(data.status)) {
478-
startAfter = new Date(Date.now() + this.#retryFailedAfter);
469+
if (![EventProcessingStatus.Open, EventProcessingStatus.Error].includes(data.status)) {
470+
delete data.startAfter;
471+
}
472+
473+
if (data.startAfter) {
474+
data.startAfter = this.#normalizeDate(data.startAfter);
475+
}
476+
477+
if (!data.startAfter && [EventProcessingStatus.Error, EventProcessingStatus.Open].includes(data.status)) {
478+
data.startAfter = new Date(
479+
Date.now() +
480+
(EventProcessingStatus.Error ? this.#eventConfig.retryFailedAfter : this.#eventConfig.retryOpenAfter)
481+
);
482+
}
483+
484+
// TODO: check that startAfter is only allowed for tbp events
485+
if (data.startAfter) {
479486
this.#eventSchedulerInstance.scheduleEvent(
480487
this.__context.tenant,
481488
this.#eventType,
482489
this.#eventSubType,
483490
this.#namespace,
484-
startAfter
491+
data.startAfter
485492
);
486493
}
487494

@@ -490,14 +497,28 @@ class EventQueueProcessorBase {
490497
.set({
491498
...data,
492499
lastAttemptTimestamp: ts,
493-
...(data.status === EventProcessingStatus.Error ? { startAfter: startAfter.toISOString() } : {}),
494500
})
495501
.where("ID IN", ids)
496502
);
497503
}
498504
});
499505
}
500506

507+
#normalizeDate(value) {
508+
if (value instanceof Date) {
509+
return value;
510+
}
511+
512+
if (typeof value === "string" || typeof value === "number") {
513+
const date = new Date(value);
514+
if (!isNaN(date)) {
515+
return date;
516+
}
517+
}
518+
519+
return null;
520+
}
521+
501522
#ensureEveryQueueEntryHasStatus() {
502523
this.__queueEntries.forEach((queueEntry) => {
503524
if (
@@ -529,10 +550,6 @@ class EventQueueProcessorBase {
529550
return;
530551
}
531552

532-
if (!status) {
533-
return;
534-
}
535-
536553
this.logger.error("Not allowed event status returned. Only Open, Done, Error is allowed!", {
537554
eventType: this.#eventType,
538555
eventSubType: this.#eventSubType,

src/config.js

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ const CAP_EVENT_TYPE = "CAP_OUTBOX";
2424
const CAP_PARALLEL_DEFAULT = 5;
2525
const CAP_MAX_ATTEMPTS_DEFAULT = 5;
2626
const DELETE_TENANT_BLOCK_AFTER_MS = 5 * 60 * 1000;
27+
const DEFAULT_RETRY_AFTER = 5 * 60 * 1000;
2728
const PRIORITIES = Object.values(Priorities);
2829
const UTC_DEFAULT = false;
2930
const USE_CRON_TZ_DEFAULT = true;
@@ -252,7 +253,6 @@ class Config {
252253
selectMaxChunkSize: config.selectMaxChunkSize ?? config.chunkSize,
253254
parallelEventProcessing: config.parallelEventProcessing ?? (config.parallel && CAP_PARALLEL_DEFAULT),
254255
retryAttempts: config.retryAttempts ?? config.maxAttempts ?? CAP_MAX_ATTEMPTS_DEFAULT,
255-
propagateHeaders: config.propagateHeaders ?? [],
256256
namespace,
257257
...config,
258258
});
@@ -439,6 +439,8 @@ class Config {
439439
? Object.fromEntries(new Map(event.appInstances.map((a) => [a, true])))
440440
: null;
441441
event.propagateHeaders = event.propagateHeaders ?? [];
442+
event.retryFailedAfter = event.retryFailedAfter ?? DEFAULT_RETRY_AFTER;
443+
event.retryOpenAfter = event.retryOpenAfter ?? DEFAULT_RETRY_AFTER;
442444
}
443445

444446
#basicEventValidation(event) {
@@ -755,13 +757,24 @@ class Config {
755757
}
756758

757759
set useAsCAPOutbox(value) {
760+
if (this.#useAsCAPOutbox) {
761+
return;
762+
}
758763
this.#useAsCAPOutbox = value;
759764
}
760765

761766
get useAsCAPOutbox() {
762767
return this.#useAsCAPOutbox;
763768
}
764769

770+
set useAsCAPQueue(value) {
771+
this.useAsCAPOutbox = value;
772+
}
773+
774+
get useAsCAPQueue() {
775+
return this.useAsCAPOutbox;
776+
}
777+
765778
set userId(value) {
766779
this.#userId = value;
767780
}

src/initialize.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ const CONFIG_VARS = [
3535
["updatePeriodicEvents", true],
3636
["thresholdLoggingEventProcessing", 50],
3737
["useAsCAPOutbox", false],
38+
["useAsCAPQueue", false],
3839
["userId", null],
3940
["cleanupLocksAndEventsForDev", false],
4041
["redisOptions", {}],
@@ -65,6 +66,7 @@ const CONFIG_VARS = [
6566
* @param {boolean} [options.updatePeriodicEvents=true] - Automatically update periodic events.
6667
* @param {number} [options.thresholdLoggingEventProcessing=50] - Threshold for logging event processing time (in milliseconds).
6768
* @param {boolean} [options.useAsCAPOutbox=false] - Use the event queue as a CAP Outbox.
69+
* @param {boolean} [options.useAsCAPQueue=false] - Use the event queue as a CAP Outbox.
6870
* @param {string} [options.userId=null] - ID of the user initiating the process.
6971
* @param {boolean} [options.cleanupLocksAndEventsForDev=false] - Cleanup locks and events for development environments.
7072
* @param {Object} [options.redisOptions={}] - Configuration options for Redis.

src/outbox/EventQueueGenericOutboxHandler.js

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,40 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
367367
return queueEntries.map((queueEntry) => [queueEntry.ID, EventProcessingStatus.Done]);
368368
}
369369

370+
const [firstEntry] = result;
371+
if (Array.isArray(firstEntry)) {
372+
const [, innerResult] = firstEntry;
373+
if (innerResult instanceof Object) {
374+
return result;
375+
} else {
376+
return result.map(([id, status]) => {
377+
return [id, { status }];
378+
});
379+
}
380+
} else if (firstEntry instanceof Object) {
381+
return result.reduce((result, entry) => {
382+
let { ID } = entry;
383+
384+
if (!ID) {
385+
if (queueEntries.length > 1) {
386+
throw new Error(
387+
"The CAP handler return value does not match the event-queue specification. Please check the documentation"
388+
);
389+
} else {
390+
ID = queueEntries[0].ID;
391+
}
392+
}
393+
394+
delete entry.ID;
395+
if (!("status" in entry)) {
396+
entry.status = EventProcessingStatus.Done;
397+
}
398+
399+
result.push([ID, entry]);
400+
return result;
401+
}, []);
402+
}
403+
370404
const valid = !result.some((entry) => {
371405
const [, status] = entry;
372406
return !validStatusValues.includes(status);

src/outbox/eventQueueAsOutbox.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,11 @@ function outboxed(srv, customOpts) {
5656
outboxOpts = config.addCAPOutboxEventSpecificAction(srv.name, req.event);
5757
}
5858
const subType = hasSpecificSettings ? [srv.name, req.event].join(".") : srv.name;
59-
outboxOpts = config.getEventConfig(CDS_EVENT_TYPE, subType);
60-
const eventHeaders = getPropagatedHeaders(outboxOpts, req);
6159
const namespace = outboxOpts.namespace ?? config.namespace;
60+
outboxOpts = config.getEventConfig(CDS_EVENT_TYPE, subType, namespace);
61+
const eventHeaders = getPropagatedHeaders(outboxOpts, req);
6262
if (["persistent-outbox", "persistent-queue"].includes(outboxOpts.kind)) {
63-
await _mapToEventAndPublish(context, subType , req, eventHeaders, namespace);
63+
await _mapToEventAndPublish(context, subType, req, eventHeaders, namespace);
6464
return;
6565
}
6666
context.on("succeeded", async () => {

test/__snapshots__/eventQueueOutbox.test.js.snap

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ exports[`event-queue outbox monkeyPatchCAPOutbox=true ad-hoc events overwrite se
4343
"priority": "medium",
4444
"propagateHeaders": [],
4545
"retryAttempts": 20,
46+
"retryFailedAfter": 300000,
47+
"retryOpenAfter": 300000,
4648
"selectMaxChunkSize": 10,
4749
"subType": "NotificationServicePeriodic.action",
4850
"transactionMode": "isolated",
@@ -69,6 +71,8 @@ exports[`event-queue outbox monkeyPatchCAPOutbox=true ad-hoc events overwrite se
6971
"priority": "medium",
7072
"propagateHeaders": [],
7173
"retryAttempts": 20,
74+
"retryFailedAfter": 300000,
75+
"retryOpenAfter": 300000,
7276
"selectMaxChunkSize": 10,
7377
"subType": "NotificationServicePeriodic.action",
7478
"transactionMode": "isolated",
@@ -95,6 +99,8 @@ exports[`event-queue outbox monkeyPatchCAPOutbox=true custom options should win
9599
"priority": "medium",
96100
"propagateHeaders": [],
97101
"retryAttempts": 20,
102+
"retryFailedAfter": 300000,
103+
"retryOpenAfter": 300000,
98104
"selectMaxChunkSize": 10,
99105
"subType": "NotificationServiceOutboxedByConfig",
100106
"transactionMode": "alwaysCommit",
@@ -133,6 +139,8 @@ exports[`event-queue outbox monkeyPatchCAPOutbox=true map config to event-queue
133139
"priority": "medium",
134140
"propagateHeaders": [],
135141
"retryAttempts": 20,
142+
"retryFailedAfter": 300000,
143+
"retryOpenAfter": 300000,
136144
"selectMaxChunkSize": 10,
137145
"subType": "NotificationService",
138146
"type": "CAP_OUTBOX",
@@ -169,6 +177,8 @@ exports[`event-queue outbox monkeyPatchCAPOutbox=true periodic events inherit co
169177
"namespace": "default",
170178
"priority": "medium",
171179
"propagateHeaders": [],
180+
"retryFailedAfter": 300000,
181+
"retryOpenAfter": 300000,
172182
"subType": "NotificationServicePeriodic.main",
173183
"transactionMode": "isolated",
174184
"type": "CAP_OUTBOX_PERIODIC",
@@ -204,6 +214,8 @@ exports[`event-queue outbox monkeyPatchCAPOutbox=true periodic events inherit co
204214
"namespace": "default",
205215
"priority": "medium",
206216
"propagateHeaders": [],
217+
"retryFailedAfter": 300000,
218+
"retryOpenAfter": 300000,
207219
"subType": "NotificationServicePeriodic.main",
208220
"transactionMode": "alwaysRollback",
209221
"type": "CAP_OUTBOX_PERIODIC",
@@ -230,6 +242,8 @@ exports[`event-queue outbox monkeyPatchCAPOutbox=true periodic events inherit co
230242
"namespace": "default",
231243
"priority": "medium",
232244
"propagateHeaders": [],
245+
"retryFailedAfter": 300000,
246+
"retryOpenAfter": 300000,
233247
"subType": "NotificationServicePeriodic.main",
234248
"transactionMode": "isolated",
235249
"type": "CAP_OUTBOX_PERIODIC",
@@ -420,6 +434,8 @@ exports[`event-queue outbox monkeyPatchCAPOutbox=true should work for outboxed s
420434
"priority": "medium",
421435
"propagateHeaders": [],
422436
"retryAttempts": 20,
437+
"retryFailedAfter": 300000,
438+
"retryOpenAfter": 300000,
423439
"selectMaxChunkSize": 10,
424440
"subType": "NotificationServiceOutboxedByConfig",
425441
"transactionMode": "alwaysRollback",

0 commit comments

Comments
 (0)