Querying & Managing Jobs
Zizq treats visibility and control as key design goals. The server provides
a number of endpoints for querying and managing job data, which are all
packaged up in Zizq::Query in the Ruby client.
Queries are initiated with Zizq.query and built by chaining methods onto that
query. All queries are lazy (they don’t execute until enumerated), and each
builder method returns a new instance of the query, so intermediate queries
can be passed around without worrying about mutability.
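The laziness and immutability described above can be illustrated with a small stand-alone model. The ToyQuery class below is hypothetical and is not how Zizq::Query is implemented; it filters an in-memory array instead of calling the server, purely to show why intermediate queries are safe to share.

```ruby
# Toy model only: this is NOT the Zizq::Query implementation.
class ToyQuery
  include Enumerable

  def initialize(jobs, queues = nil)
    @jobs = jobs
    @queues = queues && queues.dup.freeze
    freeze # instances never change after construction
  end

  # Replaces the queue condition and returns a NEW query.
  def by_queue(queue)
    ToyQuery.new(@jobs, Array(queue))
  end

  # Appends to the queue condition and returns a NEW query.
  def add_queue(queue)
    ToyQuery.new(@jobs, Array(@queues) + Array(queue))
  end

  # Nothing is filtered until the query is enumerated.
  def each(&block)
    return enum_for(:each) unless block

    @jobs.select { |job| @queues.nil? || @queues.include?(job[:queue]) }.each(&block)
  end
end

jobs = [
  { id: "a", queue: "analytics" },
  { id: "b", queue: "comms" },
  { id: "c", queue: "example" }
]

base     = ToyQuery.new(jobs).by_queue("analytics")
extended = base.add_queue("comms") # base itself is left untouched

# A job added after the queries were built is still seen, because the
# filter only runs when the query is enumerated.
jobs << { id: "d", queue: "analytics" }

puts base.map { |job| job[:id] }.inspect     # ["a", "d"]
puts extended.map { |job| job[:id] }.inspect # ["a", "b", "d"]
```

Because every builder call returns a fresh frozen object, base can be handed to other code without any risk of it being narrowed or widened behind your back.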
Zizq::Query
Methods starting with by_ replace the current condition, while methods
starting with add_ append to the condition. These methods also all accept
arrays, which form IN (...) style conditions:
#by_id, #add_id, #by_queue, #add_queue, #by_type, #add_type, #by_status, #add_status
These methods only accept strings and combine to narrow down the current
filter (using the and operator):
#by_jq_filter, #add_jq_filter
These methods wrap #by_type and #add_jq_filter to find jobs enqueued using
Zizq::Job or Active Job classes extending Zizq::ActiveJobConfig:
#by_job_class_and_args, #by_job_class_and_args_subset
These methods have special meaning (see their sections below):
#order, #limit, #in_pages_of
These methods enumerate the results of the query:
#each, #each_page
These methods apply a delete or update to the entire scope of the query:
#delete_all, #update_all
These methods apply a delete or update to the first result of the query (if any):
#delete_one, #update_one
Importantly, Zizq::Query is also Enumerable, so methods like #count,
#reverse_each and #take do exactly what you would expect.
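In Ruby, every Enumerable method is built on #each, so a method such as #first or #take stops the enumeration as soon as it has enough items. The sketch below shows this with a hypothetical ToyPagedQuery over an in-memory list. The page counter and the paging strategy are assumptions made for illustration, not a description of the client's internals.

```ruby
# Toy model only: pages come from an in-memory array, not from a server.
class ToyPagedQuery
  include Enumerable

  attr_reader :pages_fetched

  def initialize(ids, page_size)
    @ids = ids
    @page_size = page_size
    @pages_fetched = 0
  end

  # Yields ids one at a time, "fetching" a page only when it is needed.
  def each
    return enum_for(:each) unless block_given?

    @ids.each_slice(@page_size) do |page|
      @pages_fetched += 1
      page.each { |id| yield id }
    end
  end
end

query = ToyPagedQuery.new(%w[j1 j2 j3 j4 j5 j6], 2)

puts query.first(3).inspect # ["j1", "j2", "j3"]
puts query.pages_fetched    # 2 (the third page was never requested)
puts query.count            # 6 (counting has to walk every page)
```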
#by_id, #add_id
Narrows the query down to a given id or set of ids.
Zizq.query.by_id("03fvmay0zcoskwdf2sm0u94aw").each do |job|
  puts "#{job.id}: #{job.payload.inspect}"
end
# 03fvmay0zcoskwdf2sm0u94aw: {"greet"=>"World"}
Zizq.query
  .by_id("03fvmay0zcoskwdf2sm0u94aw")
  .add_id("03fvqg68ra0od9u1b8m0txgka")
  .each do |job|
    puts "#{job.id}: #{job.payload.inspect}"
  end
# 03fvmay0zcoskwdf2sm0u94aw: {"greet"=>"World"}
# 03fvqg68ra0od9u1b8m0txgka: {"greet"=>"Moon"}
Zizq.query
  .by_id("03fvmay0zcoskwdf2sm0u94aw")
  .add_id("03fvqg68ra0od9u1b8m0txgka")
  .count
# 2
#by_queue, #add_queue
Narrows the query down to a given queue or set of queues.
Zizq.query.by_queue("analytics").count
# 90
Zizq.query.by_queue(["analytics", "example"]).count
# 93
Zizq.query.by_queue(["analytics", "example"]).add_queue("comms").count
# 3631
#by_type, #add_type
Narrows the query down to a given type or set of types. By default these
are job class names but if you are using cross-language features the types can
be arbitrary.
Zizq.query.by_queue("default").by_type("ProcessVideoJob").count
# 401
Zizq.query
  .by_queue("default")
  .by_type("ProcessVideoJob")
  .add_type("ClearNotesJob")
  .count
# 491
#by_status, #add_status
Narrows the query down to a given status or set of statuses.
Valid statuses are:
scheduled, ready, in_flight, completed, dead
Zizq.query.by_status(["scheduled", "ready"]).count
# 5003
Zizq.query.by_status("ready").count
# 4993
Zizq.query.by_queue("default").by_status("ready").add_status("scheduled").count
# 491
#by_jq_filter, #add_jq_filter
Narrows the query down by matching on the payload. Since payloads are
arbitrary and known only to your application, filtering is done by using jq
expressions.
Tip
For more details on the jq query language, read the language specification
on the jaq website or on the jq website.
Zizq.query.by_type("hello_world").by_jq_filter('.greet == "Moon"').each do |job|
  puts "#{job.id}: #{job.payload}"
end
# 03fvqg68ra0od9u1b8m0txgka: {"greet"=>"Moon"}
Zizq.query.by_type("hello_world").by_jq_filter('.greet | contains("o")').each do |job|
  puts "#{job.id}: #{job.payload}"
end
# 03fvmay0zcoskwdf2sm0u94aw: {"greet"=>"World"}
# 03fvqg68ra0od9u1b8m0txgka: {"greet"=>"Moon"}
Zizq.query.by_type("TestJob").by_jq_filter('.args[0] <= 15').each do |job|
  puts "#{job.id}: #{job.payload}"
end
# 03fvqm2ejnbjahvhayikrkltr: {"args"=>[1, 5000], "kwargs"=>{}}
# 03fvqm2ejnbjahvhayk0y3i9k: {"args"=>[2, 5000], "kwargs"=>{}}
# 03fvqm2ejnbjahvhaykq7d4fh: {"args"=>[3, 5000], "kwargs"=>{}}
# 03fvqm2ejnbjahvhaynwu2ije: {"args"=>[4, 5000], "kwargs"=>{}}
# 03fvqm2ejnbjahvhaypw8nkng: {"args"=>[5, 5000], "kwargs"=>{}}
# 03fvqm2ejnbjahvhayr7ytglw: {"args"=>[6, 5000], "kwargs"=>{}}
# 03fvqm2ejnbjahvhaysw8ggq1: {"args"=>[7, 5000], "kwargs"=>{}}
# 03fvqm2ejnbjahvhayvtolt8p: {"args"=>[8, 5000], "kwargs"=>{}}
# 03fvqm2ejnbjahvhayx9j2rmx: {"args"=>[9, 5000], "kwargs"=>{}}
# 03fvqm2ejnbjahvhayzkj79ca: {"args"=>[10, 5000], "kwargs"=>{}}
# 03fvqm2ejnbjahvhaz26z9dgz: {"args"=>[11, 5000], "kwargs"=>{}}
# 03fvqm2ejnbjahvhaz2yke1ga: {"args"=>[12, 5000], "kwargs"=>{}}
# 03fvqm2ejnbjahvhaz5308g6x: {"args"=>[13, 5000], "kwargs"=>{}}
# 03fvqm2ejnbjahvhaz6c9pt9z: {"args"=>[14, 5000], "kwargs"=>{}}
# 03fvqm2ejnbjahvhaz93heeqc: {"args"=>[15, 5000], "kwargs"=>{}}
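When the value being matched comes from a variable rather than a literal, quoting it by hand inside the jq expression is error-prone. One possible approach, sketched below, is to encode the value as JSON, since jq accepts JSON literals in expressions. The jq_equals helper is hypothetical and is not part of the Zizq client.

```ruby
require "json"

# Hypothetical helper (not part of Zizq): builds a jq equality test with the
# Ruby value encoded as a JSON literal, so quotes are escaped correctly.
def jq_equals(path, value)
  "#{path} == #{JSON.generate(value)}"
end

puts jq_equals(".greet", "Moon")     # .greet == "Moon"
puts jq_equals(".greet", 'say "hi"') # .greet == "say \"hi\""
puts jq_equals(".args[0]", 15)       # .args[0] == 15
```

The resulting string can be passed to #by_jq_filter or #add_jq_filter like any other jq expression.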
#by_job_class_and_args, #by_job_class_and_args_subset
These methods are specific to Zizq::Job classes and Active Job classes that
extend Zizq::ActiveJobConfig. They match jobs on the queue by wrapping
#by_type and #add_jq_filter internally. You can match either using exact
argument matches, or on just a subset of the arguments (N positional arguments
and partial keyword argument match).
Zizq.query.by_job_class_and_args(TestJob, 223, 5000).each do |job|
  puts "#{job.id}: #{job.payload}"
end
# 03fvqm2ejnbjahvhbao96krod: {"args"=>[223, 5000], "kwargs"=>{}}
Zizq.query.by_job_class_and_args(TestJob, 223).each do |job|
  puts "#{job.id}: #{job.payload}"
end
# (no output)
Zizq.query.by_job_class_and_args_subset(TestJob, 223).each do |job|
  puts "#{job.id}: #{job.payload}"
end
# 03fvqm2ejnbjahvhbao96krod: {"args"=>[223, 5000], "kwargs"=>{}}
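The difference between the two matching modes can be expressed in plain Ruby against the payload shape shown above. This is a sketch of the semantics as described, assuming that a subset match compares the leading positional arguments and only the keyword arguments given. It is not the jq filter the client generates, and both helper names are hypothetical.

```ruby
# Hypothetical helpers illustrating the two matching modes on a payload hash.

# Exact: every positional and keyword argument must be identical.
def exact_match?(payload, *args, **kwargs)
  payload["args"] == args && payload["kwargs"] == kwargs.transform_keys(&:to_s)
end

# Subset: the first N positional arguments match, and each given keyword
# argument is present with the same value (extra stored kwargs are ignored).
def subset_match?(payload, *args, **kwargs)
  stored = payload["kwargs"]
  payload["args"].first(args.size) == args &&
    kwargs.all? { |key, value| stored.key?(key.to_s) && stored[key.to_s] == value }
end

payload = { "args" => [223, 5000], "kwargs" => {} }
puts exact_match?(payload, 223, 5000) # true
puts exact_match?(payload, 223)       # false
puts subset_match?(payload, 223)      # true

# An invented payload with keyword arguments, for illustration only.
with_kwargs = { "args" => [7], "kwargs" => { "notify" => true, "region" => "eu" } }
puts subset_match?(with_kwargs, 7, notify: true) # true
puts exact_match?(with_kwargs, 7, notify: true)  # false
```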
#order
Changes the sort order in which results are returned. Use Symbols :asc or
:desc. The default is :asc.
Zizq.query.by_status("scheduled").each do |job|
  puts "#{job.id}"
end
# 03fvmay0zcoskwdf2sm0u94aw
# 03fvqg4qxj39zecl43gnwwh04
# 03fvqg68ra0od9u1b8m0txgka
# 03fvqhdcqsftfxrzb6xc8acjy
# 03fvqhdcqsftfxrzb6zq57rqs
# 03fvqhdcqsftfxrzb71cxvk13
# 03fvqhdcqsftfxrzb74afpg6x
# 03fvqhdcqsftfxrzb75rabbrt
# 03fvqhdcqsftfxrzb78any8s1
# 03fvqhdcqsftfxrzb7a5g3hpm
Zizq.query.by_status("scheduled").order(:desc).each do |job|
  puts "#{job.id}"
end
# 03fvqhdcqsftfxrzb7a5g3hpm
# 03fvqhdcqsftfxrzb78any8s1
# 03fvqhdcqsftfxrzb75rabbrt
# 03fvqhdcqsftfxrzb74afpg6x
# 03fvqhdcqsftfxrzb71cxvk13
# 03fvqhdcqsftfxrzb6zq57rqs
# 03fvqhdcqsftfxrzb6xc8acjy
# 03fvqg68ra0od9u1b8m0txgka
# 03fvqg4qxj39zecl43gnwwh04
# 03fvmay0zcoskwdf2sm0u94aw
#limit
Changes the maximum number of total results returned by the query.
Zizq.query.by_status("scheduled").order(:desc).limit(3).each do |job|
  puts "#{job.id}"
end
# 03fvqhdcqsftfxrzb7a5g3hpm
# 03fvqhdcqsftfxrzb78any8s1
# 03fvqhdcqsftfxrzb75rabbrt
#in_pages_of
Changes the number of results fetched in a single page as the client enumerates jobs. When not specified, the server’s default page size applies.
This also affects how #update_all and #delete_all are applied. Without
specifying #in_pages_of, these operations apply to the entire result set in a
single transaction. When #in_pages_of is specified, the bulk delete or update
is done in a batched manner.
Zizq.query.by_status("scheduled").in_pages_of(2).each do |job|
  puts "#{job.id}"
end
# 03fvmay0zcoskwdf2sm0u94aw
# 03fvqg4qxj39zecl43gnwwh04
# 03fvqg68ra0od9u1b8m0txgka
# 03fvqhdcqsftfxrzb6xc8acjy
# 03fvqhdcqsftfxrzb6zq57rqs
# 03fvqhdcqsftfxrzb71cxvk13
# 03fvqhdcqsftfxrzb74afpg6x
# 03fvqhdcqsftfxrzb75rabbrt
# 03fvqhdcqsftfxrzb78any8s1
# 03fvqhdcqsftfxrzb7a5g3hpm
Zizq.query
  .by_status("scheduled")
  .in_pages_of(2)
  .each_page
  .with_index do |page, idx|
    puts "Page #{idx + 1}"
    page.jobs.each { |job| puts "#{job.id}" }
  end
# Page 1
# 03fvmay0zcoskwdf2sm0u94aw
# 03fvqg4qxj39zecl43gnwwh04
# Page 2
# 03fvqg68ra0od9u1b8m0txgka
# 03fvqhdcqsftfxrzb6xc8acjy
# Page 3
# 03fvqhdcqsftfxrzb6zq57rqs
# 03fvqhdcqsftfxrzb71cxvk13
# Page 4
# 03fvqhdcqsftfxrzb74afpg6x
# 03fvqhdcqsftfxrzb75rabbrt
# Page 5
# 03fvqhdcqsftfxrzb78any8s1
# 03fvqhdcqsftfxrzb7a5g3hpm
#each
Enumerates each Zizq::Resources::Job in the query result. The query is not
executed until this method, or some other Enumerable method, is called.
Zizq.query.by_status("scheduled").each do |job|
  puts "#{job.id}"
end
# 03fvmay0zcoskwdf2sm0u94aw
# 03fvqg4qxj39zecl43gnwwh04
# 03fvqg68ra0od9u1b8m0txgka
# 03fvqhdcqsftfxrzb6xc8acjy
# 03fvqhdcqsftfxrzb6zq57rqs
# 03fvqhdcqsftfxrzb71cxvk13
# 03fvqhdcqsftfxrzb74afpg6x
# 03fvqhdcqsftfxrzb75rabbrt
# 03fvqhdcqsftfxrzb78any8s1
# 03fvqhdcqsftfxrzb7a5g3hpm
#each_page
Enumerates each Zizq::Resources::JobPage in the query result.
Note
When combined with #limit, #each_page stops at the page boundary, but the
entire last page is returned even if the jobs it contains exceed the limit.
Zizq.query
  .by_status("scheduled")
  .in_pages_of(2)
  .each_page
  .with_index do |page, idx|
    puts "Page #{idx + 1}"
    page.jobs.each { |job| puts "#{job.id}" }
  end
# Page 1
# 03fvmay0zcoskwdf2sm0u94aw
# 03fvqg4qxj39zecl43gnwwh04
# Page 2
# 03fvqg68ra0od9u1b8m0txgka
# 03fvqhdcqsftfxrzb6xc8acjy
# Page 3
# 03fvqhdcqsftfxrzb6zq57rqs
# 03fvqhdcqsftfxrzb71cxvk13
# Page 4
# 03fvqhdcqsftfxrzb74afpg6x
# 03fvqhdcqsftfxrzb75rabbrt
# Page 5
# 03fvqhdcqsftfxrzb78any8s1
# 03fvqhdcqsftfxrzb7a5g3hpm
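The interaction between #limit and #each_page noted above is easiest to see with numbers. The sketch below is a toy model over an in-memory list, with a hypothetical toy_pages helper. It mirrors the documented behaviour (whole pages are returned and enumeration stops once the limit is reached) without claiming to match the client's internals.

```ruby
# Toy model: returns whole pages and stops once the limit has been reached.
def toy_pages(ids, page_size:, limit:)
  pages = []
  seen = 0
  ids.each_slice(page_size) do |page|
    break if seen >= limit

    pages << page # the page is never truncated
    seen += page.size
  end
  pages
end

ids = %w[job1 job2 job3 job4 job5 job6]
pages = toy_pages(ids, page_size: 2, limit: 3)

puts pages.inspect                  # [["job1", "job2"], ["job3", "job4"]]
puts pages.flatten.size             # 4 (one more than the limit of 3)
puts pages.flatten.first(3).inspect # ["job1", "job2", "job3"]
```

If an exact cap matters when working page by page, trimming on the client side, as in the last line, is one option.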
#delete_all
Deletes all jobs from the server that match the given query and returns the number of deleted jobs. When the query is unfiltered, deletes all jobs from the server. This can be useful in tests.
When combined with #in_pages_of, jobs are deleted in a batch-wise manner,
otherwise all jobs are deleted in a single transaction.
This method also combines safely with #limit in order to explicitly prevent
deleting more than a specified number of jobs (implies batch-wise deletion).
Zizq.query.by_queue("analytics").count
# 90
Zizq.query.by_queue("analytics").delete_all
# 90
Zizq.query.by_queue("analytics").count
# 0
Zizq.query.by_queue("comms").count
# 3538
Zizq.query.by_queue("comms").in_pages_of(20).limit(30).order(:desc).delete_all
# 30
Zizq.query.by_queue("comms").count
# 3508
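To see how a page size and a limit might combine, consider the arithmetic of splitting a capped operation into batches. The batch_sizes helper below is hypothetical, and the batching strategy is an assumption made for illustration. The server may divide the work differently, but the total can never exceed the limit.

```ruby
# Hypothetical helper: splits a capped bulk operation into batch sizes.
def batch_sizes(matching:, page_size:, limit:)
  remaining = [matching, limit].min # never touch more than the limit
  sizes = []
  while remaining.positive?
    batch = [page_size, remaining].min
    sizes << batch
    remaining -= batch
  end
  sizes
end

sizes = batch_sizes(matching: 3538, page_size: 20, limit: 30)
puts sizes.inspect # [20, 10]
puts sizes.sum     # 30

puts batch_sizes(matching: 12, page_size: 20, limit: 30).inspect # [12]
```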
#update_all
Updates all jobs on the server that match the given query and returns the number of updated jobs. When the query is unfiltered, updates all jobs on the server.
The following job fields are mutable:
queue, priority, ready_at, retry_limit, backoff, retention
Only the fields specified in the argument list are updated. All other fields are left unchanged.
This method can be used for example to re-assign/rename queues or to change
priority.
Setting an optional field to nil tells the server to restore that field to
its default value. For example, to un-schedule a job, set ready_at to nil and
the server will make it ready immediately.
When combined with #in_pages_of, jobs are updated in a batch-wise manner,
otherwise all jobs are updated in a single transaction.
This method also combines safely with #limit in order to explicitly prevent
updating more than a specified number of jobs (implies batch-wise update).
Zizq.query.by_queue("default").count
# 15491
Zizq.query.by_queue("analytics").count
# 90
Zizq.query.by_queue("analytics").update_all(queue: "default")
# 90
Zizq.query.by_queue("default").count
# 15581
Zizq.query.by_queue("analytics").count
# 0
Zizq.query.by_queue("payments").by_status("scheduled").count
# 3
Zizq.query.by_queue("payments").by_status("scheduled").update_all(ready_at: nil)
# 3
Zizq.query.by_queue("payments").by_status("scheduled").count
# 0
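The distinction between leaving a field out and setting it to nil is the key point here. The sketch below shows one way a client could preserve that distinction when building an update, assuming a JSON request body. The toy_update_body helper, the JSON encoding, and the rejection of unknown fields are all assumptions for illustration, not the Zizq client's behaviour.

```ruby
require "json"

# Field names taken from the list of mutable job fields above.
MUTABLE_FIELDS = %i[queue priority ready_at retry_limit backoff retention].freeze

# Hypothetical helper: only the keys actually passed end up in the body, so
# an omitted field is left alone while an explicit nil is sent as null.
def toy_update_body(**changes)
  unknown = changes.keys - MUTABLE_FIELDS
  raise ArgumentError, "not mutable: #{unknown.join(', ')}" unless unknown.empty?

  JSON.generate(changes)
end

puts toy_update_body(queue: "default") # {"queue":"default"}
puts toy_update_body(ready_at: nil)    # {"ready_at":null}
```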
#delete_one
Deletes at most the first matching result from the query.
Zizq.query.by_queue("payments").count
# 881
Zizq.query.by_queue("payments").order(:desc).delete_one
# 1
Zizq.query.by_queue("payments").count
# 880
#update_one
Updates at most the first matching result from the query.
Zizq.query.by_queue("comms").by_status("scheduled").count
# 2
Zizq.query.by_queue("comms").by_status("scheduled").update_one(ready_at: nil)
# 1
Zizq.query.by_queue("comms").by_status("scheduled").count
# 1
Zizq::Resources::Job
Each job in the result of Zizq::Query#each also implements methods to inspect
errors and manage the job.
The following method provides access to the job’s errors:
#errors
The following methods allow deleting the job or updating its properties:
#delete, #update
#errors
Enumerates the errors on this job, in either ascending or descending order. This can also be done in pages.
job = Zizq.query.by_id("03fvqhdcqsftfxrzb7m9owqsf").first
job.errors.in_pages_of(20).order(:desc).each do |err|
  puts "Attempt: #{err.attempt}, Message: #{err.message}"
end
# Attempt: 2, Message: Something went wrong
# Attempt: 1, Message: Something went wrong
#delete
Permanently deletes this job from the server. Returns nil on success. Raises
on failure (e.g. 404 - job not found).
job = Zizq.query.by_queue("comms").first
job.id
# 03fvqhdcqsftfxrzb7hekw4if
Zizq.query.by_id("03fvqhdcqsftfxrzb7hekw4if").count
# 1
job.delete
# nil
Zizq.query.by_id("03fvqhdcqsftfxrzb7hekw4if").count
# 0
job.delete
#! job not found (Zizq::NotFoundError)
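Because deleting an already-deleted job raises, code that may race with other workers often wants to treat "already gone" as success. A minimal sketch follows. FakeJob and delete_if_present are hypothetical, and the Zizq::NotFoundError definition is only a stand-in so the example runs without the gem or a server.

```ruby
# Stand-in so the sketch runs on its own; skipped when the real client is loaded.
unless defined?(Zizq::NotFoundError)
  module Zizq
    class NotFoundError < StandardError; end
  end
end

# Hypothetical fake job that mimics the documented #delete behaviour:
# nil on success, Zizq::NotFoundError when the job no longer exists.
FakeJob = Struct.new(:id, :deleted) do
  def delete
    raise Zizq::NotFoundError, "job not found" if deleted

    self.deleted = true
    nil
  end
end

# Hypothetical helper: true if this call deleted the job, false if it was
# already gone.
def delete_if_present(job)
  job.delete
  true
rescue Zizq::NotFoundError
  false
end

job = FakeJob.new("job-1", false)
puts delete_if_present(job) # true
puts delete_if_present(job) # false
```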
#update
Updates mutable properties on this job. Returns the updated job metadata on
success. Raises Zizq::ClientError on failure.
Jobs in the completed or dead statuses are immutable and cannot be updated.
job = Zizq.query.by_queue("comms").first
job.id
# 03fvqhdcqsftfxrzb7nn5h57r
Zizq.query.by_id("03fvqhdcqsftfxrzb7nn5h57r").map(&:queue)
# ["comms"]
job.update(queue: "default")
# #<Zizq::Resources::Job @data={"id"=>"03fvqhdcqsftfxrzb7nn5h57r", ...}>
job.queue
# default
Zizq.query.by_id("03fvqhdcqsftfxrzb7nn5h57r").map(&:queue)
# ["default"]
Zizq::Resources::JobPage
The pages of jobs yielded by Zizq::Query#each_page also provide some helper
methods that operate across the whole page.
The page itself is also an Enumerable so #each, #count, #take etc work
as expected.
#delete_all
Delete all jobs on the current page by their IDs.
Warning
This method behaves differently to Zizq::Query#delete_all, which respects
the original query filters. This method deletes every record on the page
unconditionally, even if it may have since been updated and does not match
the filters.
The example here deletes all jobs on the first 5 pages where the queue is
example.
Zizq.query.by_queue("example").each_page.take(5).each(&:delete_all)
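The warning above comes down to the difference between deleting by filter and deleting by a remembered list of ids. The toy in-memory model below illustrates that difference. The store and its job records are invented for the example and do not involve the Zizq client.

```ruby
# Toy in-memory store standing in for the server.
store = {
  "j1" => { queue: "example" },
  "j2" => { queue: "example" }
}

# A "page" remembers only the ids that matched when it was fetched.
page_ids = store.select { |_id, job| job[:queue] == "example" }.keys

# Meanwhile, j2 is moved to another queue and no longer matches the filter.
store["j2"][:queue] = "other"

# Filter-based delete (query-level): the condition is checked again.
after_filter_delete = store.reject { |_id, job| job[:queue] == "example" }

# Id-based delete (page-level): every remembered id is removed.
after_id_delete = store.reject { |id, _job| page_ids.include?(id) }

puts after_filter_delete.keys.inspect # ["j2"]
puts after_id_delete.keys.inspect     # []
```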
#update_all
Update all jobs on the current page by their IDs.
Warning
This method behaves differently to Zizq::Query#update_all, which respects
the original query filters. This method updates every record on the page
unconditionally, even if it may have since been updated and does not match
the filters.
The example here updates all jobs on the first 2 pages where the queue is
example.
Zizq.query.by_queue("example").each_page.take(2).each do |page|
  page.update_all(queue: "other")
end