Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Querying & Managing Jobs

Zizq makes vivibility and control key design focuses. The server provides a number of endpoints for querying and managing job data, which are all packaged up in the client.jobs() query builder in the Node client.

Queries are initiated with client.jobs() and built by chaining methods onto that query. All queries are lazy (they don’t execute until async iterated), and each builder method returns a new instance of the query, so intermediate queries can be passed around without worrying about mutability.

Client.jobs()

Methods starting with by replace the current condition, while methods starting with add append to the condition. These methods also all accept arrays which form IN (...) style conditions.

These methods only accept strings and combine to narrow down the current filter (using the and operator):

These methods wrap addJqFilter() to find jobs enqueued with the specified payloads, or payload subsets.

These methods have special meaning (see docs):

These methods enumerate the results of the query:

These methods apply a delete or update to the entire scope of the query.

These methods apply a delete or update to the first result of the query (if any):

Additionally, and importantly, client.jobs() is also AsyncIterable so its results can be iterated and wrapped with helpers like iter-tools. Some basic helpers are also included and chainable directly onto the query (toArray(), map(), filter(), forEach, count(), first(), isEmpty()).

byId(), addId

Narrows the query down to a given id or set of ids.

for await (job of client.jobs().byId("03fvmay0zcoskwdf2sm0u94aw")) {
  console.log(`${job.id}: ${JSON.stringify(job.payload)}`);
}
// 03fvmay0zcoskwdf2sm0u94aw: {"greet":"World"}

for await (const job of client.jobs()
    .byId("03fvmay0zcoskwdf2sm0u94aw")
    .addId("03fvqg68ra0od9u1b8m0txgka")) {
  console.log(`${job.id}: ${JSON.stringify(job.payload)}`);
}
// 03fvmay0zcoskwdf2sm0u94aw: {"greet":"World"}
// 03fvqg68ra0od9u1b8m0txgka: {"greet":"Moon"}

await client
  .jobs()
  .byId("03fvmay0zcoskwdf2sm0u94aw")
  .addId("03fvqg68ra0od9u1b8m0txgka")
  .count(); // 2

byQueue(), addQueue()

Narrows the query down to a given queue or set of queues.

await client.jobs().byQueue("analytics").count();
// 90

await client.jobs().byQueue(["analytics", "example"]).count();
// 93

await client.jobs().byQueue(["analytics", "example"]).addQueue("comms").count();
// 3631

byType(), addType()

Narrows the query down to a given type or set of types.

await client.jobs().byQueue("default").byType("process_video").count();
// 401

await client.jobs()
  .byQueue("default")
  .byType("process_video")
  .addType("clear_notes")
  .count();
// 491

byStatus(), addStatus()

Narrows the query down to a given status or set of statuses.

Valid statuses are:

  • scheduled
  • ready
  • in_flight
  • completed
  • dead
await client.jobs().byStatus(["scheduled", "ready"]).count();
// 5003

await client.jobs().byStatus("ready").count();
// 4993

await client.jobs().byQueue("default").byStatus("ready").addStatus("scheduled").count();
// 491

byJqFilter(), addJqFilter()

Narrows the query down by matching on the payload. Since payloads are arbitrary and known only to your application, filtering is done by using jq expressions.

Tip

For more details on the jq query language, read the language specification on the jaq website or on jq.

for await (const job of client.jobs()
    .byType("hello_world")
    .byJqFilter('.greet == "Moon"')) {
  console.log(`${job.id}: ${JSON.stringify(job.payload)}`);
}
// 03fvqg68ra0od9u1b8m0txgka: {"greet":"Moon"}

for await (const job of client.jobs()
    .byType("hello_world")
    .byJqFilter('.greet | contains("o")')) {
  console.log(`${job.id}: ${JSON.stringify(job.payload)}`);
}
// 03fvmay0zcoskwdf2sm0u94aw: {"greet":"World"}
// 03fvqg68ra0od9u1b8m0txgka: {"greet":"Moon"}

for await (const job of client.jobs()
    .byType("process_video")
    .byJqFilter('.dimensions.width <= 600')) {
  console.log(`${job.id}: ${JSON.stringify(job.payload)}`);
}
// 03fvqm2ejnbjahvhayikrkltr: {"dimensions":{"width":600,"height":400}}
// 03fvqm2ejnbjahvhayk0y3i9k: {"dimensions":{"width":599,"height":399}}
// 03fvqm2ejnbjahvhaykq7d4fh: {"dimensions":{"width":598,"height":398}}
// 03fvqm2ejnbjahvhaynwu2ije: {"dimensions":{"width":597,"height":397}}
// 03fvqm2ejnbjahvhaypw8nkng: {"dimensions":{"width":596,"height":396}}
// 03fvqm2ejnbjahvhayr7ytglw: {"dimensions":{"width":595,"height":395}}
// 03fvqm2ejnbjahvhaysw8ggq1: {"dimensions":{"width":594,"height":394}}
// 03fvqm2ejnbjahvhayvtolt8p: {"dimensions":{"width":593,"height":393}}
// 03fvqm2ejnbjahvhayx9j2rmx: {"dimensions":{"width":592,"height":392}}

withPayload(), withPayloadSubset()

These methods match jobs on the queue by wrapping addJqFilter internally. You can match either using exact payloads, or on just a subset of the payload (N positional array elements for arrays, partial object match for objects).

for await (const job of client.jobs()
    .withPayload({dimensions: {width: 223, height: 180}})) {
  console.log(`${job.id}: ${JSON.stringify(job.payload)}`);
}
// 03fvqm2ejnbjahvhbao96krod: {"dimensions":{"width":223,"height":180}}

for await (const job of client.jobs()
    .withPayload({dimensions: {width: 2, height: 1}})) {
  console.log(`${job.id}: ${JSON.stringify(job.payload)}`);
}
// (no output)

for await (const job of client.jobs()
    .withPayloadSubset({dimensions: {width: 223}})) {
  console.log(`${job.id}: ${JSON.stringify(job.payload)}`);
}
// 03fvqm2ejnbjahvhbao96krod: {"dimensions":{"width":223,"height":180}}

order()

Changes the sort order in which results are returned. Use either "asc" or "desc". The default is "asc".

for await (const job of client.jobs().byStatus("scheduled")) {
  console.log(job.id);
}
// 03fvmay0zcoskwdf2sm0u94aw
// 03fvqg4qxj39zecl43gnwwh04
// 03fvqg68ra0od9u1b8m0txgka
// 03fvqhdcqsftfxrzb6xc8acjy
// 03fvqhdcqsftfxrzb6zq57rqs
// 03fvqhdcqsftfxrzb71cxvk13
// 03fvqhdcqsftfxrzb74afpg6x
// 03fvqhdcqsftfxrzb75rabbrt
// 03fvqhdcqsftfxrzb78any8s1
// 03fvqhdcqsftfxrzb7a5g3hpm

for await (const job of client.jobs().byStatus("scheduled").order("desc")) {
  console.log(job.id);
}
// 03fvqhdcqsftfxrzb7a5g3hpm
// 03fvqhdcqsftfxrzb78any8s1
// 03fvqhdcqsftfxrzb75rabbrt
// 03fvqhdcqsftfxrzb74afpg6x
// 03fvqhdcqsftfxrzb71cxvk13
// 03fvqhdcqsftfxrzb6zq57rqs
// 03fvqhdcqsftfxrzb6xc8acjy
// 03fvqg68ra0od9u1b8m0txgka
// 03fvqg4qxj39zecl43gnwwh04
// 03fvmay0zcoskwdf2sm0u94aw

limit()

Changes the maximum number of total results returned by the query.

for await (const job of client.jobs()
    .byStatus("scheduled")
    .order("desc")
    .limit(3)) {
  console.log(job.id);
}
// 03fvqhdcqsftfxrzb7a5g3hpm
// 03fvqhdcqsftfxrzb78any8s1
// 03fvqhdcqsftfxrzb75rabbrt

inPagesOf()

Changes the number of results fetched in a single page as the client iterates jobs. When not specified, the server’s default page size applies.

This also affects how updateAll() and deleteAll() are applied. Without specifying inPagesOf(), these operations apply to the entire result set in a single transaction. When inPagesOf() is specified, the bulk delete or update is done in a batched manner.

for await (const job of client.jobs().byStatus("scheduled").inPagesOf(2)) {
  console.log(job.id);
}
// 03fvmay0zcoskwdf2sm0u94aw
// 03fvqg4qxj39zecl43gnwwh04
// 03fvqg68ra0od9u1b8m0txgka
// 03fvqhdcqsftfxrzb6xc8acjy
// 03fvqhdcqsftfxrzb6zq57rqs
// 03fvqhdcqsftfxrzb71cxvk13
// 03fvqhdcqsftfxrzb74afpg6x
// 03fvqhdcqsftfxrzb75rabbrt
// 03fvqhdcqsftfxrzb78any8s1
// 03fvqhdcqsftfxrzb7a5g3hpm

let pageNum = 1;
for await (const page of client.jobs()
    .byStatus("scheduled")
    .inPagesOf(2)
    .pages()) {
  console.log(`Page ${pageNum}`);
  for (const job of page.jobs) {
    console.log(job.id);
  }
  pageNum++;
}
// Page 1
// 03fvmay0zcoskwdf2sm0u94aw
// 03fvqg4qxj39zecl43gnwwh04
// Page 2
// 03fvqg68ra0od9u1b8m0txgka
// 03fvqhdcqsftfxrzb6xc8acjy
// Page 3
// 03fvqhdcqsftfxrzb6zq57rqs
// 03fvqhdcqsftfxrzb71cxvk13
// Page 4
// 03fvqhdcqsftfxrzb74afpg6x
// 03fvqhdcqsftfxrzb75rabbrt
// Page 5
// 03fvqhdcqsftfxrzb78any8s1
// 03fvqhdcqsftfxrzb7a5g3hpm

*[Symbol.asyncIterator]()

Note

This method is automatically called by the JavaScript runtime whenever for await is used on the query. It is not called explicitly.

Iterates over each Job in the query result. Until this method, is called, the query is not yet executed.

for await (const job of client.joibs().byStatus("scheduled")) {
  console.log(job.id);
}
// 03fvmay0zcoskwdf2sm0u94aw
// 03fvqg4qxj39zecl43gnwwh04
// 03fvqg68ra0od9u1b8m0txgka
// 03fvqhdcqsftfxrzb6xc8acjy
// 03fvqhdcqsftfxrzb6zq57rqs
// 03fvqhdcqsftfxrzb71cxvk13
// 03fvqhdcqsftfxrzb74afpg6x
// 03fvqhdcqsftfxrzb75rabbrt
// 03fvqhdcqsftfxrzb78any8s1
// 03fvqhdcqsftfxrzb7a5g3hpm

*pages()

Iterates over each JobPage in the query result.

Note

When combined with limit(), *pages() stops at the page boundary but the entire last page is returned even if the jobs it contains exceeds the limit.

let pageNum = 1;
for await (const page of client.jobs()
    .byStatus("scheduled")
    .inPagesOf(2)
    .pages()) {
  console.log(`Page ${pageNum}`);
  for (const job of page.jobs) {
    console.log(job.id);
  }
  pageNum++;
}
// Page 1
// 03fvmay0zcoskwdf2sm0u94aw
// 03fvqg4qxj39zecl43gnwwh04
// Page 2
// 03fvqg68ra0od9u1b8m0txgka
// 03fvqhdcqsftfxrzb6xc8acjy
// Page 3
// 03fvqhdcqsftfxrzb6zq57rqs
// 03fvqhdcqsftfxrzb71cxvk13
// Page 4
// 03fvqhdcqsftfxrzb74afpg6x
// 03fvqhdcqsftfxrzb75rabbrt
// Page 5
// 03fvqhdcqsftfxrzb78any8s1
// 03fvqhdcqsftfxrzb7a5g3hpm

deleteAll()

Deletes all jobs from the server that match the given query and returns the number of deleted jobs. When the query is unfiltered, deletes all jobs from the server. This can be useful in tests.

When combined with inPagesOf, jobs are deleted in a batch-wise manner, otherwise all jobs are deleted in a single transaction.

This method also combines safely with limit() in order to explicitly prevent deleting more than a specified number of jobs (implies batch-wise deletion).

await client.jobs().byQueue("analytics").count();
// 90

await client.jobs().byQueue("analytics").deleteAll();
// 90

await client.jobs().byQueue("analytics").count();
// 0

await client.joibs().byQueue("comms").count();
// 3538

await client.jobs().byQueue("comms").inPagesOf(20).limit(30).order("desc").deleteAll();
// 30

await client.jobs().byQueue("comms").count();
// 3528

updateAll()

Updates all jobs from the server that match the given query and returns the number of updated jobs. When the query is unfiltered, updates all jobs on the server.

The following job fields are mutable:

  • queue
  • priority
  • readyAt
  • retryLimit
  • backoff
  • retention

Only the fields specified in the input are updated. Fields that are not specified, or are undefined, are left unchanged.

This method can be used for example to re-assign/rename queues or to change priority.

Setting an optional field to null tells the server to restore that field back to its default value, so for example to un-schedule a job and make it immediately ready you can set readyAt to null and the server will make it ready immediately.

When combined with inPagesOf(), jobs are updated in a batch-wise manner, otherwise all jobs are updated in a single transaction.

This method also combines safely with limit() in order to explicitly prevent updating more than a specified number of jobs (implies batch-wise update).

await client.jobs().byQueue("default").count();
// 15491

await client.jobs().byQueue("analytics").count();
// 90

await client.jobs().byQueue("analytics").updateAll({queue: "default"});
// 90

await client.jobs().byQueue("default").count();
// 15581

await client.jobs().byQueue("analytics").count();
// 0

await client.jobs().byQueue("payments").byStatus("scheduled").count();
// 3

await client.jobs().byQueue("payments").byStatus("scheduled").updateAll({readyAt: null});
// 3

await client.jobs().byQueue("payments").byStatus("scheduled").count();
// 0

deleteOne()

Deletes at most the first matching result from the query.

await client.jobs().byQueue("payments").count();
// 881

await client.jobs().byQueue("payments").order("desc").deleteOne();
// 1

await client.jobs().byQueue("payments").count();
// 880

updateOne()

Updates at most the first matching result from the query.

await client.jobs().byQueue("comms").byStatus("scheduled").count();
// 2

await client.jobs().byQueue("comms").byStatus("scheduled").updateOne({readyAt: null});
// 1

await client.jobs().byQueue("comms").byStatus("scheduled").count();
// 1

Job Resource Helpers

Each job in the result of client.jobs() also implements methods to inspect errors and manage the job.

The following method provides access to the job’s errors:

The following methods allow deleting or updating the job’s properties:

*errors()

Iterates over the errors on this job, either in reverse or ascending order. This can also be done in pages.

const job = await client.jobs().byId("03fvqhdcqsftfxrzb7m9owqsf").first();
for await (const err of job.errors().inPagesOf(20).order("desc")) {
  console.log(`Attempt: ${err.attempt}, Message: ${err.message}`);
}
// Attempt: 2, Message: Something went wrong
// Attempt: 1, Message: Something went wrong

delete()

Permanently deletes this job from the server. Returns undefined on success. Rejects on failure (e.g. 404 - job not found).

const job = await client.jobs().byQueue("comms").first();

job.id
// 03fvqhdcqsftfxrzb7hekw4if

await client.jobs().byId("03fvqhdcqsftfxrzb7hekw4if").count();
// 1

await job.delete();
// undefined

await client.jobs().byId("03fvqhdcqsftfxrzb7hekw4if").count();
// 0

await job.delete();
//! job not found (NotFoundError)

update()

Updates mutable properties on this job. Returns the updated job metadata on success. Rejects with a ClientError on failure.

Jobs in the completed or dead statuses are immutable and cannot be updated.

const job = await client.jobs().byQueue("comms").first();

job.id
// 03fvqhdcqsftfxrzb7nn5h57r

for await (const job of client.jobs().byId("03fvqhdcqsftfxrzb7nn5h57r")) {
  console.log(job.queue);
}
// comms

await job.update({queue: "default"});

for await (const job of client.jobs().byId("03fvqhdcqsftfxrzb7nn5h57r")) {
  console.log(job.queue);
}
// default

JobPage Resource Helpers

The pages of jobs yielded by client.jobs().pages() also provide some helper methods that operate across the whole page.

The page itself is also an Iterable so it can be iterated over using for .. of.

deleteAll()

Delete all jobs on the current page by their IDs.

Warning

This method behaves differently to client.jobs().deleteAll() which respects the original query filters. This method deletes every record on the page unconditionally, even if it may have since been updated and does not match the filters.

The example here deletes all jobs on the first 5 pages where the queue is example.

let pageNum = 1;
for await (const page of client.jobs().byQueue("example").pages()) {
  if (pageNum > 5) break;
  await page.deleteAll();
  pageNum++;
}

updateAll()

Update all jobs on the current page by their IDs.

Warning

This method behaves differently to client.jobs().updateAll() which respects the original query filters. This method updates every record on the page unconditionally, even if it may have since been updated and does not match the filters.

The example here update all jobs on the first 2 pages where the queue is example.

let pageNum = 1;
for await (const page of client.jobs().byQueue("example").pages()) {
  if (pageNum > 5) break;
  await page.updateAll({queue: "other"});
  pageNum++;
}