Running Workers
A Worker streams jobs from the server, dispatches each to a handler with
bounded concurrency, batches acknowledgements, and reconnects on transient
failures. Build one with Worker::builder():
#![allow(unused)]
fn main() {
use serde::{Deserialize, Serialize};
use std::convert::Infallible;
use zizq::{Client, JobKind, Router, Worker};
#[derive(Serialize, Deserialize)]
struct SendEmail { to: String }
impl JobKind for SendEmail { const NAME: &'static str = "send_email"; }
async fn run(client: Client) -> Result<(), Box<dyn std::error::Error>> {
let worker = Worker::builder()
.client(client)
.concurrency(25)
.queues(vec!["emails"])
.handler(Router::new().route(async |job: SendEmail| {
// your application logic here
println!("emailing {}", job.to);
Ok::<(), Infallible>(())
}))
.build()?;
worker.run(tokio::signal::ctrl_c()).await?; // runs until Ctrl-C
Ok(()) }
}
worker.run(shutdown) takes a shutdown future — when it resolves, the
worker shuts down gracefully (see below). Pass
tokio::signal::ctrl_c(), a CancellationToken, or
std::future::pending() to run until the process is killed.
Handlers
The handler passed to a Worker is anything that implements the
JobHandler trait. The simplest handler is an async closure taking a Job —
the raw job, with its id, job_type, and payload (as JSON):
#![allow(unused)]
fn main() {
use zizq::{Client, Job, Worker};
async fn run(client: Client) -> Result<(), Box<dyn std::error::Error>> {
let worker = Worker::builder()
.client(client)
.concurrency(25)
.queues(vec!["emails"])
.handler(|job: Job| async move {
println!("processing {} ({})", job.id, job.job_type);
// ... your application logic ...
Ok::<(), std::convert::Infallible>(())
})
.build()?;
let _ = worker;
Ok(()) }
}
A handler that returns Ok acknowledges the job as complete; returning Err
fails it, and the server applies the job’s retry/backoff policy.
A bare closure receives every job from the worker’s queues regardless of type,
with the payload left as untyped JSON. For typed, per-type dispatch — the
common case — use a Router.
Routing by job type
A Router is itself a JobHandler, so you pass it to .handler() exactly as
you would a closure. Rather than one function for every job, it dispatches
each job to a handler chosen by its type, deserializing the payload into the
right JobKind automatically. Register one handler per type with .route:
#![allow(unused)]
fn main() {
use serde::{Deserialize, Serialize};
use std::convert::Infallible;
use zizq::Router;
#[derive(Serialize, Deserialize)]
struct SendEmail { to: String }
impl zizq::JobKind for SendEmail { const NAME: &'static str = "send_email"; }
#[derive(Serialize, Deserialize)]
struct ProcessReport { id: u64 }
impl zizq::JobKind for ProcessReport { const NAME: &'static str = "process_report"; }
let router = Router::new()
.route(async |job: SendEmail| {
// ... send the email ...
Ok::<(), Infallible>(())
})
.route(async |job: ProcessReport| {
// ... process the report ...
Ok::<(), Infallible>(())
});
}
A job whose type matches no route is failed with a routing error.
Concurrency
concurrency sets how many job handlers may run at once. The default is 1
— a purely sequential worker.
#![allow(unused)]
fn main() {
use zizq::Worker;
fn x(b: zizq::WorkerBuilder) -> zizq::WorkerBuilder {
b.concurrency(100)
}
}
Prefetch
By default the worker fetches as many jobs as its concurrency. Throughput can improve by prefetching more — keeping a buffer of jobs ready so a handler never waits on the network for its next job.
#![allow(unused)]
fn main() {
use zizq::Worker;
fn x(b: zizq::WorkerBuilder) -> zizq::WorkerBuilder {
b.concurrency(50).prefetch(60) // 50 in flight, 10 buffered
}
}
Warning
Do not set
prefetchbelowconcurrency— that starves the worker, since there are never enough buffered jobs to reach the desired concurrency.
Which queues to process
queues selects which queues the worker pulls from. An empty list (the
default) means all queues. Queues need not be created — they exist
implicitly once jobs are enqueued to them.
#![allow(unused)]
fn main() {
use zizq::Worker;
fn x(b: zizq::WorkerBuilder) -> zizq::WorkerBuilder {
b.queues(vec!["emails", "invoicing"])
}
}
Graceful shutdown
When the shutdown future resolves, run() performs a patient shutdown: it
stops pulling new jobs, waits for in-flight handlers to finish, flushes any
pending acknowledgements, then closes the connection. run() returns once
that has drained. This is bounded by a shutdown timeout (30s by default).
Because acks are flushed before the connection closes, no completed work is lost. If the worker is killed before its acks land, the server simply re-dispatches those still-in-flight jobs to another worker — Zizq’s delivery is at-least-once, so handlers should be idempotent.
Manual control
The Worker is the recommended consumer API, but the underlying primitives
are available for full manual control:
Client::takeopens a streaming connection and yields jobs as afutures::Stream.Client::report_success/Client::report_success_bulkacknowledge jobs.Client::report_failurefails a job, with optional retry timing and error detail.
#![allow(unused)]
fn main() {
use futures_util::TryStreamExt;
use zizq::Client;
async fn run(client: &Client) -> Result<(), zizq::ZizqError> {
let mut stream = client.take().queues(["emails"]).prefetch(16).await?;
while let Some(job) = stream.try_next().await? {
// ... process the job ...
client.report_success(&job.id).await?;
}
Ok(()) }
}