A TypeScript job queue library for PostgreSQL, compatible with Ruby Que and que-go implementations.
- Cross-language compatibility: Works with Que (Ruby) and que-go job queues
- PostgreSQL advisory locks: Reliable job processing with no duplicate execution
- TypeScript support: Full type safety with comprehensive interfaces
- Retry logic: Exponential backoff for failed jobs
- Multiple queues: Support for named queues and priorities
- Transaction support: Enqueue jobs within existing database transactions
npm install que-tsnpm install github:Duke-Engineering/que-ts#masterNote: When installing from GitHub, the package will automatically build from TypeScript source using the prepare script.
If you encounter "Cannot find module 'que-ts'" errors when installing from GitHub:
-
Check the installation completed successfully:
cd node_modules/que-ts ls dist/ # Should show compiled JavaScript files
-
Manual build if needed:
cd node_modules/que-ts npm run build -
Verify installation:
cd node_modules/que-ts node test-install.js -
Alternative: Use specific tag:
npm install github:Duke-Engineering/que-ts#v1.0.0
# Start PostgreSQL with Docker
npm run docker:up
# Run tests
npm test
# Stop when done
npm run docker:downFor detailed Docker usage, see DOCKER.md.
Create the required database table:
CREATE TABLE que_jobs (
priority smallint NOT NULL DEFAULT 100,
run_at timestamptz NOT NULL DEFAULT now(),
job_id bigserial NOT NULL,
job_class text NOT NULL,
args json NOT NULL DEFAULT '[]'::json,
error_count integer NOT NULL DEFAULT 0,
last_error text,
queue text NOT NULL DEFAULT '',
PRIMARY KEY (queue, priority, run_at, job_id)
);import { Client, Worker } from 'que-ts';
// Create a client
const client = new Client({
host: 'localhost',
port: 5432,
database: 'myapp',
user: 'postgres',
password: 'password'
});
// Enqueue jobs
await client.enqueue('SendEmail', ['[email protected]', 'Welcome!']);
await client.enqueue('ProcessPayment', [{ amount: 100, currency: 'USD' }], {
priority: 10,
runAt: new Date(Date.now() + 60000) // Run in 1 minute
});
// Create and start a worker
const worker = new Worker({
host: 'localhost',
port: 5432,
database: 'myapp',
user: 'postgres',
password: 'password'
});
// Register job handlers
worker.register('SendEmail', async (job) => {
const [email, message] = job.args;
console.log(`Sending email to ${email}: ${message}`);
// Email sending logic here
});
worker.register('ProcessPayment', async (job) => {
const paymentData = job.args[0];
console.log(`Processing payment of ${paymentData.amount} ${paymentData.currency}`);
// Payment processing logic here
});
// Start processing jobs
await worker.work();
// Graceful shutdown
process.on('SIGINT', async () => {
await worker.shutdown();
await client.close();
});new Client(config?: ClientConfig)enqueue(jobClass: string, args?: any[], options?: EnqueueOptions): Promise<Job>enqueueInTx(client: PoolClient, jobClass: string, args?: any[], options?: EnqueueOptions): Promise<Job>lockJob(queue?: string): Promise<Job | null>close(): Promise<void>
new Worker(clientConfig?: ClientConfig, options?: WorkerOptions)register(jobClass: string, workFunc: WorkFunction): voidwork(): Promise<void>workOne(): Promise<boolean>shutdown(): Promise<void>
id: numberqueue: stringpriority: numberrunAt: DatejobClass: stringargs: any[]errorCount: numberlastError?: string
done(): Promise<void>delete(): Promise<void>error(errorMessage: string): Promise<void>
interface ClientConfig {
connectionString?: string;
host?: string;
port?: number;
database?: string;
user?: string;
password?: string;
ssl?: boolean;
maxConnections?: number;
}interface WorkerOptions {
queue?: string; // Queue name to process (default: '')
interval?: number; // Polling interval in ms (default: 5000)
maxAttempts?: number; // Max retry attempts (default: 5)
}interface EnqueueOptions {
priority?: number; // Job priority (lower = higher priority, default: 100)
runAt?: Date; // When to run the job (default: now)
queue?: string; // Queue name (default: '')
}Failed jobs are automatically retried with exponential backoff:
- 1st retry: after 1 second
- 2nd retry: after 16 seconds
- 3rd retry: after 81 seconds
- 4th retry: after 256 seconds
- etc.
Jobs that exceed the maximum number of retries remain in the queue for manual inspection.
Jobs can be organized into named queues and assigned priorities:
// High priority job in 'critical' queue
await client.enqueue('ProcessPayment', [paymentData], {
queue: 'critical',
priority: 1
});
// Low priority job in 'background' queue
await client.enqueue('SendNewsletter', [newsletterData], {
queue: 'background',
priority: 500
});
// Worker processing only critical queue
const criticalWorker = new Worker(config, { queue: 'critical' });que-ts is designed to be fully compatible with:
You can enqueue jobs in one language and process them in another, or run workers in multiple languages simultaneously.
- Que (Ruby) - The original and most mature implementation
- que-go - Go implementation (unmaintained, but stable)
- que-ts - This TypeScript/Node.js implementation
# Install dependencies
npm install
# Start PostgreSQL with Docker
npm run docker:up
# Run tests
npm test
# Run tests in watch mode
npm run test:watch
# Build
npm run build
# Lint
npm run lint
# Stop Docker containers
npm run docker:downnpm run docker:up- Start PostgreSQL and Adminernpm run docker:down- Stop containersnpm run docker:logs- View PostgreSQL logsnpm run docker:clean- Remove containers and volumesnpm run test:docker- Full test cycle with Docker
Access database admin at http://localhost:8080 (user: que_user, password: que_password)
See DOCKER.md for detailed Docker documentation.
If you prefer not to use Docker, ensure PostgreSQL is running and create the database schema manually using migrations/schema.sql.
MIT