Running Workers

Note

See Handler Functions for details on the handler provided to the Worker.

The Worker class streams jobs from the server, dispatches them to a handler function, and reports success or failure (ack/nack) to the server. A single Worker runs a specified number of concurrent handlers on a single Node event loop.

import { Client, Worker } from "@zizq-labs/zizq";

const client = new Client({ url: "http://localhost:7890" });

const worker = new Worker({
  client,
  concurrency: 25,
  queues: ["emails"],
  handler: async (job) => {
    // your application logic here
  },
});

process.on("SIGINT",  () => worker.stop());
process.on("SIGTERM", () => worker.stop());

await worker.run();  // blocks until stopped

Specifying the number of concurrent jobs

Pass concurrency to the Worker to specify how many jobs are permitted to run concurrently. The default value is 1, which means that the worker processes jobs strictly one at a time.

const worker = new Worker({
  client,
  concurrency: 100,
  handler,
});

Specifying which queues to process

Pass an array of queues to specify one or more queues to listen for work on. Queues do not need to be explicitly created on the server. They implicitly come into existence when jobs are enqueued to them.

An empty array means all queues. This is also the default when queues is not specified.

const worker = new Worker({
  client,
  queues: ["emails", "invoicing"],
  handler,
});

Specifying a prefetch limit

By default the worker will fetch the same number of jobs as the total concurrency. Throughput can be increased significantly by prefetching more jobs than the total concurrency, ensuring there is always work sitting in the buffer, with more jobs received while the worker is busy processing the current jobs.

This default can be changed by specifying a prefetch limit. Just make sure not to set this value lower than concurrency as doing so will act as a concurrency limiter (there will not be enough jobs available to meet the desired concurrency).

const worker = new Worker({
  client,
  concurrency: 50,
  prefetch: 60, // 50 jobs being worked, 10 ready to go
  handler,
});
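
The relationship between concurrency and prefetch described above can be sketched as plain arithmetic. The two helper functions below are hypothetical and not part of the library; they only restate the rules from this section (prefetch defaults to concurrency, and a lower prefetch caps the number of jobs that can run at once).

```typescript
// Hypothetical helpers for reasoning about the settings; not a library API.

// A prefetch below concurrency caps how many handlers can actually run,
// because the worker never holds more than `prefetch` jobs at a time.
function effectiveConcurrency(
  concurrency: number,
  prefetch: number = concurrency, // documented default: same as concurrency
): number {
  return Math.min(concurrency, prefetch);
}

// Picks a prefetch value that keeps `headroom` extra jobs buffered
// on top of the jobs currently being worked.
function prefetchWithHeadroom(concurrency: number, headroom: number): number {
  return concurrency + Math.max(0, Math.floor(headroom));
}

console.log(prefetchWithHeadroom(50, 10)); // 60
console.log(effectiveConcurrency(50, 60)); // 50
console.log(effectiveConcurrency(50, 20)); // 20
```

How much headroom is worthwhile depends on job duration and network latency, so it is worth measuring rather than guessing.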

Specifying a logger instance

The Worker’s default logger is the global console object. Any other compatible logger can be used instead by passing it to the worker.
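
As a minimal sketch, a custom logger might wrap console and tag every message. This section does not spell out what "compatible" requires, so the LoggerLike interface below (debug, info, warn, error) is an assumption modelled on console. The prefixedLogger helper and the logger option name in the trailing comment are likewise illustrative.

```typescript
// Assumed logger shape: a console-like subset. The library's actual
// requirements may differ; check its type definitions.
interface LoggerLike {
  debug(...args: unknown[]): void;
  info(...args: unknown[]): void;
  warn(...args: unknown[]): void;
  error(...args: unknown[]): void;
}

// Hypothetical helper: forwards every call to `sink`, prepending a prefix.
function prefixedLogger(prefix: string, sink: LoggerLike = console): LoggerLike {
  return {
    debug: (...args) => sink.debug(prefix, ...args),
    info: (...args) => sink.info(prefix, ...args),
    warn: (...args) => sink.warn(prefix, ...args),
    error: (...args) => sink.error(prefix, ...args),
  };
}

const logger = prefixedLogger("[emails-worker]");
logger.info("worker starting");

// Assumed usage, with the option named `logger`:
//   const worker = new Worker({ client, handler, logger });
```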

Graceful shutdown

worker.stop() implements a patient graceful shutdown: it waits for all in-flight job handlers to resolve or reject, flushes any pending acks, then closes the take-jobs stream. worker.run() returns once all of that has drained.

If you want a deadline on graceful shutdown, escalate to worker.kill() after a timeout:

process.on("SIGINT", () => {
  worker.stop();
  setTimeout(() => worker.kill(), 30_000);
});

kill() closes the stream immediately and skips the drain and flush. In-flight job handlers that are already running continue to completion, because Node can’t cancel promises, but their acks are not flushed. Once the server detects that the worker has disconnected, it re-dispatches the corresponding jobs to another worker, so no jobs are lost in the process.
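
One detail of the timeout pattern is worth handling: a pending setTimeout keeps the Node process alive until it fires, even after a graceful stop has finished. The sketch below wraps the stop-then-kill escalation in a hypothetical helper, stopWithDeadline, which returns a function that cancels the escalation. It relies only on the stop() and kill() methods described above; the helper and the Stoppable interface are illustrative and not part of the library.

```typescript
// Structural type: anything exposing the stop()/kill() pair described above.
interface Stoppable {
  stop(): unknown;
  kill(): unknown;
}

// Hypothetical helper: begins a graceful stop and schedules kill() as a
// fallback. Returns a function that cancels the pending kill().
function stopWithDeadline(worker: Stoppable, deadlineMs: number): () => void {
  worker.stop();
  const timer = setTimeout(() => worker.kill(), deadlineMs);
  return () => clearTimeout(timer);
}

// Usage sketch (worker is the Worker instance from the examples above):
//
//   let cancelKill: (() => void) | undefined;
//   process.on("SIGINT", () => {
//     // Only the first signal starts the shutdown and the deadline timer.
//     cancelKill ??= stopWithDeadline(worker, 30_000);
//   });
//   await worker.run(); // returns once the graceful stop has drained
//   cancelKill?.();     // drop the timer so the process can exit promptly
```

An alternative is to call unref() on the timer so that it never holds the process open. Explicit cancellation is used here because it also prevents a late kill() if the process stays up for other reasons.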