Skip to content

Commit

Permalink
Passing context to listMessages
Browse files Browse the repository at this point in the history
  • Loading branch information
Joker666 committed Oct 5, 2024
1 parent 824c7c5 commit 2a95b95
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 12 deletions.
11 changes: 6 additions & 5 deletions inspector.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package asynq

import (
"context"
"fmt"
"strconv"
"strings"
Expand All @@ -18,7 +19,7 @@ type Inspector struct {
rdb *rdb.RDB
}

// New returns a new instance of Inspector.
// NewInspector returns a new instance of Inspector.
func NewInspector(r RedisConnOpt) *Inspector {
c, ok := r.MakeRedisClient().(redis.UniversalClient)
if !ok {
Expand Down Expand Up @@ -295,13 +296,13 @@ func Page(n int) ListOption {
// ListPendingTasks retrieves pending tasks from the specified queue.
//
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListPendingTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
func (i *Inspector) ListPendingTasks(ctx context.Context, queue string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %v", err)
}
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
infos, err := i.rdb.ListPending(queue, pgn)
infos, err := i.rdb.ListPending(ctx, queue, pgn)
switch {
case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
Expand All @@ -323,13 +324,13 @@ func (i *Inspector) ListPendingTasks(queue string, opts ...ListOption) ([]*TaskI
// ListActiveTasks retrieves active tasks from the specified queue.
//
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListActiveTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
func (i *Inspector) ListActiveTasks(ctx context.Context, queue string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %v", err)
}
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
infos, err := i.rdb.ListActive(queue, pgn)
infos, err := i.rdb.ListActive(ctx, queue, pgn)
switch {
case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
Expand Down
15 changes: 8 additions & 7 deletions internal/rdb/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ func (p Pagination) stop() int64 {
}

// ListPending returns pending tasks that are ready to be processed.
func (r *RDB) ListPending(qname string, pgn Pagination) ([]*base.TaskInfo, error) {
func (r *RDB) ListPending(ctx context.Context, qname string, pgn Pagination) ([]*base.TaskInfo, error) {
var op errors.Op = "rdb.ListPending"
exists, err := r.queueExists(qname)
if err != nil {
Expand All @@ -621,15 +621,15 @@ func (r *RDB) ListPending(qname string, pgn Pagination) ([]*base.TaskInfo, error
if !exists {
return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname})
}
res, err := r.listMessages(qname, base.TaskStatePending, pgn)
res, err := r.listMessages(ctx, qname, base.TaskStatePending, pgn)
if err != nil {
return nil, errors.E(op, errors.CanonicalCode(err), err)
}
return res, nil
}

// ListActive returns all tasks that are currently being processed for the given queue.
func (r *RDB) ListActive(qname string, pgn Pagination) ([]*base.TaskInfo, error) {
func (r *RDB) ListActive(ctx context.Context, qname string, pgn Pagination) ([]*base.TaskInfo, error) {
var op errors.Op = "rdb.ListActive"
exists, err := r.queueExists(qname)
if err != nil {
Expand All @@ -638,7 +638,7 @@ func (r *RDB) ListActive(qname string, pgn Pagination) ([]*base.TaskInfo, error)
if !exists {
return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname})
}
res, err := r.listMessages(qname, base.TaskStateActive, pgn)
res, err := r.listMessages(ctx, qname, base.TaskStateActive, pgn)
if err != nil {
return nil, errors.E(op, errors.CanonicalCode(err), err)
}
Expand All @@ -662,7 +662,9 @@ return data
`)

// listMessages returns a list of TaskInfo in Redis list with the given key.
func (r *RDB) listMessages(qname string, state base.TaskState, pgn Pagination) ([]*base.TaskInfo, error) {
func (r *RDB) listMessages(ctx context.Context, qname string, state base.TaskState, pgn Pagination) (
[]*base.TaskInfo, error,
) {
var key string
switch state {
case base.TaskStateActive:
Expand All @@ -676,8 +678,7 @@ func (r *RDB) listMessages(qname string, state base.TaskState, pgn Pagination) (
// correct range and reverse the list to get the tasks with pagination.
stop := -pgn.start() - 1
start := -pgn.stop() - 1
res, err := listMessagesCmd.Run(context.Background(), r.client,
[]string{key}, start, stop, base.TaskKeyPrefix(qname)).Result()
res, err := listMessagesCmd.Run(ctx, r.client, []string{key}, start, stop, base.TaskKeyPrefix(qname)).Result()
if err != nil {
return nil, errors.E(errors.Unknown, err)
}
Expand Down

0 comments on commit 2a95b95

Please sign in to comment.