From 2a95b951c5405c06799f5700bac7a63b7aaa16a8 Mon Sep 17 00:00:00 2001 From: Rafi Date: Sat, 5 Oct 2024 12:03:18 -0300 Subject: [PATCH] Passing context to listMessages --- inspector.go | 11 ++++++----- internal/rdb/inspect.go | 15 ++++++++------- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/inspector.go b/inspector.go index a98a2211..35bd162a 100644 --- a/inspector.go +++ b/inspector.go @@ -1,6 +1,7 @@ package asynq import ( + "context" "fmt" "strconv" "strings" @@ -18,7 +19,7 @@ type Inspector struct { rdb *rdb.RDB } -// New returns a new instance of Inspector. +// NewInspector returns a new instance of Inspector. func NewInspector(r RedisConnOpt) *Inspector { c, ok := r.MakeRedisClient().(redis.UniversalClient) if !ok { @@ -295,13 +296,13 @@ func Page(n int) ListOption { // ListPendingTasks retrieves pending tasks from the specified queue. // // By default, it retrieves the first 30 tasks. -func (i *Inspector) ListPendingTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) { +func (i *Inspector) ListPendingTasks(ctx context.Context, queue string, opts ...ListOption) ([]*TaskInfo, error) { if err := base.ValidateQueueName(queue); err != nil { return nil, fmt.Errorf("asynq: %v", err) } opt := composeListOptions(opts...) pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} - infos, err := i.rdb.ListPending(queue, pgn) + infos, err := i.rdb.ListPending(ctx, queue, pgn) switch { case errors.IsQueueNotFound(err): return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound) @@ -323,13 +324,13 @@ func (i *Inspector) ListPendingTasks(queue string, opts ...ListOption) ([]*TaskI // ListActiveTasks retrieves active tasks from the specified queue. // // By default, it retrieves the first 30 tasks. -func (i *Inspector) ListActiveTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) { +func (i *Inspector) ListActiveTasks(ctx context.Context, queue string, opts ...ListOption) ([]*TaskInfo, error) { if err := base.ValidateQueueName(queue); err != nil { return nil, fmt.Errorf("asynq: %v", err) } opt := composeListOptions(opts...) pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} - infos, err := i.rdb.ListActive(queue, pgn) + infos, err := i.rdb.ListActive(ctx, queue, pgn) switch { case errors.IsQueueNotFound(err): return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound) diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index a232ceb9..17bc3a82 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -612,7 +612,7 @@ func (p Pagination) stop() int64 { } // ListPending returns pending tasks that are ready to be processed. -func (r *RDB) ListPending(qname string, pgn Pagination) ([]*base.TaskInfo, error) { +func (r *RDB) ListPending(ctx context.Context, qname string, pgn Pagination) ([]*base.TaskInfo, error) { var op errors.Op = "rdb.ListPending" exists, err := r.queueExists(qname) if err != nil { @@ -621,7 +621,7 @@ func (r *RDB) ListPending(qname string, pgn Pagination) ([]*base.TaskInfo, error if !exists { return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname}) } - res, err := r.listMessages(qname, base.TaskStatePending, pgn) + res, err := r.listMessages(ctx, qname, base.TaskStatePending, pgn) if err != nil { return nil, errors.E(op, errors.CanonicalCode(err), err) } @@ -629,7 +629,7 @@ func (r *RDB) ListPending(qname string, pgn Pagination) ([]*base.TaskInfo, error } // ListActive returns all tasks that are currently being processed for the given queue. -func (r *RDB) ListActive(qname string, pgn Pagination) ([]*base.TaskInfo, error) { +func (r *RDB) ListActive(ctx context.Context, qname string, pgn Pagination) ([]*base.TaskInfo, error) { var op errors.Op = "rdb.ListActive" exists, err := r.queueExists(qname) if err != nil { @@ -638,7 +638,7 @@ func (r *RDB) ListActive(qname string, pgn Pagination) ([]*base.TaskInfo, error) if !exists { return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname}) } - res, err := r.listMessages(qname, base.TaskStateActive, pgn) + res, err := r.listMessages(ctx, qname, base.TaskStateActive, pgn) if err != nil { return nil, errors.E(op, errors.CanonicalCode(err), err) } @@ -662,7 +662,9 @@ return data `) // listMessages returns a list of TaskInfo in Redis list with the given key. -func (r *RDB) listMessages(qname string, state base.TaskState, pgn Pagination) ([]*base.TaskInfo, error) { +func (r *RDB) listMessages(ctx context.Context, qname string, state base.TaskState, pgn Pagination) ( + []*base.TaskInfo, error, +) { var key string switch state { case base.TaskStateActive: @@ -676,8 +678,7 @@ func (r *RDB) listMessages(qname string, state base.TaskState, pgn Pagination) ( // correct range and reverse the list to get the tasks with pagination. stop := -pgn.start() - 1 start := -pgn.stop() - 1 - res, err := listMessagesCmd.Run(context.Background(), r.client, - []string{key}, start, stop, base.TaskKeyPrefix(qname)).Result() + res, err := listMessagesCmd.Run(ctx, r.client, []string{key}, start, stop, base.TaskKeyPrefix(qname)).Result() if err != nil { return nil, errors.E(errors.Unknown, err) }