Fix: DLQ redrive correctly preserves _id and updates status #57
Conversation
darrenpicard25
left a comment
Hello Kamalpannu,
Thank you so much for contributing to the Chrono project. The team and I will look at your PR soon. In the meantime, if you could update a few things it would be greatly appreciated and help speed up the process:
- Linting/formatting seems to be slightly off. Could you ensure you have Biome set up properly and are running the appropriate lint/format commands?
- Could you ensure your PR description explains why you would like this feature? Is it to address issue #35?
- You appear to have corrected some spelling mistakes, which is great, but some of your updates introduce new ones. Could you spend a minute looking them over to make sure all spelling changes are correct?
If you can address these issues before the actual code review, it will greatly speed up the process. If not, we will probably add comments for these changes in the review when we get to it.
Thank you
darrenpicard25
left a comment
Hello Kamalpannu,
While we appreciate you spending the time to put up this PR to improve Chrono, I do not believe we can proceed with it.
The approach taken in this PR does not really make sense: it is essentially equivalent to an infinite retry policy (which is perhaps something we could add), but it would not even work as written due to duplicate key constraints.
It is clear that the code was not tested, as it would fail while also blocking the processing of any other task.
Let us know if you would like to discuss further, but we cannot proceed with this PR as it stands; a new approach needs to be thought out.
In `packages/chrono-core/src/processors/create-processor.ts`:

```diff
   /** The maximum time a task handler can take to complete before it will be considered timed out @default 5000ms */
   taskHandlerTimeoutMs?: number;
-  /** The maximum number of retries for a task handler, before task is marked as failed. @default 5 */
+  /** The maximum number of retries for a tasak handler, before task is marked as failed. @default 5 */
```
Suggested change:

```diff
-  /** The maximum number of retries for a tasak handler, before task is marked as failed. @default 5 */
+  /** The maximum number of retries for a task handler, before task is marked as failed. @default 5 */
```
In `packages/chrono-core/src/processors/simple-processor.ts`:

```diff
   );
   this.stopRequested = true;
-
```
Removing this empty line should produce a linting warning.
In `packages/chrono-core/src/processors/simple-processor.ts`:

```ts
await setTimeout(redriveDelayMs); // Delay before redrive
try {
  await this.datastore.redriveFromDlq<TaskKind>();
  this.emit(ProcessorEvents.TASK_RETRY_SCHEDULED, {
    task,
    error: null,
    errorAt: new Date(),
    retryScheduledAt: new Date(Date.now() + redriveDelayMs),
  });
} catch (redriveError) {
  this.emit(ProcessorEvents.UNKNOWN_PROCESSING_ERROR, {
    error: redriveError,
    timestamp: new Date(),
  });
}
```
Slightly confused by this:
- You are blocking the processing loop for 60 seconds, meaning no new tasks can be claimed while this flow is being hit.
- `redriveFromDlq` grabs all "tasks" in the DLQ, yet you only emit a single `TASK_RETRY_SCHEDULED` event despite the possibility of many more being "re-scheduled".
- This logic adds a task document to the DLQ collection, queries it, and inserts an identical document into the main collection, but does not update the "old" document in any way. And because you have now bypassed the `await this.datastore.fail` call, the original document will be re-queried after the timeout interval.
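On the second point, one way to get an event per redriven task is for the datastore to return the tasks it moved so the processor can loop over them. A minimal sketch with hypothetical shapes (the real Chrono types, event payloads, and emitter differ):

```typescript
// Hypothetical shapes for illustration only; the real Chrono types differ.
type Task = { _id: string };
type RetryEvent = { task: Task; retryScheduledAt: Date };

// Assumes redriveFromDlq is changed to return the tasks it moved.
async function redriveAndEmit(
  redriveFromDlq: () => Promise<Task[]>,
  emit: (event: RetryEvent) => void,
): Promise<number> {
  const tasks = await redriveFromDlq();
  // Emit one TASK_RETRY_SCHEDULED-style event per redriven task,
  // not a single event for the whole batch.
  for (const task of tasks) {
    emit({ task, retryScheduledAt: new Date() });
  }
  return tasks.length;
}
```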
In `packages/chrono-mongo-datastore/src/chrono-mongo-datastore.ts`:

```ts
  claimedAt: undefined,
  lastExecutedAt: new Date(),
  _id: task._id, // make sure original _id is kept
});
```
I kind of mentioned this in the other comment, but you are just re-inserting another document into the main collection. You are even still propagating the `retryCount`, meaning this process will kick off again on a single error.
```ts
    throw new Error('DLQ collection name is not set');
  }
  return database.collection<TaskDocument<TaskKind, TaskMapping[TaskKind]>>(this.config.dlqCollectionName);
}*/
```
??? Was this commented-out block meant to be deleted?
```ts
 */
async redriveFromDlq<TaskKind extends keyof TaskMapping>(): Promise<void> {
  const database = await this.getDatabase();
  const dlqName = this.config.dlqCollectionName ?? 'chrono-tasks-dlq';
```
Defaults should be decided in the constructor, aka we can make `dlqCollectionName` required.
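A sketch of resolving the default once in the constructor, so the rest of the class can treat `dlqCollectionName` as always set (the config shape and class name here are simplified assumptions, not the actual Chrono API):

```typescript
// Hypothetical, simplified config; the real ChronoMongoDatastore config differs.
type DatastoreConfig = {
  collectionName: string;
  dlqCollectionName?: string;
};

class DatastoreSketch {
  // After the constructor runs, dlqCollectionName is guaranteed to be set.
  private readonly config: Required<DatastoreConfig>;

  constructor(config: DatastoreConfig) {
    this.config = {
      ...config,
      // Decide the default once, here, instead of at every call site.
      dlqCollectionName: config.dlqCollectionName ?? 'chrono-tasks-dlq',
    };
  }

  get dlqCollectionName(): string {
    return this.config.dlqCollectionName;
  }
}
```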
package.json
In `package.json`:

```diff
   },
+  "dependencies": {
+    "@neofinancial/chrono": "^0.5.1",
+    "mongodb-memory-server": "^10.2.3"
```
???? not sure why these were added?
```ts
  status: TaskStatus.PENDING,
  claimedAt: undefined,
  lastExecutedAt: new Date(),
  _id: task._id, // make sure original _id is kept
```
This will error. You cannot have two documents in the main collection with the same `_id`.
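One duplicate-safe shape for the redrive is to overwrite the existing main-collection document by `_id` (an upsert in Mongo terms) and delete the DLQ copy, rather than inserting a second document with the same `_id`. A minimal sketch using in-memory `Map`s as stand-ins for the two collections (field names follow the discussion above; this is not the actual Chrono implementation):

```typescript
// In-memory stand-ins for the main and DLQ collections, keyed by _id.
type TaskDoc = { _id: string; status: string; retryCount: number };

function redriveFromDlqSketch(
  main: Map<string, TaskDoc>,
  dlq: Map<string, TaskDoc>,
): TaskDoc[] {
  const redriven: TaskDoc[] = [];
  for (const [id, task] of [...dlq]) {
    // Upsert by _id: overwrite the existing main document instead of
    // inserting a second one with the same _id, and reset the retry
    // budget so a single failure does not immediately re-trigger the DLQ.
    const updated: TaskDoc = { ...task, status: 'PENDING', retryCount: 0 };
    main.set(id, updated);
    dlq.delete(id); // remove the DLQ copy so the task is not redriven twice
    redriven.push(updated);
  }
  return redriven;
}
```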
Thank you very much for pointing this out; it helps a lot. These points gave me the idea to try a different approach, capping the maximum DLQ retries at 3. You can check it out in my latest commit whenever you have some time. I tried to cover everything you pointed out; if it is still not up to the mark, I am ready to work on this problem again. Sorry for the trouble.
Thanks,
Preetkamal Singh
Hi @darrenpicard25 and team, thanks again for your time and guidance earlier; it helped me understand the system's workflow much better!
Summary
This PR fixes the Dead Letter Queue (DLQ) workflow in ChronoMongoDatastore:
- Preserves the original `_id` when moving tasks to and from the DLQ.
- Updates the task status to `PENDING` when redriving from the DLQ.

Related Issue
Closes #35

Verification
- Redriven tasks return to the main collection with status `PENDING`.
- Ran `vitest` and all 4 tests pass.

Notes