Skip to content

Commit c6c5bd0

Browse files
committed
feat(batch): use async local storage for batch
1 parent 4507fcc commit c6c5bd0

14 files changed

+1350
-69
lines changed

package-lock.json

Lines changed: 10 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/batch/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@
8383
],
8484
"dependencies": {
8585
"@aws-lambda-powertools/commons": "2.28.1",
86+
"@aws/lambda-invoke-store": "0.1.1",
8687
"@standard-schema/spec": "^1.0.0"
8788
},
8889
"devDependencies": {

packages/batch/src/BasePartialBatchProcessor.ts

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,6 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
3535
*/
3636
public COLLECTOR_MAPPING;
3737

38-
/**
39-
* Response to be returned after processing
40-
*/
41-
public batchResponse: PartialItemFailureResponse;
42-
4338
/**
4439
* Type of event that the processor is handling
4540
*/
@@ -200,9 +195,9 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
200195
* Set up the processor with the initial state ready for processing
201196
*/
202197
public prepare(): void {
203-
this.successMessages.length = 0;
204-
this.failureMessages.length = 0;
205-
this.errors.length = 0;
198+
this.successMessages = [];
199+
this.failureMessages = [];
200+
this.errors = [];
206201
this.batchResponse = DEFAULT_RESPONSE;
207202
}
208203

packages/batch/src/BasePartialProcessor.ts

Lines changed: 54 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { BatchProcessingStore } from './BatchProcessingStore.js';
12
import type {
23
BaseRecord,
34
BatchProcessingOptions,
@@ -21,42 +22,64 @@ import type {
2122
*/
2223
abstract class BasePartialProcessor {
2324
/**
24-
* List of errors that occurred during processing
25+
* Store for managing invocation-specific state
2526
*/
26-
public errors: Error[];
27+
readonly #store = new BatchProcessingStore();
2728

28-
/**
29-
* List of records that failed processing
30-
*/
31-
public failureMessages: EventSourceDataClassTypes[];
29+
public get errors(): Error[] {
30+
return this.#store.getErrors();
31+
}
3232

33-
/**
34-
* Record handler provided by customers to process records
35-
*/
36-
public handler: CallableFunction;
33+
protected set errors(errors: Error[]) {
34+
this.#store.setErrors(errors);
35+
}
3736

38-
/**
39-
* Options to be used during processing (optional)
40-
*/
41-
public options?: BatchProcessingOptions;
37+
public get failureMessages(): EventSourceDataClassTypes[] {
38+
return this.#store.getFailureMessages();
39+
}
4240

43-
/**
44-
* List of records to be processed
45-
*/
46-
public records: BaseRecord[];
41+
protected set failureMessages(messages: EventSourceDataClassTypes[]) {
42+
this.#store.setFailureMessages(messages);
43+
}
4744

48-
/**
49-
* List of records that were processed successfully
50-
*/
51-
public successMessages: EventSourceDataClassTypes[];
52-
53-
public constructor() {
54-
this.successMessages = [];
55-
this.failureMessages = [];
56-
this.errors = [];
57-
this.records = [];
58-
// No-op function to avoid null checks, will be overridden by customer when using the class
59-
this.handler = new Function();
45+
public get handler(): CallableFunction {
46+
return this.#store.getHandler();
47+
}
48+
49+
protected set handler(handler: CallableFunction) {
50+
this.#store.setHandler(handler);
51+
}
52+
53+
public get options(): BatchProcessingOptions | undefined {
54+
return this.#store.getOptions();
55+
}
56+
57+
protected set options(options: BatchProcessingOptions | undefined) {
58+
this.#store.setOptions(options);
59+
}
60+
61+
public get records(): BaseRecord[] {
62+
return this.#store.getRecords();
63+
}
64+
65+
protected set records(records: BaseRecord[]) {
66+
this.#store.setRecords(records);
67+
}
68+
69+
public get successMessages(): EventSourceDataClassTypes[] {
70+
return this.#store.getSuccessMessages();
71+
}
72+
73+
protected set successMessages(messages: EventSourceDataClassTypes[]) {
74+
this.#store.setSuccessMessages(messages);
75+
}
76+
77+
protected get batchResponse() {
78+
return this.#store.getBatchResponse();
79+
}
80+
81+
protected set batchResponse(response) {
82+
this.#store.setBatchResponse(response);
6083
}
6184

6285
/**
@@ -196,7 +219,7 @@ abstract class BasePartialProcessor {
196219
*
197220
* We use a separate method to do this rather than the constructor
198221
* to allow for reusing the processor instance across multiple invocations
199-
* by instantiating the processor outside of the Lambda function handler.
222+
* by instantiating the processor outside the Lambda function handler.
200223
*
201224
* @param records - Array of records to be processed
202225
* @param handler - CallableFunction to process each record from the batch
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
import { InvokeStore } from '@aws/lambda-invoke-store';
2+
import type {
3+
BaseRecord,
4+
BatchProcessingOptions,
5+
EventSourceDataClassTypes,
6+
PartialItemFailureResponse,
7+
} from './types.js';
8+
9+
/**
10+
* Manages storage of batch processing state with automatic context detection.
11+
*
12+
* This class abstracts the storage mechanism for batch processing state,
13+
* automatically choosing between InvokeStore (when in Lambda context) and
14+
* fallback instance variables (when outside Lambda context). The decision is
15+
* made at runtime on every method call to support Lambda's concurrent execution
16+
* isolation.
17+
*/
18+
class BatchProcessingStore {
19+
readonly #recordsKey = Symbol('powertools.batch.records');
20+
readonly #handlerKey = Symbol('powertools.batch.handler');
21+
readonly #optionsKey = Symbol('powertools.batch.options');
22+
readonly #failureMessagesKey = Symbol('powertools.batch.failureMessages');
23+
readonly #successMessagesKey = Symbol('powertools.batch.successMessages');
24+
readonly #batchResponseKey = Symbol('powertools.batch.batchResponse');
25+
readonly #errorsKey = Symbol('powertools.batch.errors');
26+
27+
#fallbackRecords: BaseRecord[] = [];
28+
#fallbackHandler: CallableFunction = () => {};
29+
#fallbackOptions?: BatchProcessingOptions;
30+
#fallbackFailureMessages: EventSourceDataClassTypes[] = [];
31+
#fallbackSuccessMessages: EventSourceDataClassTypes[] = [];
32+
#fallbackBatchResponse: PartialItemFailureResponse = {
33+
batchItemFailures: [],
34+
};
35+
#fallbackErrors: Error[] = [];
36+
37+
public getRecords(): BaseRecord[] {
38+
if (InvokeStore.getContext() === undefined) {
39+
return this.#fallbackRecords;
40+
}
41+
return (InvokeStore.get(this.#recordsKey) as BaseRecord[]) ?? [];
42+
}
43+
44+
public setRecords(records: BaseRecord[]): void {
45+
if (InvokeStore.getContext() === undefined) {
46+
this.#fallbackRecords = records;
47+
return;
48+
}
49+
InvokeStore.set(this.#recordsKey, records);
50+
}
51+
52+
public getHandler(): CallableFunction {
53+
if (InvokeStore.getContext() === undefined) {
54+
return this.#fallbackHandler;
55+
}
56+
return (
57+
(InvokeStore.get(this.#handlerKey) as CallableFunction) ?? (() => {})
58+
);
59+
}
60+
61+
public setHandler(handler: CallableFunction): void {
62+
if (InvokeStore.getContext() === undefined) {
63+
this.#fallbackHandler = handler;
64+
return;
65+
}
66+
InvokeStore.set(this.#handlerKey, handler);
67+
}
68+
69+
public getOptions(): BatchProcessingOptions | undefined {
70+
if (InvokeStore.getContext() === undefined) {
71+
return this.#fallbackOptions;
72+
}
73+
return InvokeStore.get(this.#optionsKey) as
74+
| BatchProcessingOptions
75+
| undefined;
76+
}
77+
78+
public setOptions(options: BatchProcessingOptions | undefined): void {
79+
if (InvokeStore.getContext() === undefined) {
80+
this.#fallbackOptions = options;
81+
return;
82+
}
83+
InvokeStore.set(this.#optionsKey, options);
84+
}
85+
86+
public getFailureMessages(): EventSourceDataClassTypes[] {
87+
if (InvokeStore.getContext() === undefined) {
88+
return this.#fallbackFailureMessages;
89+
}
90+
return (
91+
(InvokeStore.get(
92+
this.#failureMessagesKey
93+
) as EventSourceDataClassTypes[]) ?? []
94+
);
95+
}
96+
97+
public setFailureMessages(messages: EventSourceDataClassTypes[]): void {
98+
if (InvokeStore.getContext() === undefined) {
99+
this.#fallbackFailureMessages = messages;
100+
return;
101+
}
102+
InvokeStore.set(this.#failureMessagesKey, messages);
103+
}
104+
105+
public getSuccessMessages(): EventSourceDataClassTypes[] {
106+
if (InvokeStore.getContext() === undefined) {
107+
return this.#fallbackSuccessMessages;
108+
}
109+
return (
110+
(InvokeStore.get(
111+
this.#successMessagesKey
112+
) as EventSourceDataClassTypes[]) ?? []
113+
);
114+
}
115+
116+
public setSuccessMessages(messages: EventSourceDataClassTypes[]): void {
117+
if (InvokeStore.getContext() === undefined) {
118+
this.#fallbackSuccessMessages = messages;
119+
return;
120+
}
121+
InvokeStore.set(this.#successMessagesKey, messages);
122+
}
123+
124+
public getBatchResponse(): PartialItemFailureResponse {
125+
if (InvokeStore.getContext() === undefined) {
126+
return this.#fallbackBatchResponse;
127+
}
128+
return (
129+
(InvokeStore.get(
130+
this.#batchResponseKey
131+
) as PartialItemFailureResponse) ?? { batchItemFailures: [] }
132+
);
133+
}
134+
135+
public setBatchResponse(response: PartialItemFailureResponse): void {
136+
if (InvokeStore.getContext() === undefined) {
137+
this.#fallbackBatchResponse = response;
138+
return;
139+
}
140+
InvokeStore.set(this.#batchResponseKey, response);
141+
}
142+
143+
public getErrors(): Error[] {
144+
if (InvokeStore.getContext() === undefined) {
145+
return this.#fallbackErrors;
146+
}
147+
return (InvokeStore.get(this.#errorsKey) as Error[]) ?? [];
148+
}
149+
150+
public setErrors(errors: Error[]): void {
151+
if (InvokeStore.getContext() === undefined) {
152+
this.#fallbackErrors = errors;
153+
return;
154+
}
155+
InvokeStore.set(this.#errorsKey, errors);
156+
}
157+
}
158+
159+
export { BatchProcessingStore };

packages/batch/src/SqsFifoPartialProcessor.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,11 @@ class SqsFifoPartialProcessor extends BatchProcessorSync {
142142
const remainingRecords = this.records.slice(firstFailureIndex);
143143

144144
for (const record of remainingRecords) {
145-
this.#processFailRecord(record, new SqsFifoShortCircuitError());
145+
const result = this.#processFailRecord(
146+
record,
147+
new SqsFifoShortCircuitError()
148+
);
149+
processedRecords.push(result);
146150
}
147151

148152
this.clean();

packages/batch/src/SqsFifoPartialProcessorAsync.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,11 @@ class SqsFifoPartialProcessorAsync extends BatchProcessor {
141141
const remainingRecords = this.records.slice(firstFailureIndex);
142142

143143
for (const record of remainingRecords) {
144-
this.#processFailRecord(record, new SqsFifoShortCircuitError());
144+
const result = this.#processFailRecord(
145+
record,
146+
new SqsFifoShortCircuitError()
147+
);
148+
processedRecords.push(result);
145149
}
146150

147151
this.clean();

0 commit comments

Comments
 (0)