Skip to content

Commit a37a1cd

Browse files
committed
Merge branch 'fb/propagateHeaders' into fb/adjustPayloadReturn
# Conflicts: # CHANGELOG.md # package.json # src/config.js # src/outbox/eventQueueAsOutbox.js # test/asset/outboxProject/srv/service/standard-service.js
2 parents 1ac6c04 + 761c459 commit a37a1cd

File tree

9 files changed

+135
-14
lines changed

9 files changed

+135
-14
lines changed

CHANGELOG.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,20 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
66
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
77

8-
## v1.12.0 - 2025-XX-XX
8+
## v1.11.1 - 2025-11-13
99

1010
### Added
1111

12-
- fully support cds.queued
12+
- [Event Configuration] `propagateHeaders` allows forwarding headers from the original CDS context to the outbox call by specifying their names in an array.
1313

14-
## v1.11.0 - 2025-07-09
14+
## v1.11.0 - 2025-10-16
1515

1616
### Added
1717

1818
- Added `authInfo` to cds.User as CDS 9.3 deprecated `tokenInfo`.
1919
- Disable the automatic processing of suspended tenants. Can be turned off using `disableProcessingOfSuspendedTenants`.
2020

21-
## v1.10.10 - 2025-07-09
21+
## v1.10.10 - 2025-07-10
2222

2323
### Fixed
2424

docs/configure-event/index.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ they should be processed.
4848
| load | Indicates the load of the event, affecting the processing concurrency. | 1 |
4949
| retryAttempts | Number of retry attempts for failed events. Set to `-1` for infinite retries. | 3 |
5050
| processAfterCommit | Indicates whether an event is processed immediately after the transaction, in which the event was written, has been committed. | true |
51+
| propagateHeaders | Specifies which headers from the original CDS context should be forwarded to the outbox call. Provide an array of header names to propagate. | [] |
5152
| parallelEventProcessing | Number of events of the same type and subType that can be processed in parallel. The maximum limit is 10. | 1 |
5253
| transactionMode | Specifies the transaction mode for the event. For allowed values, refer to [Transaction Handling](/event-queue/transaction-handling/#transaction-modes). | isolated |
5354
| selectMaxChunkSize | Number of events selected in a single batch. Set `checkForNextChunk` to `true` if you want to check for more available events after processing a batch. | 100 |

src/config.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ const ALLOWED_EVENT_OPTIONS_AD_HOC = [
6060
"processAfterCommit",
6161
"checkForNextChunk",
6262
"retryFailedAfter",
63+
"propagateHeaders",
6364
"retryOpenAfter",
6465
"multiInstanceProcessing",
6566
"kind",
@@ -251,6 +252,7 @@ class Config {
251252
selectMaxChunkSize: config.selectMaxChunkSize ?? config.chunkSize,
252253
parallelEventProcessing: config.parallelEventProcessing ?? (config.parallel && CAP_PARALLEL_DEFAULT),
253254
retryAttempts: config.retryAttempts ?? config.maxAttempts ?? CAP_MAX_ATTEMPTS_DEFAULT,
255+
propagateHeaders: config.propagateHeaders ?? [],
254256
namespace,
255257
...config,
256258
});
@@ -436,6 +438,7 @@ class Config {
436438
event._appInstancesMap = event.appInstances
437439
? Object.fromEntries(new Map(event.appInstances.map((a) => [a, true])))
438440
: null;
441+
event.propagateHeaders = event.propagateHeaders ?? [];
439442
}
440443

441444
#basicEventValidation(event) {

src/outbox/eventQueueAsOutbox.js

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -51,14 +51,16 @@ function outboxed(srv, customOpts) {
5151
customOpts || {}
5252
);
5353
config.addCAPOutboxEventBase(srv.name, outboxOpts);
54-
const specificSettings = config.getCdsOutboxEventSpecificConfig(srv.name, req.event);
55-
if (specificSettings) {
54+
const hasSpecificSettings = !!config.getCdsOutboxEventSpecificConfig(srv.name, req.event);
55+
if (hasSpecificSettings) {
5656
outboxOpts = config.addCAPOutboxEventSpecificAction(srv.name, req.event);
5757
}
58-
59-
const namespace = (specificSettings ?? outboxOpts).namespace ?? config.namespace;
58+
const subType = hasSpecificSettings ? [srv.name, req.event].join(".") : srv.name;
59+
outboxOpts = config.getEventConfig(CDS_EVENT_TYPE, subType);
60+
const eventHeaders = getPropagatedHeaders(outboxOpts, req);
61+
const namespace = outboxOpts.namespace ?? config.namespace;
6062
if (["persistent-outbox", "persistent-queue"].includes(outboxOpts.kind)) {
61-
await _mapToEventAndPublish(context, srv.name, req, !!specificSettings, namespace);
63+
await _mapToEventAndPublish(context, subType , req, eventHeaders, namespace);
6264
return;
6365
}
6466
context.on("succeeded", async () => {
@@ -84,32 +86,43 @@ function unboxed(srv) {
8486
return srv[UNBOXED] || srv;
8587
}
8688

87-
const _mapToEventAndPublish = async (context, name, req, actionSpecific, namespace) => {
89+
const getPropagatedHeaders = (config, req) => {
90+
const propagateHeaders = config.propagateHeaders.reduce((headers, headerName) => {
91+
if (headerName in req.tx.context.headers) {
92+
headers[headerName] = req.tx.context.headers[headerName];
93+
}
94+
return headers;
95+
}, {});
96+
return Object.assign(propagateHeaders, req.headers);
97+
};
98+
99+
const _mapToEventAndPublish = async (context, subType, req, eventHeaders, namespace) => {
88100
const eventQueueSpecificValues = {};
89101
for (const header in req.headers ?? {}) {
90102
for (const field of EVENT_QUEUE_SPECIFIC_FIELDS) {
91103
if (header.toLocaleLowerCase() === `x-eventqueue-${field.toLocaleLowerCase()}`) {
92104
eventQueueSpecificValues[field] = req.headers[header];
93-
delete req.headers[header];
105+
delete eventHeaders[header];
94106
break;
95107
}
96108
}
97109
}
110+
98111
const event = {
99112
contextUser: context.user.id,
100113
...(req._fromSend || (req.reply && { _fromSend: true })), // send or emit
101114
...(req.inbound && { inbound: req.inbound }),
102115
...(req.event && { event: req.event }),
103116
...(req.data && { data: req.data }),
104-
...(req.headers && { headers: req.headers }),
117+
...(eventHeaders && { headers: eventHeaders }),
105118
...(req.query && { query: req.query }),
106119
};
107120

108121
await publishEvent(
109122
cds.tx(context),
110123
{
111124
type: CDS_EVENT_TYPE,
112-
subType: actionSpecific ? [name, req.event].join(".") : name,
125+
subType,
113126
payload: JSON.stringify(event),
114127
namespace: eventQueueSpecificValues.namespace ?? namespace,
115128
...eventQueueSpecificValues,

test/__snapshots__/eventQueueOutbox.test.js.snap

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ exports[`event-queue outbox monkeyPatchCAPOutbox=true ad-hoc events overwrite se
4141
"namespace": "default",
4242
"parallelEventProcessing": 5,
4343
"priority": "medium",
44+
"propagateHeaders": [],
4445
"retryAttempts": 20,
4546
"selectMaxChunkSize": 10,
4647
"subType": "NotificationServicePeriodic.action",
@@ -66,6 +67,7 @@ exports[`event-queue outbox monkeyPatchCAPOutbox=true ad-hoc events overwrite se
6667
"namespace": "default",
6768
"parallelEventProcessing": 5,
6869
"priority": "medium",
70+
"propagateHeaders": [],
6971
"retryAttempts": 20,
7072
"selectMaxChunkSize": 10,
7173
"subType": "NotificationServicePeriodic.action",
@@ -91,6 +93,7 @@ exports[`event-queue outbox monkeyPatchCAPOutbox=true custom options should win
9193
"namespace": "default",
9294
"parallelEventProcessing": 5,
9395
"priority": "medium",
96+
"propagateHeaders": [],
9497
"retryAttempts": 20,
9598
"selectMaxChunkSize": 10,
9699
"subType": "NotificationServiceOutboxedByConfig",
@@ -128,6 +131,7 @@ exports[`event-queue outbox monkeyPatchCAPOutbox=true map config to event-queue
128131
"namespace": "default",
129132
"parallelEventProcessing": 5,
130133
"priority": "medium",
134+
"propagateHeaders": [],
131135
"retryAttempts": 20,
132136
"selectMaxChunkSize": 10,
133137
"subType": "NotificationService",
@@ -142,6 +146,7 @@ exports[`event-queue outbox monkeyPatchCAPOutbox=true option to use eventQueue.u
142146
"subject": "subject",
143147
"to": "to",
144148
},
149+
"headers": {},
145150
"subType": "NotificationServiceOutboxedByConfigUserId",
146151
"user": "dummyTestUser",
147152
}
@@ -163,6 +168,7 @@ exports[`event-queue outbox monkeyPatchCAPOutbox=true periodic events inherit co
163168
"load": 60,
164169
"namespace": "default",
165170
"priority": "medium",
171+
"propagateHeaders": [],
166172
"subType": "NotificationServicePeriodic.main",
167173
"transactionMode": "isolated",
168174
"type": "CAP_OUTBOX_PERIODIC",
@@ -197,6 +203,7 @@ exports[`event-queue outbox monkeyPatchCAPOutbox=true periodic events inherit co
197203
"load": 60,
198204
"namespace": "default",
199205
"priority": "medium",
206+
"propagateHeaders": [],
200207
"subType": "NotificationServicePeriodic.main",
201208
"transactionMode": "alwaysRollback",
202209
"type": "CAP_OUTBOX_PERIODIC",
@@ -222,6 +229,7 @@ exports[`event-queue outbox monkeyPatchCAPOutbox=true periodic events inherit co
222229
"load": 60,
223230
"namespace": "default",
224231
"priority": "medium",
232+
"propagateHeaders": [],
225233
"subType": "NotificationServicePeriodic.main",
226234
"transactionMode": "isolated",
227235
"type": "CAP_OUTBOX_PERIODIC",
@@ -309,6 +317,7 @@ exports[`event-queue outbox monkeyPatchCAPOutbox=true req.data should be stored
309317
"subject": "subject",
310318
"to": "to",
311319
},
320+
"headers": {},
312321
"subType": "NotificationService",
313322
"user": "testUser",
314323
}
@@ -334,6 +343,7 @@ exports[`event-queue outbox monkeyPatchCAPOutbox=true req.data should be stored
334343
"subject": "subject",
335344
"to": "to",
336345
},
346+
"headers": {},
337347
"subType": "NotificationService",
338348
"user": "testUser",
339349
}
@@ -385,6 +395,7 @@ exports[`event-queue outbox monkeyPatchCAPOutbox=true should store correct user
385395
"subject": "subject",
386396
"to": "to",
387397
},
398+
"headers": {},
388399
"subType": "NotificationService",
389400
"user": "badman",
390401
}
@@ -407,6 +418,7 @@ exports[`event-queue outbox monkeyPatchCAPOutbox=true should work for outboxed s
407418
"namespace": "default",
408419
"parallelEventProcessing": 5,
409420
"priority": "medium",
421+
"propagateHeaders": [],
410422
"retryAttempts": 20,
411423
"selectMaxChunkSize": 10,
412424
"subType": "NotificationServiceOutboxedByConfig",

test/__snapshots__/initialize.test.js.snap

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ exports[`initialize read yaml config file 1`] = `
1616
"namespace": "default",
1717
"parallelEventProcessing": 5,
1818
"priority": "medium",
19+
"propagateHeaders": [],
1920
"subType": "Task",
2021
"type": "Notifications",
2122
},
@@ -33,6 +34,7 @@ exports[`initialize read yaml config file 1`] = `
3334
"namespace": "default",
3435
"parallelEventProcessing": 1,
3536
"priority": "medium",
37+
"propagateHeaders": [],
3638
"subType": "CommunicationSystem",
3739
"type": "BusinessLogs",
3840
},

test/asset/outboxProject/srv/service/service.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ class NotificationService extends cds.Service {
1010
data: req.data,
1111
user: req.user.id,
1212
subType: req.eventQueue?.processor.eventSubType,
13+
headers: req.headers,
1314
});
1415
});
1516

test/asset/outboxProject/srv/service/standard-service.js

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ class StandardService extends cds.Service {
99
cds.log(this.name).info(req.event, {
1010
data: req.data,
1111
user: req.user.id,
12+
headers: req.headers,
1213
});
1314
});
1415

@@ -19,6 +20,37 @@ class StandardService extends cds.Service {
1920
});
2021
});
2122

23+
this.on("callNextOutbox", async (req) => {
24+
const outboxedService = cds.outboxed(this).tx(req);
25+
await outboxedService.send(
26+
"main",
27+
{
28+
to: "to",
29+
subject: "subject",
30+
body: "body",
31+
},
32+
{
33+
"x-eventqueue-startAfter": new Date(),
34+
}
35+
);
36+
});
37+
38+
this.on("callNextOutboxMix", async (req) => {
39+
const outboxedService = cds.outboxed(this).tx(req);
40+
await outboxedService.send(
41+
"main",
42+
{
43+
to: "to",
44+
subject: "subject",
45+
body: "body",
46+
},
47+
{
48+
customHeader: 456,
49+
myNextHeader: 123,
50+
}
51+
);
52+
});
53+
2254
this.on("plainStatus", (req) => {
2355
cds.log(this.name).info(req.event, {
2456
data: req.data,

test/eventQueueOutbox.test.js

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ cds.env.requires.StandardService = {
9898
impl: path.join(basePath, "srv/service/standard-service.js"),
9999
outbox: {
100100
kind: "persistent-outbox",
101+
propagateHeaders: ["authId", "customHeader"],
101102
events: {
102103
timeBucketAction: {
103104
timeBucket: "*/60 * * * * *",
@@ -672,6 +673,62 @@ describe("event-queue outbox", () => {
672673
expect(loggerMock.callsLengths().error).toEqual(0);
673674
});
674675

676+
it("should correctly propagate headers: send/emit event from CAP service with existing headers", async () => {
677+
const service = await cds.connect.to("StandardService");
678+
const outboxedService = cds.outboxed(service).tx(context);
679+
const headers = { customHeader: 123, authId: 123 };
680+
await outboxedService.send(
681+
"callNextOutbox",
682+
{
683+
to: "to",
684+
subject: "subject",
685+
body: "body",
686+
},
687+
{ ...headers, xyz: 123 }
688+
);
689+
await commitAndOpenNew();
690+
await testHelper.selectEventQueueAndExpectOpen(tx, { expectedLength: 1 });
691+
expect(loggerMock).not.sendFioriActionCalled();
692+
await processEventQueue(tx.context, "CAP_OUTBOX", service.name);
693+
await commitAndOpenNew();
694+
await processEventQueue(tx.context, "CAP_OUTBOX", service.name);
695+
await testHelper.selectEventQueueAndExpectDone(tx, { expectedLength: 2 });
696+
expect(loggerMock).actionCalled("main", {
697+
headers,
698+
});
699+
expect(loggerMock.callsLengths().error).toEqual(0);
700+
});
701+
702+
it("should correctly propagate headers: should mix headers from both calls", async () => {
703+
const service = await cds.connect.to("StandardService");
704+
const outboxedService = cds.outboxed(service).tx(context);
705+
const headers = { customHeader: 123, authId: 123 };
706+
await outboxedService.send(
707+
"callNextOutboxMix",
708+
{
709+
to: "to",
710+
subject: "subject",
711+
body: "body",
712+
},
713+
{ ...headers, xyz: 123 }
714+
);
715+
await commitAndOpenNew();
716+
await testHelper.selectEventQueueAndExpectOpen(tx, { expectedLength: 1 });
717+
expect(loggerMock).not.sendFioriActionCalled();
718+
await processEventQueue(tx.context, "CAP_OUTBOX", service.name);
719+
await commitAndOpenNew();
720+
await processEventQueue(tx.context, "CAP_OUTBOX", service.name);
721+
await testHelper.selectEventQueueAndExpectDone(tx, { expectedLength: 2 });
722+
expect(loggerMock).actionCalled("main", {
723+
headers: {
724+
customHeader: 456,
725+
authId: 123,
726+
myNextHeader: 123,
727+
},
728+
});
729+
expect(loggerMock.callsLengths().error).toEqual(0);
730+
});
731+
675732
describe("not connected service should lazily connect and create configuration", () => {
676733
beforeEach(() => {
677734
eventQueue.config.removeEvent("CAP_OUTBOX", "NotificationService");
@@ -1996,7 +2053,7 @@ expect.extend({
19962053
};
19972054
},
19982055
actionCalled: (loggerMock, actionName, properties = {}) => {
1999-
const call = loggerMock.calls().info.find((c) => c[0] === actionName);
2056+
const call = loggerMock.calls().info.find((c) => actionName === c[0]);
20002057
if (!call) {
20012058
return {
20022059
message: () => `action not called! name: ${actionName}`,

0 commit comments

Comments
 (0)