Skip to content

Commit 169c053

Browse files
committed
wip
1 parent a2dccfd commit 169c053

File tree

8 files changed

+126
-12
lines changed

8 files changed

+126
-12
lines changed

CHANGELOG.md

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,21 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
66
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
77

8-
## v1.11.0 - 2025-07-09
8+
## v1.11.1 - 2025-11-13
9+
10+
### Added
11+
12+
- [Event Configuration] `propagateHeaders` allows forwarding headers from the original CDS context to the outbox call by specifying their names in an array.
13+
14+
15+
## v1.11.0 - 2025-10-16
916

1017
### Added
1118

1219
- Added `authInfo` to cds.User as CDS 9.3 deprecated `tokenInfo`.
1320
- Disable the automatic processing of suspended tenants. Can be turned off using `disableProcessingOfSuspendedTenants`.
1421

15-
## v1.10.10 - 2025-07-09
22+
## v1.10.10 - 2025-07-10
1623

1724
### Fixed
1825

docs/configure-event/index.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,14 @@ they should be processed.
4141
## Parameters
4242

4343
| Property | Description | Default Value |
44-
| ----------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | --------------- |
44+
|-------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------------|
4545
| impl | Path of the implementation class associated with the event. | - |
4646
| type | Specifies the type of the event. | - |
4747
| subType | Specifies the subtype of the event, further categorizing the event type. | - |
4848
| load | Indicates the load of the event, affecting the processing concurrency. | 1 |
4949
| retryAttempts | Number of retry attempts for failed events. Set to `-1` for infinite retries. | 3 |
5050
| processAfterCommit | Indicates whether an event is processed immediately after the transaction, in which the event was written, has been committed. | true |
51+
| propagateHeaders | Specifies which headers from the original CDS context should be forwarded to the outbox call. Provide an array of header names to propagate. | [] |
5152
| parallelEventProcessing | Number of events of the same type and subType that can be processed in parallel. The maximum limit is 10. | 1 |
5253
| transactionMode | Specifies the transaction mode for the event. For allowed values, refer to [Transaction Handling](/event-queue/transaction-handling/#transaction-modes). | isolated |
5354
| selectMaxChunkSize | Number of events selected in a single batch. Set `checkForNextChunk` to `true` if you want to check for more available events after processing a batch. | 100 |

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@cap-js-community/event-queue",
3-
"version": "1.11.0",
3+
"version": "1.11.1",
44
"description": "An event queue that enables secure transactional processing of asynchronous and periodic events, featuring instant event processing with Redis Pub/Sub and load distribution across all application instances.",
55
"main": "src/index.js",
66
"types": "src/index.d.ts",

src/config.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ const ALLOWED_EVENT_OPTIONS_AD_HOC = [
6565
"processAfterCommit",
6666
"checkForNextChunk",
6767
"retryFailedAfter",
68+
"propagateHeaders",
6869
"multiInstanceProcessing",
6970
"kind",
7071
"timeBucket",
@@ -388,6 +389,7 @@ class Config {
388389
selectMaxChunkSize: config.selectMaxChunkSize ?? config.chunkSize,
389390
parallelEventProcessing: config.parallelEventProcessing ?? (config.parallel && CAP_PARALLEL_DEFAULT),
390391
retryAttempts: config.retryAttempts ?? config.maxAttempts ?? CAP_MAX_ATTEMPTS_DEFAULT,
392+
propagateHeaders: config.propagateHeaders ?? [],
391393
...config,
392394
});
393395
eventConfig.internalEvent = true;
@@ -583,6 +585,7 @@ class Config {
583585
event._appInstancesMap = event.appInstances
584586
? Object.fromEntries(new Map(event.appInstances.map((a) => [a, true])))
585587
: null;
588+
event.propagateHeaders = event.propagateHeaders ?? [];
586589
}
587590

588591
#basicEventValidation(event) {

src/outbox/eventQueueAsOutbox.js

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,15 @@ function outboxed(srv, customOpts) {
4747
customOpts || {}
4848
);
4949
config.addCAPOutboxEventBase(srv.name, outboxOpts);
50-
const specificSettings = config.getCdsOutboxEventSpecificConfig(srv.name, req.event);
51-
if (specificSettings) {
50+
const hasSpecificSettings = !!config.getCdsOutboxEventSpecificConfig(srv.name, req.event);
51+
if (hasSpecificSettings) {
5252
outboxOpts = config.addCAPOutboxEventSpecificAction(srv.name, req.event);
5353
}
54-
54+
const subType = hasSpecificSettings ? [srv.name, req.event].join(".") : srv.name;
55+
outboxOpts = config.getEventConfig(CDS_EVENT_TYPE, subType);
56+
const eventHeaders = getPropagatedHeaders(outboxOpts, req);
5557
if (["persistent-outbox", "persistent-queue"].includes(outboxOpts.kind)) {
56-
await _mapToEventAndPublish(context, srv.name, req, !!specificSettings);
58+
await _mapToEventAndPublish(context, subType, req, eventHeaders);
5759
return;
5860
}
5961
context.on("succeeded", async () => {
@@ -79,7 +81,17 @@ function unboxed(srv) {
7981
return srv[UNBOXED] || srv;
8082
}
8183

82-
const _mapToEventAndPublish = async (context, name, req, actionSpecific) => {
84+
const getPropagatedHeaders = (config, req) => {
85+
const propagateHeaders = config.propagateHeaders.reduce((headers, headerName) => {
86+
if (headerName in req.tx.context.headers) {
87+
headers[headerName] = req.tx.context.headers[headerName];
88+
}
89+
return headers;
90+
}, {});
91+
return Object.assign(propagateHeaders, req.headers);
92+
};
93+
94+
const _mapToEventAndPublish = async (context, subType, req, eventHeaders) => {
8395
const eventQueueSpecificValues = {};
8496
for (const header in req.headers ?? {}) {
8597
for (const field of EVENT_QUEUE_SPECIFIC_FIELDS) {
@@ -90,19 +102,20 @@ const _mapToEventAndPublish = async (context, name, req, actionSpecific) => {
90102
}
91103
}
92104
}
105+
93106
const event = {
94107
contextUser: context.user.id,
95108
...(req._fromSend || (req.reply && { _fromSend: true })), // send or emit
96109
...(req.inbound && { inbound: req.inbound }),
97110
...(req.event && { event: req.event }),
98111
...(req.data && { data: req.data }),
99-
...(req.headers && { headers: req.headers }),
112+
...(eventHeaders && { headers: eventHeaders }),
100113
...(req.query && { query: req.query }),
101114
};
102115

103116
await publishEvent(cds.tx(context), {
104117
type: CDS_EVENT_TYPE,
105-
subType: actionSpecific ? [name, req.event].join(".") : name,
118+
subType,
106119
payload: JSON.stringify(event),
107120
...eventQueueSpecificValues,
108121
});

test/asset/outboxProject/srv/service/service.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ class NotificationService extends cds.Service {
1010
data: req.data,
1111
user: req.user.id,
1212
subType: req.eventQueue?.processor.eventSubType,
13+
headers: req.headers,
1314
});
1415
});
1516

test/asset/outboxProject/srv/service/standard-service.js

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ class StandardService extends cds.Service {
99
cds.log(this.name).info(req.event, {
1010
data: req.data,
1111
user: req.user.id,
12+
headers: req.headers,
1213
});
1314
});
1415

@@ -18,6 +19,37 @@ class StandardService extends cds.Service {
1819
user: req.user.id,
1920
});
2021
});
22+
23+
this.on("callNextOutbox", async (req) => {
24+
const outboxedService = cds.outboxed(this).tx(req);
25+
await outboxedService.send(
26+
"main",
27+
{
28+
to: "to",
29+
subject: "subject",
30+
body: "body",
31+
},
32+
{
33+
"x-eventqueue-startAfter": new Date(),
34+
}
35+
);
36+
});
37+
38+
this.on("callNextOutboxMix", async (req) => {
39+
const outboxedService = cds.outboxed(this).tx(req);
40+
await outboxedService.send(
41+
"main",
42+
{
43+
to: "to",
44+
subject: "subject",
45+
body: "body",
46+
},
47+
{
48+
customHeader: 456,
49+
myNextHeader: 123,
50+
}
51+
);
52+
});
2153
}
2254
}
2355

test/eventQueueOutbox.test.js

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ cds.env.requires.StandardService = {
9898
impl: path.join(basePath, "srv/service/standard-service.js"),
9999
outbox: {
100100
kind: "persistent-outbox",
101+
propagateHeaders: ["authId", "customHeader"],
101102
events: {
102103
timeBucketAction: {
103104
timeBucket: "*/60 * * * * *",
@@ -644,6 +645,62 @@ describe("event-queue outbox", () => {
644645
expect(loggerMock.callsLengths().error).toEqual(0);
645646
});
646647

648+
it("should correctly propagate headers: send/emit event from CAP service with existing headers", async () => {
649+
const service = await cds.connect.to("StandardService");
650+
const outboxedService = cds.outboxed(service).tx(context);
651+
const headers = { customHeader: 123, authId: 123 };
652+
await outboxedService.send(
653+
"callNextOutbox",
654+
{
655+
to: "to",
656+
subject: "subject",
657+
body: "body",
658+
},
659+
{ ...headers, xyz: 123 }
660+
);
661+
await commitAndOpenNew();
662+
await testHelper.selectEventQueueAndExpectOpen(tx, { expectedLength: 1 });
663+
expect(loggerMock).not.sendFioriActionCalled();
664+
await processEventQueue(tx.context, "CAP_OUTBOX", service.name);
665+
await commitAndOpenNew();
666+
await processEventQueue(tx.context, "CAP_OUTBOX", service.name);
667+
await testHelper.selectEventQueueAndExpectDone(tx, { expectedLength: 2 });
668+
expect(loggerMock).actionCalled("main", {
669+
headers,
670+
});
671+
expect(loggerMock.callsLengths().error).toEqual(0);
672+
});
673+
674+
it("should correctly propagate headers: should mix headers from both calls", async () => {
675+
const service = await cds.connect.to("StandardService");
676+
const outboxedService = cds.outboxed(service).tx(context);
677+
const headers = { customHeader: 123, authId: 123 };
678+
await outboxedService.send(
679+
"callNextOutboxMix",
680+
{
681+
to: "to",
682+
subject: "subject",
683+
body: "body",
684+
},
685+
{ ...headers, xyz: 123 }
686+
);
687+
await commitAndOpenNew();
688+
await testHelper.selectEventQueueAndExpectOpen(tx, { expectedLength: 1 });
689+
expect(loggerMock).not.sendFioriActionCalled();
690+
await processEventQueue(tx.context, "CAP_OUTBOX", service.name);
691+
await commitAndOpenNew();
692+
await processEventQueue(tx.context, "CAP_OUTBOX", service.name);
693+
await testHelper.selectEventQueueAndExpectDone(tx, { expectedLength: 2 });
694+
expect(loggerMock).actionCalled("main", {
695+
headers: {
696+
customHeader: 456,
697+
authId: 123,
698+
myNextHeader: 123,
699+
},
700+
});
701+
expect(loggerMock.callsLengths().error).toEqual(0);
702+
});
703+
647704
describe("not connected service should lazily connect and create configuration", () => {
648705
beforeEach(() => {
649706
eventQueue.config.removeEvent("CAP_OUTBOX", "NotificationService");
@@ -1766,7 +1823,7 @@ expect.extend({
17661823
};
17671824
},
17681825
actionCalled: (loggerMock, actionName, properties = {}) => {
1769-
const call = loggerMock.calls().info.find((c) => c[0] === actionName);
1826+
const call = loggerMock.calls().info.find((c) => c[0]?.includes(actionName));
17701827
if (!call) {
17711828
return {
17721829
message: () => `action not called! name: ${actionName}`,

0 commit comments

Comments
 (0)