Skip to content

Commit 6c1613f

Browse files
1kilobitfeywind
andauthored
fix: check batch size before re-queueing retried messages (#2027)
* fix: check batch size before re-queueing retried messages * fix: remove dangling Promise after 5.x linting changes * fix: bring up to date with the logging changes --------- Co-authored-by: Megan Potter <[email protected]>
1 parent 47517b2 commit 6c1613f

File tree

1 file changed

+19
-0
lines changed

1 file changed

+19
-0
lines changed

src/message-queues.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,11 +287,30 @@ export abstract class MessageQueue {
287287
return;
288288
}
289289

290+
// If we'd go over maxMessages or MAX_BATCH_BYTES by re-queueing this
291+
// message, flush first
292+
const {maxMessages} = this._options;
293+
const size = Buffer.byteLength(message.message.ackId, 'utf8');
294+
if (
295+
this._requests.length + 1 >= maxMessages! ||
296+
this.bytes + size >= MAX_BATCH_BYTES
297+
) {
298+
const reason =
299+
this._requests.length + 1 >= maxMessages!
300+
? 'going over count'
301+
: 'going over size';
302+
303+
// No need to wait on this one; it clears the old batch out, and acks
304+
// are best effort.
305+
this.flush(reason).catch(() => {});
306+
}
307+
290308
// Just throw it in for another round of processing on the next batch.
291309
this._requests.push(message);
292310
this.numPendingRequests++;
293311
this.numInFlightRequests++;
294312
this.numInRetryRequests--;
313+
this.bytes += size;
295314

296315
// Make sure we actually do have another batch scheduled.
297316
if (!this._timer) {

0 commit comments

Comments
 (0)