diff --git a/README.md b/README.md index 0d6d653..2e318a0 100644 --- a/README.md +++ b/README.md @@ -19,9 +19,16 @@ The main difference in this package compared to `msavin:jobs` is that this packa Unfortunately I found the job queue system in `msavin:jobs` too fundamentally built-in to modify and create a PR, so it was easier to write my own package. +## Meteor 2.8 / 3.0 Async Compatibility + +**BREAKING CHANGE:** This version is only compatible with Async Mongo methods and therefore requires Meteor 2.8+. The old synchronous Mongo methods were deprecated since Meteor 2.8 and are removed in Meteor 3.0 + +* To upgrade from earlier versions of this package, replace all calls to our old `...method()` sync methods with the new `await ...methodAsync()` async methods. +* To use our legacy synchronous version, install with `meteor add wildhart:jobs@1.0.18` (you can copy typescript definition from this [tagged commit](https://github.com/wildhart/meteor.jobs/tree/v1.0.18)) + ## Quick Start -First, install the package, and import if necessary: +First, install the package, and import: ```bash meteor add wildhart:jobs @@ -35,16 +42,16 @@ Then, write your background jobs like you would write your methods: ```javascript Jobs.register({ - "sendReminder": function (to, message) { + "sendReminder": async function (to, message) { var call = HTTP.put("http://www.magic.com/sendEmail", { to: to, message: message }); if (call.statusCode === 200) { - this.success(call.result); + await this.successAsync(call.result); } else { - this.reschedule({in: {minutes: 5}}); + await this.rescheduleAsync({in: {minutes: 5}}); } } }); @@ -53,13 +60,13 @@ Jobs.register({ Finally, schedule a background job like you would call a method: ```javascript -Jobs.run("sendReminder", "jon@example.com", "The future is here!"); +await Jobs.runAsync("sendReminder", "jon@example.com", "The future is here!"); ``` -One more thing: the function above will schedule the job to run on the moment that the function was called, however, you can delay it by passing in a special **configuration object** at the end: +The function above will schedule the job to run immediately, however, you can delay it by passing in a special **configuration object** at the end: ```javascript -Jobs.run("sendReminder", "jon@example.com", "The future is here!", { +await Jobs.runAsync("sendReminder", "jon@example.com", "The future is here!", { in: { days: 3, }, @@ -67,10 +74,9 @@ Jobs.run("sendReminder", "jon@example.com", "The future is here!", { hour: 9, minute: 42 }, - priority: 9999999999 }); ``` -The configuration object supports `date`, `in`, `on`, and `priority`, all of which are completely optional, see [Jobs.run](#jobsrun). +The configuration object supports `date`, `in`, `on`, and `priority`, all of which are completely optional, see [Jobs.runAsync](#jobsrun). ## New Strongly Typed API @@ -88,7 +94,7 @@ With the new API, the above code would be replaced with: ```typescript import { TypedJob } from "meteor/wildhart:jobs"; -export const sendReminderJob = new TypedJob('sendReminders', function(to: string, message: string) { +export const sendReminderJob = new TypedJob('sendReminders', async function(to: string, message: string) { ... }); ``` @@ -98,7 +104,7 @@ When scheduling the job, you can reference the class instance directly: ```typescript import { sendReminderJob } from './reminders'; -sendReminderJob.withArgs('jon@example.com", The future is here!').run({ +await sendReminderJob.withArgs('jon@example.com", The future is here!').runAsync({ in: { days: 3, }, @@ -106,35 +112,34 @@ sendReminderJob.withArgs('jon@example.com", The future is here!').run({ hour: 9, minute: 42 }, - priority: 9999999999 }); ``` Almost all of the traditional API can be replaced with this new API: ```typescript // as example above -sendReminderJob.withArgs(...).run(configObject); -// equivalent to Jobs.clear('*', 'sendReminder', '*', ...args); -sendReminderJob.clear('*', ...args); -// NEW API equivalent to Jobs.collection.clear({...query, name: 'sendReminderJob'); -sendReminderJob.clearQuery(query); - -// same as Jobs.remove(....), but without having to import "Jobs" -const scheduledJob: JobDocument | false = myJob.withArgs(...).run(...); -sendReminderJob.remove(scheduledJob); +await sendReminderJob.withArgs(...).runAsync(configObject); +// equivalent to await Jobs.clearAsync('*', 'sendReminder', '*', ...args); +await sendReminderJob.clearAsync('*', ...args); +// NEW API equivalent to await Jobs.collection.clearAsync({...query, name: 'sendReminderJob'); +await sendReminderJob.clearQueryAsync(query); + +// same as await Jobs.removeAsync(....), but without having to import "Jobs" +const scheduledJob: JobDocument | false = await myJob.withArgs(...).runAsync(...); +await sendReminderJob.removeAsync(scheduledJob); // or -sendReminderJob.remove(scheduledJob._id); - -// equivalent to Jobs.start('sendReminders'); -sendReminderJob.start(); -// equivalent to Jobs.stop('sendReminders'); -sendReminderJob.stop(); -// equivalent to Jobs.count('sendReminders', 'jon@example.com'); -sendReminderJob.count('jon@example.com'); -// equivalent to Jobs.findOne('sendReminders', 'jon@example.com'); -sendReminderJob.findOne('jon@example.com'); -// this is new API equivalent to Jobs.update({query, ..name: 'sendReminderJob'}, options); -sendReminderJob.update(query, options); +await sendReminderJob.removeAsync(scheduledJob._id); + +// equivalent to await Jobs.startAsync('sendReminders'); +await sendReminderJob.startAsync(); +// equivalent to await Jobs.stopAsync('sendReminders'); +await sendReminderJob.stopAsync(); +// equivalent to await Jobs.countAsync('sendReminders', 'jon@example.com'); +await sendReminderJob.countAsync('jon@example.com'); +// equivalent to await Jobs.findOneAsync('sendReminders', 'jon@example.com'); +await sendReminderJob.findOneAsync('jon@example.com'); +// this is new API equivalent to await Jobs.updateAsync({query, ..name: 'sendReminderJob'}, options); +await sendReminderJob.updateAsync(query, options); // if you need to query the Jobs collection directly, the original name of the job can be obtained: sendReminderJob.name; // == 'sendReminders' @@ -147,14 +152,14 @@ file where the job was defined, which by definition should be exposed on the ser Therefore, in shared client/server code (e.g. a Meteor Method) if you are used to doing: ```javascript if (Meteor.isServer) { - Jobs.run('sendEmail', 'jon@example.com', 'hello', {in: {days: 1}}); + await Jobs.runAsync('sendEmail', 'jon@example.com', 'hello', {in: {days: 1}}); } ``` -You have to be careful not to import the server-side code into the front-end, by using `import().then()`: +You have to be careful not to import the server-side code into the front-end, so instead use `import().then()`: ```javascript if (Meteor.isServer) { - import('../../server/reminderJobs').then(({sendEmailJob}) => { - sendEmailJob.withArgs(...).run(...)); + import('../../server/reminderJobs').then(async ({sendEmailJob}) => { + await sendEmailJob.withArgs(...).runAsync(...); }); } ``` @@ -165,14 +170,14 @@ if (Meteor.isServer) { - [Jobs.configure](#jobsconfigure) - [Jobs.register](#jobsregister) - - [Jobs.run](#jobsrun) - - [Jobs.execute](#jobsexecute) - - [Jobs.reschedule](#jobsreschedule) - - [Jobs.replicate](#jobsreplicate) - - [Jobs.start](#jobsstart) - - [Jobs.stop](#jobsstop) - - [Jobs.clear](#jobsclear) - - [Jobs.remove](#jobsremove) + - [Jobs.runAsync](#jobsrunasync) + - [Jobs.executeAsync](#jobsexecuteasync) + - [Jobs.rescheduleAsync](#jobsrescheduleasync) + - [Jobs.replicateAsync](#jobsreplicateasync) + - [Jobs.startAsync](#jobsstartasync) + - [Jobs.stopAsync](#jobsstopasync) + - [Jobs.clearAsync](#jobsclearasync) + - [Jobs.removeAsync](#jobsremoveasync) - [Jobs.jobs](#jobsjobs) - [Jobs.collection](#jobscollection) - [Repeating Jobs](#repeating-jobs) @@ -216,55 +221,55 @@ Jobs.configure({ ```typescript Jobs.register({ - sendEmail: function (to, content) { - var send = Magic.sendEmail(to, content); + sendEmail: async function (to, content) { + var send = await Magic.sendEmail(to, content); if (send) { - this.success(); + await this.successAsync(); } else { - this.reschedule({in: {minutes: 5}}); + await this.rescheduleAsync({in: {minutes: 5}}); } }, - sendReminder: function (userId, content) { - var doc = Reminders.insert({ + sendReminder: async function (userId, content) { + var doc = await Reminders.insertAsync({ to: userId, content: content }) if (doc) { - this.remove(); + await this.removeAsync(); } else { - this.reschedule({in: {minutes: 5}}); + await this.rescheduleAsync({in: {minutes: 5}}); } } }); // or NEW API: -const sendEmail = new TypedJob('sendEmail', function(to: string, content: EmailDoc) { +const sendEmail = new TypedJob('sendEmail', async function(to: string, content: EmailDoc) { ... }); -const sendReminder = new TypedJob('sendReminder', function(to: string, content: ReminderContent) { +const sendReminder = new TypedJob('sendReminder', async function(to: string, content: ReminderContent) { ... }); ``` Each job is bound with a set of functions to give you maximum control over how the job runs: - `this.document` - access the job document - - `this.success()` - tell the queue the job is completed - - `this.failure()` - tell the queue the job failed - - `this.reschedule(config)` - tell the queue to schedule the job for a future date - - `this.remove()` - remove the job from the queue - - `this.replicate(config)` - create a copy of the job with a different due date provided by `config` (returns the new jobId) + - `this.successAsync()` - tell the queue the job is completed + - `this.failureAsync()` - tell the queue the job failed + - `this.rescheduleAsync(config)` - tell the queue to schedule the job for a future date + - `this.removeAsync()` - remove the job from the queue + - `this.replicateAsync(config)` - create a copy of the job with a different due date provided by `config` (returns the new jobId) Each job must be resolved with success, failure, reschedule, and/or remove. See [Repeating Jobs](#repeating-jobs) and [Async Jobs/Promises](#async-jobs) -### Jobs.run +### Jobs.runAsync -`Jobs.run` allows you to schedule a job to run. You call it just like you would call a method, by specifying the job name and its arguments. At the end, you can pass in a special configuration object. Otherwise, it will be scheduled to run as soon as possible. +`Jobs.runAsync` allows you to schedule a job to run. You call it just like you would call a method, by specifying the job name and its arguments. At the end, you can pass in a special configuration object. Otherwise, it will be scheduled to run immediately. ```javascript -var jobDoc = Jobs.run("sendReminder", "jon@example.com", "The future is here!", { +var jobDoc = await Jobs.runAsync("sendReminder", "jon@example.com", "The future is here!", { in: { days: 3, }, @@ -272,14 +277,13 @@ var jobDoc = Jobs.run("sendReminder", "jon@example.com", "The future is here!", hour: 9, minute: 42 }, - priority: 9999999999, singular: true }); // or NEW API: -sendReminderJob.withArgs("jon@example.com", "The future is here!").run(...); +await sendReminderJob.withArgs("jon@example.com", "The future is here!").runAsync(...); ``` -`Jobs.run` returns a `jobDoc`. +`Jobs.runASync` returns a `jobDoc`. The configuration object supports the following inputs: @@ -307,83 +311,82 @@ The configuration object supports the following inputs: - **`callback`** - Function - Run a callback function after scheduling the job -### Jobs.execute +### Jobs.executeAsync -`Jobs.execute` allows you to run a job ahead of its due date. It can only work on jobs that have not been resolved. +`Jobs.executeAsync` allows you to run a job ahead of its due date. It can only work on jobs that have not been resolved. ```javascript -Jobs.execute(doc) // or (doc._id) +await Jobs.executeAsync(doc) // or (doc._id) // or NEW API -sendReminderJob.execute(doc); // or (doc._id) +await sendReminderJob.executeAsync(doc); // or (doc._id) ``` -### Jobs.reschedule +### Jobs.rescheduleAsync -`Jobs.reschedule` allows you to reschedule a job. It can only work on jobs that have not been resolved. +`Jobs.rescheduleAsync` allows you to reschedule a job. It can only work on jobs that have not been resolved. ```javascript -Jobs.reschedule(job, { // or (job._id) +await Jobs.rescheduleAsync(job, { // or (job._id) in: { minutes: 5 }, - priority: 9999999 }); // or NEW API -sendReminderJob.execute(job, {...}); // or (job._id, {...}); +await sendReminderJob.rescheduleAsync(job, {...}); // or (job._id, {...}); ``` The configuration is passed in as the second argument, and it supports the same inputs as `Jobs.run`. -### Jobs.replicate +### Jobs.replicateAsync -`Jobs.replicate` allows you to replicate a job. +`Jobs.replicateAsync` allows you to replicate a job. ```javascript -var jobId = Jobs.replicate(job, { // or (job._id, {... +var jobId = await Jobs.replicateAsync(job, { // or (job._id, {... in: { minutes: 5 } }) // or NEW API -sendReminderJob.execute(job, {...}); // or (job._id, {...}); +await sendReminderJob.replicateAsync(job, {...}); // or (job._id, {...}); ``` -`Jobs.replicate` returns a `jobId`. +`Jobs.replicateAsync` returns a `jobId`. -### Jobs.start +### Jobs.startAsync -`Jobs.start` allows you start all the queues. This runs automatically unless `autoStart` is set to `false`. If you call the function with no arguments, it will start all the queues. If you pass in a String, it will start a queue with that name. If you pass in an Array, it will start all the queues named in the array. +`Jobs.startAsync` allows you start all the queues. This runs automatically unless `autoStart` is set to `false`. If you call the function with no arguments, it will start all the queues. If you pass in a String, it will start a queue with that name. If you pass in an Array, it will start all the queues named in the array. ```javascript // Start all the queues -Jobs.start() +await Jobs.startAsync() // Start just one queue -Jobs.start("sendReminder") +await Jobs.startAsync("sendReminder") // or NEW API -sendReminderJob.start(); +await sendReminderJob.startAsync(); // Start multiple queues -Jobs.start(["sendReminder", "sendEmail"]) +await Jobs.startAsync(["sendReminder", "sendEmail"]) ``` Unlike msavin:sjobs, this function can be called on any server and whichever server is currently in control of the job queue will be notified. -### Jobs.stop +### Jobs.stopAsync -`Jobs.stop` allows you stop all the queues. If you call the function with no arguments, it will stop all the queues. If you pass in a String, it will stop a queue with that name. If you pass in an Array, it will stop all the queues named in the array. +`Jobs.stopAsync` allows you stop all the queues. If you call the function with no arguments, it will stop all the queues. If you pass in a String, it will stop a queue with that name. If you pass in an Array, it will stop all the queues named in the array. ```javascript // Stop all the queues -Jobs.stop() +await Jobs.stopAsync() // Stop just one queue -Jobs.stop("sendReminder") +await Jobs.stopAsync("sendReminder") // or NEW API -sendReminderJob.stop(); +await sendReminderJob.stopAsync(); // Stop multiple queues -Jobs.stop(["sendReminder", "sendEmail"]) +await Jobs.stopAsync(["sendReminder", "sendEmail"]) ``` Unlike msavin:sjobs, this function can be called on any server and whichever server is currently in control of the job queue will be notified. @@ -393,17 +396,17 @@ mongo> db.jobs_dominator_3.update({_id:"dominatorId"}, {$set: {pausedJobs: ['*'] ``` The in-control server should observe the change and stop instantly. Use `{$unset: {pausedJobs: 1}}` or `{$set: {pausedJobs: []}}` to start all the queues again. -### Jobs.clear +### Jobs.clearAsync -`Jobs.clear` allows you to clear all or some of the jobs in your database. +`Jobs.clearAsync` allows you to clear all or some of the jobs in your database. ```javascript -var count = Jobs.clear(state, jobName, ...arguments, callback); +var count = await Jobs.clearAsync(state, jobName, ...arguments, callback); e.g: -count = Jobs.clear(); // remove all completed jobs (success or failure) -count = Jobs.clear('*'); // remove all jobs -count = Jobs.clear('failure', 'sendEmail', 'jon@example.com', function(err, count) {console.log(err, count)}); +count = await Jobs.clearAsync(); // remove all completed jobs (success or failure) +count = await Jobs.clearAsync('*'); // remove all jobs +count = await Jobs.clearAsync('failure', 'sendEmail', 'jon@example.com', function(err, count) {console.log(err, count)}); // or NEW API -count = sendEmailJob.clear('failure', 'jon@example.com', ...); +count = await sendEmailJob.clearAsync('failure', 'jon@example.com', ...); ``` Parameters: * `state` for selecting a job state (either `pending`, `success`, `failure`, or `*` to select all of them), or omit to all except `pending` jobs. @@ -411,14 +414,14 @@ Parameters: * provide `arguments` to match jobs only with the same arguments. * `callback` to provide a callback function with `error` and `result` parameters, where `result` is the number of jobs removed. -### Jobs.remove +### Jobs.removeAsync -`Jobs.remove` allows you to remove a job from the collection. +`Jobs.removeAsync` allows you to remove a job from the collection. ```javascript -var success = Jobs.remove(doc); // or (doc._id) +var success = await Jobs.removeAsync(doc); // or (doc._id) // or NEW API -sendEmailJob.remove(doc); // or (doc._id) +await sendEmailJob.removeAsync(doc); // or (doc._id) ``` ### Jobs.jobs @@ -435,19 +438,19 @@ var nJobTypes = jobNames.length; // 2 ## Repeating jobs -Repeating jobs can be created by using `this.reschedule()` in the job function, e.g.: +Repeating jobs can be created by using `this.rescheduleAsync()` in the job function, e.g.: ```javascript Jobs.register({ - processMonthlyPayments() { - this.reschedule({in: {months: 1}}); - processPayments(); + async processMonthlyPayments() { + await this.rescheduleAsync({in: {months: 1}}); + await processPayments(); }, }); -Jobs.run('processMonthlyPayments', {singular: true}); +await Jobs.runAsync('processMonthlyPayments', {singular: true}); ``` -Since this package doesn't keep a job history (compared with msavin:sjobs), you can use `this.reschedule()` indefinitely without polluting the jobs database, instead of having to use `this.replicate()` followed by `this.remove()`. +Since this package doesn't keep a job history (compared with msavin:sjobs), you can use `this.rescheduleAsync()` indefinitely without polluting the jobs database, instead of having to use `this.replicateAsync()` followed by `this.removeAsync()`. ## Async Jobs @@ -456,11 +459,11 @@ The job function can use `async/await` or return a promise: Jobs.register({ async asyncJob(...args) { await new Promise(resolve => Meteor.setTimeout(() => resolve(0), 4000)); - this.remove(); + await this.removeAsync(); }, promiseJob(...args) { - return new Promise(resolve => Meteor.setTimeout(() => { - this.remove(); + return new Promise(resolve => Meteor.setTimeout(async () => { + await this.remove(); resolve(0); }, 8000)); }, @@ -480,6 +483,7 @@ The job queue intelligently prevents lots of a single job dominating the job que ## API Differences From msavin:sjobs If any of these differences make this package unsuitable for you, please let me know and I'll consider fixing. +- Since v2.0 of this package, most methods have been renamed from `...method()` to `...methodAsync()` and are asynchronous. - This package doesn't keep a job history. - `failed` jobs are not retried, unless they have already been rescheduled. - The Job configuration object doesn't support the `data` attribute - I never found any use for this. @@ -493,17 +497,20 @@ If any of these differences make this package unsuitable for you, please let me - `setServerId` can be a `String` as as well as a `Function` - `log` can be a `Boolean` as well as a `Function` - In a [job function](#jobsregister), `this.set()` and `this.get()` are not provided - I never found any use for this. -- In a [job function](#jobsregister), `this.success()` and `this.failure()` to not take a `result` parameter - this package doesn't keep a job history +- In a [job function](#jobsregister), `this.successAsync()` and `this.failureAsync()` to not take a `result` parameter - this package doesn't keep a job history - [singular](#jobsrun) jobs only check for `pending` jobs of the same name, so they can be run again even if a previous job failed. -- `Jobs.start()` and `Jobs.stop()` can be called on any server and whichever server is in control of the job queue will be notified. -- `Jobs.cancel()` doesn't exist. Just remove it with [Jobs.remove()](#jobsremove) - I don't see the point in keeping old jobs lying around. -- [Jobs.clear()](#jobsclear) can take additional `argument` parameters to only delete jobs matching those arguments. +- `Jobs.startAsync()` and `Jobs.stopAsync()` can be called on any server and whichever server is in control of the job queue will be notified. +- `Jobs.cancel()` doesn't exist. Just remove it with [Jobs.removeAsync()](#jobsremove) - I don't see the point in keeping old jobs lying around. +- [Jobs.clearAsync()](#jobsclear) can take additional `argument` parameters to only delete jobs matching those arguments. - [Jobs.jobs](#jobsjobs) doesn't exist in msavin:sjobs ------ ## Version History +#### 2.0.0 (2023-12-20) +- **BREAKING CHANGE** New asynchronous API for Meteor 3.0 compatibility/ + #### 1.0.18 (2023-08-19) - Added new [strongly-typed API](#new-strongly-typed-api). diff --git a/TypedJob.ts b/TypedJob.ts index 09c3002..c3d65b9 100644 --- a/TypedJob.ts +++ b/TypedJob.ts @@ -7,34 +7,34 @@ export default class TypedJob { public withArgs(...args: TArgs) { return { - run: (config?: Partial) => Jobs.run(this.name, ...args, config), + run: (config?: Partial) => Jobs.runAsync(this.name, ...args, config), } } - public clear = (state: '*' | Jobs.JobStatus | Jobs.JobStatus[], ...args: PartialArray) => Jobs.clear(state, this.name, ...args); + public clearAsync = (state: '*' | Jobs.JobStatus | Jobs.JobStatus[], ...args: PartialArray) => Jobs.clearAsync(state, this.name, ...args); - public clearQuery = (query: Mongo.Selector) => Jobs.collection.remove({...query, name: this.name}); + public clearQueryAsync = (query: Mongo.Selector) => Jobs.collection.removeAsync({...query, name: this.name}); - public remove = (jobOrId: JobOrId) => Jobs.remove(jobOrId); + public removeAsync = (jobOrId: JobOrId) => Jobs.removeAsync(jobOrId); - public execute = (jobOrId: JobOrId) => Jobs.execute(jobOrId); + public executeAsync = (jobOrId: JobOrId) => Jobs.executeAsync(jobOrId); - public reschedule = (jobOrId: JobOrId, config: Partial) => Jobs.reschedule(jobOrId, config); + public rescheduleAsync = (jobOrId: JobOrId, config: Partial) => Jobs.rescheduleAsync(jobOrId, config); - public replicate = (jobOrId: JobOrId, config: Partial) => Jobs.replicate(jobOrId, config); + public replicateAsync = (jobOrId: JobOrId, config: Partial) => Jobs.replicateAsync(jobOrId, config); - public start = () => Jobs.start(this.name); + public startAsync = () => Jobs.startAsync(this.name); - public stop = () => Jobs.stop(this.name); + public stopAsync = () => Jobs.stopAsync(this.name); - public count = (...args: PartialArray) => Jobs.count(this.name, ...args); + public countAsync = (...args: PartialArray) => Jobs.countAsync(this.name, ...args); - public update: Mongo.Collection['update'] = (selector, options) => { + public updateAsync: Mongo.Collection['updateAsync'] = (selector, options) => { const mySelector = typeof selector == 'string' ? selector : {...selector, name: this.name}; - return Jobs.collection.update(mySelector, options); + return Jobs.collection.updateAsync(mySelector, options); } - public findOne = (...args: PartialArray) => Jobs.findOne(this.name, ...args); + public findOneAsync = (...args: PartialArray) => Jobs.findOneAsync(this.name, ...args); } // create an array type which doesn't require all elements of the original type diff --git a/jobs.ts b/jobs.ts index 6194d8e..093ea5d 100644 --- a/jobs.ts +++ b/jobs.ts @@ -33,42 +33,42 @@ namespace Dominator { let _pingInterval: number | null = null; let _takeControlTimeout: number | null = null; - Meteor.startup(() => { + Meteor.startup(async () => { log('Jobs', `Meteor.startup, startupDelay: ${settings.startupDelay / 1000}s...`); - collection.remove({_id: {$ne: DOMINATOR_ID}}); - Meteor.setTimeout(() => init(), settings.startupDelay); + await collection.removeAsync({_id: {$ne: DOMINATOR_ID}}); + Meteor.setTimeout(() => initAsync(), settings.startupDelay); }) - export function init() { + export async function initAsync() { _serverId = (typeof settings.setServerId == 'string' && settings.setServerId) || (typeof settings.setServerId == 'function' && settings.setServerId()) || Random.id(); collection.find({_id: DOMINATOR_ID}).observe({ - changed: (newPing) => _observer(newPing), + changed: (newPing) => _observerSync(newPing), }); - lastPing = collection.findOne(); + lastPing = await collection.findOneAsync(); const lastPingIsOld = lastPing && lastPing.date && lastPing.date.valueOf() < Date.now() - settings.maxWait; log('Jobs', 'startup', _serverId, JSON.stringify(lastPing), 'isOld='+lastPingIsOld); // need !lastPing.serverId on following line in case Jobs.start() or Jobs.stop() updates pausedJobs before if (!lastPing || !lastPing.serverId) { // fresh installation, no one is in control yet. - _takeControl('no ping'); + await _takeControlAsync('no ping'); } else if (lastPing.serverId == _serverId) { // we were in control but have restarted - resume control - _takeControl('restarted'); + await _takeControlAsync('restarted'); } else if (lastPingIsOld) { // other server lost control - take over - _takeControl('lastPingIsOld ' + JSON.stringify(lastPing)); + await _takeControlAsync('lastPingIsOld ' + JSON.stringify(lastPing)); } else { // another server is recently in control, set a timer to check the ping... - _observer(lastPing); + _observerSync(lastPing); } } - export function start(jobNames?: string[] | string) { + export async function startAsync(jobNames?: string[] | string) { const update: Mongo.Modifier = {} if (!jobNames || jobNames == '*') { // clear the pausedJobs list, start all jobs @@ -77,11 +77,11 @@ namespace Dominator { update.$pullAll = {pausedJobs: typeof jobNames == 'string' ? [jobNames] : jobNames}; } - collection.upsert({_id: DOMINATOR_ID}, update); + await collection.upsertAsync({_id: DOMINATOR_ID}, update); log('Jobs', 'startJobs', jobNames, update); } - export function stop(jobNames?: string[] | string) { + export async function stopAsync(jobNames?: string[] | string) { const update: Mongo.Modifier = {} if (!jobNames || jobNames == '*') { update.$set = {pausedJobs: ['*']}; // stop all jobs @@ -89,22 +89,22 @@ namespace Dominator { update.$addToSet = {pausedJobs: typeof jobNames == 'string' ? jobNames : {$each: jobNames}}; } - collection.upsert({_id: DOMINATOR_ID}, update); + await collection.upsertAsync({_id: DOMINATOR_ID}, update); log('Jobs', 'stopJobs', jobNames, update); } - function _observer(newPing: Document) { + function _observerSync(newPing: Document) { log('Jobs', 'dominator.observer', newPing); if (lastPing && lastPing.serverId == _serverId && newPing.serverId != _serverId) { // we were in control but another server has taken control - _relinquishControl(); + _relinquishControlSync(); } const oldPausedJobs = lastPing && lastPing.pausedJobs || []; lastPing = newPing; if ((lastPing.pausedJobs || []).join() != oldPausedJobs.join()) { // the list of paused jobs has changed - update the query for the job observer // needs dominator.lastPing.pausedJobs to be up-to-date so do lastPing = newPing above - Queue.restart(); + Queue.restartSync(); } if (_takeControlTimeout) { Meteor.clearTimeout(_takeControlTimeout); @@ -114,29 +114,29 @@ namespace Dominator { // we're not in control, set a timer to take control in the future... _takeControlTimeout = Meteor.setTimeout(() => { // if this timeout isn't cleared then the dominator hasn't been updated recently so we should take control. - _takeControl('lastPingIsOld ' + JSON.stringify(lastPing)); + _takeControlAsync('lastPingIsOld ' + JSON.stringify(lastPing)); }, settings.maxWait); } } - function _takeControl(reason: string) { + async function _takeControlAsync(reason: string) { log('Jobs', 'takeControl', reason); - _ping(); - Queue.start(); + await _pingAsync(); + Queue.startSync(); } - function _relinquishControl() { + function _relinquishControlSync() { log('Jobs', 'relinquishControl'); if (_pingInterval) { Meteor.clearInterval(_pingInterval); _pingInterval = null; } - Queue.stop(); + Queue.stopSync(); } - function _ping() { + async function _pingAsync() { if (!_pingInterval) { - _pingInterval = Meteor.setInterval(() =>_ping(), settings.maxWait * 0.8); + _pingInterval = Meteor.setInterval(() =>_pingAsync(), settings.maxWait * 0.8); } const newPing: Document = { serverId: _serverId, @@ -146,7 +146,7 @@ namespace Dominator { if (!lastPing) { lastPing = newPing; } - collection.upsert({_id: DOMINATOR_ID}, newPing); + await collection.upsertAsync({_id: DOMINATOR_ID}, newPing); log('Jobs', 'ping', newPing.date, 'paused:', newPing.pausedJobs); } } @@ -160,7 +160,7 @@ export namespace Jobs { maxWait: number, log: typeof console.log | boolean; autoStart: boolean; - setServerId?: string | Function; + setServerId?: string | (() => string); defaultCompletion?: 'success' | 'remove'; } @@ -173,7 +173,7 @@ export namespace Jobs { awaitAsync: boolean; unique: boolean; singular: boolean; - callback?: Function; + callback?: (err: string | null, res: any) => void | Promise; } export type JobStatus = "pending" | "success" | "failure" | "executing"; @@ -191,14 +191,14 @@ export namespace Jobs { export interface JobThisType { document: JobDocument; - replicate(config: Partial): string | null | false; - reschedule(config: Partial): void; - remove(): boolean; - success(): void; - failure(): void; + replicateAsync(config: Partial): Promise; + rescheduleAsync(config: Partial): Promise; + removeAsync(): Promise; + successAsync(): Promise; + failureAsync(): Promise; } - export type JobFunction = (this: JobThisType, ...args: TArgs) => void; + export type JobFunction = (this: JobThisType, ...args: TArgs) => void | Promise; export type JobFunctions = Record>; export type RegisterFn = (jobFunctions: JobFunctions) => void; @@ -206,7 +206,7 @@ export namespace Jobs { export const collection = new Mongo.Collection("jobs_data"); - collection._ensureIndex({name: 1, due: 1, state: 1}); + collection.createIndexAsync({name: 1, due: 1, state: 1}).catch(err => console.error(err)); export function configure(config: Partial) { check(config, { @@ -234,26 +234,26 @@ export namespace Jobs { const isConfig = (input: any) => !!(input && typeof input == 'object' && configItems.some(i => typeof input[i] != 'undefined')); - export function run(name: string, ...args: any) { + export async function runAsync(name: string, ...args: any) { check(name, String); log('Jobs', 'Jobs.run', name, args.length && args[0]); - var config = args.length && args.pop() as Partial || null; + var config: Partial | null = args.length && args.pop() || null; if (config && !isConfig(config)) { args.push(config); config = null; } - var error; + var error: string | null = null; if (config?.unique) { // If a job is marked as unique, it will only be scheduled if no other job exists with the same arguments - if (count(name, ...args)) error = "Unique job already exists"; + if (await countAsync(name, ...args)) error = "Unique job already exists"; } if (config?.singular) { // If a job is marked as singular, it will only be scheduled if no other job is PENDING with the same arguments - if (countPending(name, ...args)) error = 'Singular job already exists'; + if (await countPendingAsync(name, ...args)) error = 'Singular job already exists'; } if (error) { log('Jobs', ' ' + error); if (typeof config?.callback =='function') { - config.callback(error, null); + await config.callback(error, null); } return false; } @@ -266,20 +266,20 @@ export namespace Jobs { created: new Date(), awaitAsync: config?.awaitAsync || undefined, }; - const jobId = collection.insert(jobDoc); + const jobId = await collection.insertAsync(jobDoc); if (jobId) { jobDoc._id = jobId; } else { - error = true; + error = "Couldn't insert job"; } if (typeof config?.callback == 'function') { - config.callback(error, jobId && jobDoc); + await config.callback(error, jobId && jobDoc); } return error ? false : jobDoc as JobDocument; } - export function execute(jobOrId: JobOrId) { + export async function executeAsync(jobOrId: JobOrId) { if (!jobOrId) { console.warn('Jobs', ' Jobs.execute', 'JOB NOT FOUND', jobOrId); return false; @@ -287,7 +287,7 @@ export namespace Jobs { const jobId = typeof jobOrId == 'string' ? jobOrId : jobOrId._id; check(jobId, String); log('Jobs', 'Jobs.execute', jobId); - const job = collection.findOne(jobId); + const job = await collection.findOneAsync(jobId); if (!job) { console.warn('Jobs', 'Jobs.execute', 'JOB NOT FOUND', jobId); return; @@ -297,10 +297,10 @@ export namespace Jobs { return; } - Queue.executeJob(job); + await Queue.executeJobAsync(job); } - export function replicate(jobOrId: JobOrId, config: Partial) { + export async function replicateAsync(jobOrId: JobOrId, config: Partial) { if (!jobOrId) { console.warn('Jobs', ' Jobs.replicate', 'JOB NOT FOUND', jobOrId); return false; @@ -308,7 +308,7 @@ export namespace Jobs { const jobId = typeof jobOrId == 'string' ? jobOrId : jobOrId._id; check(jobId, String); const date = getDateFromConfig(config); - const job = collection.findOne(jobId); + const job = await collection.findOneAsync(jobId); if (!job) { console.warn('Jobs', ' Jobs.replicate', 'JOB NOT FOUND', jobId); return null; @@ -317,12 +317,12 @@ export namespace Jobs { delete (job as any)._id; job.due = date; job.state = 'pending'; - const newJobId = collection.insert(job); + const newJobId = await collection.insertAsync(job); log('Jobs', ' Jobs.replicate', jobId, config); return newJobId; } - export function reschedule(jobOrId: JobOrId, config: Partial) { + export async function rescheduleAsync(jobOrId: JobOrId, config: Partial) { if (!jobOrId) { console.warn('Jobs', ' Jobs.reschedule', 'JOB NOT FOUND', jobOrId); return false; @@ -334,24 +334,25 @@ export namespace Jobs { if (config.priority) { set.priority = config.priority; } - const count = collection.update({_id: jobId}, {$set: set}); + const count = await collection.updateAsync({_id: jobId}, {$set: set}); log('Jobs', ' Jobs.reschedule', jobId, config, date, count); if (typeof config.callback == 'function') { - config.callback(count==0, count); + await config.callback(count==0 ? 'No jobs updated' : null, count); } + return true; } - export function remove(jobOrId: JobOrId) { + export async function removeAsync(jobOrId: JobOrId) { if (!jobOrId) { return false; } const jobId = typeof jobOrId == 'string' ? jobOrId : jobOrId._id; - var count = collection.remove({_id: jobId}); + var count = await collection.removeAsync({_id: jobId}); log('Jobs', ' Jobs.remove', jobId, count); return count > 0; } - export function clear(state?: '*' | JobStatus | JobStatus[], jobName?: string, ...args: any[]) { + export async function clearAsync(state?: '*' | JobStatus | JobStatus[], jobName?: string, ...args: any[]) { const query: Mongo.Query = { state: state === "*" ? {$exists: true} : typeof state === "string" ? state as JobStatus @@ -368,45 +369,45 @@ export namespace Jobs { const callback = args.length && typeof args[args.length - 1] == 'function' ? args.pop() : null; args.forEach((arg, index) => query["arguments." + index] = arg); - const count = collection.remove(query); + const count = await collection.removeAsync(query); log('Jobs', 'Jobs.clear', count, query); - callback?.(null, count); + await callback?.(null, count); return count; } - export function findOne(jobName: string, ...args: any[]) { + export async function findOneAsync(jobName: string, ...args: any[]) { check(jobName, String); const query: Mongo.Query = { name: jobName, }; args.forEach((arg, index) => query["arguments." + index] = arg); - return collection.findOne(query); + return await collection.findOneAsync(query); } - export function count(jobName: string, ...args: any[]) { + export async function countAsync(jobName: string, ...args: any[]) { check(jobName, String); const query: Mongo.Query = { name: jobName, }; args.forEach((arg, index) => query["arguments." + index] = arg); - const count = collection.find(query).count(); + const count = await collection.find(query).countAsync(); return count; }; - export function countPending(jobName: string, ...args: any[]) { + export async function countPendingAsync(jobName: string, ...args: any[]) { check(jobName, String); const query: Mongo.Query = { name: jobName, state: 'pending', }; args.forEach((arg, index) => query["arguments." + index] = arg); - const count = collection.find(query).count(); + const count = await collection.find(query).countAsync(); return count; } - export const start = Dominator.start; - export const stop = Dominator.stop; + export const startAsync = Dominator.startAsync; + export const stopAsync = Dominator.stopAsync; function getDateFromConfig(config: Partial) { // https://github.com/msavin/SteveJobs..meteor.jobs.scheduler.queue.background.tasks/blob/031fdf5051b2f2581a47f64ab5b54ffbb6893cf8/package/server/imports/utilities/helpers/date.js @@ -458,9 +459,9 @@ namespace Queue { var _executing = false; var _awaitAsyncJobs = new Set(); - export function start() { + export function startSync() { if (_handle && _handle != PAUSED) { - stop(); // this also clears any existing job timeout + stopSync(); // this also clears any existing job timeout } const pausedJobs = (Dominator.lastPing || {}).pausedJobs || []; log('Jobs', 'queue.start paused:', pausedJobs); @@ -474,32 +475,32 @@ namespace Queue { sort: {due: 1}, fields: {name: 1, due: 1}, }).observe({ - changed: (job) => _observer('changed', job), - added: (job) => _observer('added', job), + changed: (job) => _observerSync('changed', job), + added: (job) => _observerSync('added', job), }); // this will automatically call the observer which will set the timer for the next job. } - export function stop() { + export function stopSync() { if (_handle && _handle != PAUSED) { _handle.stop(); } _handle = null; - _observer('stop'); + _observerSync('stop'); } - export function restart() { + export function restartSync() { // this is called by Jobs.start() and Jobs.stop() when the list of pausedJobs changes // only restart the queue if we're already watching it (maybe jobs were started/paused inside _executeJobs()) if (_handle) { - start(); + startSync(); } } // cap timeout limit to 24 hours to avoid Node.js limit https://github.com/wildhart/meteor.jobs/issues/5 const MAX_TIMEOUT_MS = 24 *3600 * 1000; - function _observer(type: string, nextJob?: Jobs.JobDocument) { + function _observerSync(type: string, nextJob?: Jobs.JobDocument) { log('Jobs', 'queue.observer', type, nextJob, nextJob && ((nextJob.due.valueOf() - Date.now())/(60*60*1000)).toFixed(2)+'h'); if (_timeout) { Meteor.clearTimeout(_timeout); @@ -512,12 +513,12 @@ namespace Queue { _timeout = nextJob && !_executing ? Meteor.setTimeout(()=> { _timeout = null; - _executeJobs() + _executeJobsAsync() }, msTillNextJob) : null; } } - function _executeJobs() { + async function _executeJobsAsync() { // protect against observer/timeout race condition if (_executing) { console.warn('already executing!'); @@ -529,7 +530,7 @@ namespace Queue { log('Jobs', 'executeJobs', 'paused:', Dominator.lastPing?.pausedJobs); // ignore job queue changes while executing jobs. Will restart observer with .start() at end - stop(); + stopSync(); // need to prevent 1000s of the same job type from hogging the job queue and delaying other jobs // after running a job, add its job.name to doneJobs, then find the next job excluding those in doneJobs @@ -544,8 +545,8 @@ namespace Queue { doneJobs = []; do { // always use the live version of dominator.lastPing.pausedJobs in case jobs are paused/restarted while executing - const lastPing = Dominator.collection.findOne({}, {fields: {pausedJobs: 1}})!; - job = Jobs.collection.findOne({ + const lastPing = (await Dominator.collection.findOneAsync({}, {fields: {pausedJobs: 1}}))!; + job = await Jobs.collection.findOneAsync({ state: "pending", due: {$lte: new Date()}, name: {$nin: doneJobs.concat(lastPing.pausedJobs, Array.from(_awaitAsyncJobs))}, // give other job types a chance... @@ -553,7 +554,7 @@ namespace Queue { }, {sort: {due: 1, priority: -1}}); if (job) { lastJobId = job._id; - executeJob(job); + await executeJobAsync(job); doneJobs.push(job.name); // don't do this job type again until we've tried other jobs. } } while (Dominator.lastPing!.pausedJobs.indexOf('*') == -1 && job); @@ -564,15 +565,15 @@ namespace Queue { } _executing = false; - start(); + startSync(); } - export function executeJob(job: Jobs.JobDocument) { + export async function executeJobAsync(job: Jobs.JobDocument) { log('Jobs', ' ' + job.name); if (typeof Jobs.jobs[job.name] == 'undefined') { console.warn('Jobs', 'job does not exist:', job.name); - setJobState(job._id, 'failure'); + await setJobStateAsync(job._id, 'failure'); return; } @@ -580,36 +581,36 @@ namespace Queue { const self: Jobs.JobThisType = { document: job, - replicate: function(config) { - return Jobs.replicate(job._id, config); + replicateAsync: async function(config) { + return await Jobs.replicateAsync(job._id, config); }, - reschedule: function(config) { + rescheduleAsync: async function(config) { action = 'reschedule'; - Jobs.reschedule(job._id, config); + return await Jobs.rescheduleAsync(job._id, config); }, - remove: function() { + removeAsync: async function() { action = 'remove'; - return Jobs.remove(job._id); + return await Jobs.removeAsync(job._id); }, - success: function() { + successAsync: async function() { action = 'success'; - return setJobState(job._id, action); + return await setJobStateAsync(job._id, action); }, - failure: function() { + failureAsync: async function() { action = 'failure'; - return setJobState(job._id, action); + return await setJobStateAsync(job._id, action); }, }; - function completed() { + async function completedAsync() { if (!action) { if (settings.defaultCompletion == 'success') { - setJobState(job._id, 'success'); + await setJobStateAsync(job._id, 'success'); } else if (settings.defaultCompletion == 'remove') { - Jobs.remove(job._id); + await Jobs.removeAsync(job._id); } else { console.warn('Jobs', "Job was not resolved with success, failure, reschedule or remove. Consider using the 'defaultCompletion' option.", job); - setJobState(job._id, 'failure'); + await setJobStateAsync(job._id, 'failure'); } } } @@ -617,23 +618,23 @@ namespace Queue { let isAsync = false; try { - setJobState(job._id, 'executing'); + await setJobStateAsync(job._id, 'executing'); const res = Jobs.jobs[job.name].apply(self, job.arguments); if (res?.then) { isAsync = true if (job.awaitAsync) { _awaitAsyncJobs.add(job.name); } - res.then(() => { + res.then(async () => { log('Jobs', ' Done async job', job.name, 'result:', action); _awaitAsyncJobs.delete(job.name); - completed(); - }).catch(e => { - console.warn('Jobs', ' Error in async job', job); + await completedAsync(); + }).catch(async (e) => { + console.warn('Jobs', ' Error in async job', job, e); console.warn(e); _awaitAsyncJobs.delete(job.name); if (action != 'reschedule') { - self.failure(); + await self.failureAsync(); } }); } else { @@ -643,17 +644,17 @@ namespace Queue { console.warn('Jobs', 'Error in job', job); console.warn(e); if (action != 'reschedule') { - self.failure(); + await self.failureAsync(); } } if (!isAsync) { - completed(); + await completedAsync(); } } - function setJobState(jobId: string, state: Jobs.JobStatus) { - const count = Jobs.collection.update({_id: jobId}, {$set: {state: state}}); + async function setJobStateAsync(jobId: string, state: Jobs.JobStatus) { + const count = await Jobs.collection.updateAsync({_id: jobId}, {$set: {state: state}}); log('Jobs', 'setJobState', jobId, state, count); } diff --git a/package.js b/package.js index e4e34c8..3ad57ff 100644 --- a/package.js +++ b/package.js @@ -1,13 +1,13 @@ Package.describe({ name: 'wildhart:jobs', - version: '1.0.18', + version: '2.0.0', summary: 'Schedule jobs to run at a later time, including multi-server, super efficient', git: 'https://github.com/wildhart/meteor.jobs', documentation: 'README.md' }); Package.onUse(function(api) { - api.versionsFrom('1.3'); + api.versionsFrom(['2.3', '3.0-alpha.19']); api.use(["typescript@3.0.0 || 4.0.0", "mongo", "random", "ecmascript", "check"], "server"); api.mainModule("jobs.ts", "server"); api.export(["Jobs", "TypedJob"]); diff --git a/wildhart-jobs.d.ts b/wildhart-jobs.d.ts index 6d5e61a..d2cc695 100644 --- a/wildhart-jobs.d.ts +++ b/wildhart-jobs.d.ts @@ -9,7 +9,7 @@ declare module 'meteor/wildhart:jobs' { maxWait: number, log: typeof console.log | boolean; autoStart: boolean; - setServerId?: string | Function; + setServerId?: string | (() => string); defaultCompletion?: 'success' | 'remove'; } @@ -40,7 +40,7 @@ declare module 'meteor/wildhart:jobs' { awaitAsync: boolean; unique: boolean; singular: boolean; - callback?: Function; + callback?: (err: string | null, res: any) => void | Promise; } type JobStatus = "pending" | "success" | "failure" | "executing"; @@ -58,11 +58,11 @@ declare module 'meteor/wildhart:jobs' { interface JobThisType { document: JobDocument; - replicate(config: Partial): string | null | false; - reschedule(config: Partial): void; - remove(): boolean; - success(): void; - failure(): void; + replicateAsync(config: Partial): Promise; + rescheduleAsync(config: Partial): Promise; + removeAsync(): Promise; + successAsync(): Promise; + failureAsync(): Promise; } type JobFunction = (this: JobThisType, ...args: TArgs) => void; @@ -74,17 +74,17 @@ declare module 'meteor/wildhart:jobs' { function configure(options: Partial): void; function register(jobFunctions: JobFunctions): void; - function run(jobName: string, ...args: any[]): JobDocument | false; - function execute(jobOrId: JobOrId): void; - function replicate(jobOrId: JobOrId, config: Partial): string | null; - function reschedule(jobOrId: JobOrId, config: Partial): void; - function remove(jobOrId: JobOrId): boolean; - function clear(state?: '*' | JobStatus | JobStatus[], jobName?: string, ...args: any[]): number; - function findOne(jobName: string, ...args: any[]): JobDocument; - function count(jobName: string, ...args: any[]): number; - function countPending(jobName: string, ...args: any[]): number; - function start(jobNames?: string | string[]): void; - function stop(jobNames?: string | string[]): void; + function runAsync(jobName: string, ...args: any[]): Promise; + function executeAsync(jobOrId: JobOrId): Promise; + function replicateAsync(jobOrId: JobOrId, config: Partial): Promise; + function rescheduleAsync(jobOrId: JobOrId, config: Partial): Promise; + function removeAsync(jobOrId: JobOrId): Promise; + function clearAsync(state?: '*' | JobStatus | JobStatus[], jobName?: string, ...args: any[]): Promise; + function findOneAsync(jobName: string, ...args: any[]): Promise; + function countAsync(jobName: string, ...args: any[]): Promise; + function countPendingAsync(jobName: string, ...args: any[]): Promise; + function startAsync(jobNames?: string | string[]): Promise; + function stopAsync(jobNames?: string | string[]): Promise; } export class TypedJob { @@ -92,19 +92,19 @@ declare module 'meteor/wildhart:jobs' { public name: string; public withArgs(...args: TArgs): { - run: (config?: Partial) => Jobs.JobDocument | false; + runAsync: (config?: Partial) => Jobs.JobDocument | false; } - public clear(state: '*' | Jobs.JobStatus | Jobs.JobStatus[], ...args: PartialArray): number; - public clearQuery(query: Mongo.Selector): void; - public remove(jobOrId: JobOrId): boolean - public execute(jobOrId: JobOrId): false | undefined; - public reschedule(jobOrId: JobOrId, config: Partial): false | undefined; - public replicate(jobOrId: JobOrId, config: Partial): false | undefined; - public start(): void; - public stop(): void; - public count(...args: PartialArray): number; - public update(selector: string | Mongo.Selector, options: Mongo.Modifier): number; - public findOne(...args: PartialArray): Jobs.JobDocument | undefined; + public clearAsync(state: '*' | Jobs.JobStatus | Jobs.JobStatus[], ...args: PartialArray): Promise; + public clearQueryAsync(query: Mongo.Selector): Promise; + public removeAsync(jobOrId: JobOrId): Promise; + public executeAsync(jobOrId: JobOrId): Promise; + public rescheduleAsync(jobOrId: JobOrId, config: Partial): Promise; + public replicateAsync(jobOrId: JobOrId, config: Partial): Promise; + public startAsync(): Promise; + public stopAsync(): Promise; + public countAsync(...args: PartialArray): Promise; + public updateAsync(selector: string | Mongo.Selector, options: Mongo.Modifier): Promise; + public findOneAsync(...args: PartialArray): Promise; } }