// Package gorqs (short for `Go Runnable Queue Service`) provides routines to queue and execute runnable jobs
// and caches their execution result (error) for later consultation. The Queue processor can run in synchronous
// or asynchronous mode. Adding a job to the Queue service is always a non-blocking operation and returns a
// unique job id on success.
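//
// A minimal usage sketch (ctx and myJob are placeholders; myJob stands for any
// value satisfying the package's Runner interface):
//
//	q := gorqs.New(gorqs.AsyncMode | gorqs.TrackJobs)
//	go func() { _ = q.Start(ctx) }()
//	id, err := q.Push(ctx, myJob)
//	if err == nil {
//		// query the cached result for this job id
//		result := q.Fetch(ctx, id)
//		_ = result
//	}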
package gorqs

import (
"context"
"sync"
"time"
)

// New creates an instance of a workable Queue.
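//
// An illustrative sketch of the two construction variants, reflecting how the
// flags are interpreted below:
//
//	q1 := gorqs.New(gorqs.SyncMode)                    // sequential processing; Fetch reports ErrNotImplemented
//	q2 := gorqs.New(gorqs.AsyncMode | gorqs.TrackJobs) // concurrent processing with result tracking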
func New(flags Flag) *Queue {
q := &Queue{
jobsChan: make(chan jobber, 1),
records: sync.Map{},
}
if flags&AsyncMode != 0 {
q.mode = AsyncMode
} else {
q.mode = SyncMode
}
if flags&TrackJobs != 0 {
q.recorder = func(id int64, err error) {
q.records.Store(id, err)
}
q.fetcher = func(ctx context.Context, id int64) error {
return q.fetch(ctx, id)
}
} else {
q.recorder = func(id int64, err error) {}
q.fetcher = func(ctx context.Context, id int64) error { return ErrNotImplemented }
}
return q
}

// Start pulls jobs from the queue and runs them.
// It returns once the context is cancelled.
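//
// Since Start blocks until the context is cancelled, a typical (illustrative)
// call pattern is:
//
//	ctx, cancel := context.WithCancel(context.Background())
//	go func() { _ = q.Start(ctx) }()
//	// ... push and fetch jobs ...
//	cancel() // Start returns ctx.Err()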
func (q *Queue) Start(ctx context.Context) error {
switch q.mode {
case SyncMode:
return q.sstart(ctx)
case AsyncMode:
return q.astart(ctx)
case TrackJobs:
return ErrInvalidMode
default:
return ErrUnknownMode
}
}

// sconsumer is a synchronous worker which processes one job
// at a time and records the result of each job it runs.
func (q *Queue) sconsumer(ctx context.Context, iq *slist) {
for {
select {
case <-ctx.Done():
return
default:
if !q.running.Load() {
return
}
if iq.isEmpty() {
continue
}
if job := iq.pop(); job != nil {
q.recorder(job.getID(), ErrRunning)
q.recorder(job.getID(), job.Run())
}
}
}
}

// sstart starts a single worker named sconsumer and pushes each received job
// onto an internal synchronized singly linked list named iq. This ensures that
// one job is processed at a time and jobs are processed in order of reception.
func (q *Queue) sstart(ctx context.Context) error {
q.running.Store(true)
iq := list()
go q.sconsumer(ctx, iq)
for {
select {
case <-ctx.Done():
q.running.Store(false)
return ctx.Err()
case job := <-q.jobsChan:
iq.push(job)
default:
if !q.running.Load() {
return nil
}
}
}
}

// astart pulls jobs from the queue and runs them asynchronously.
// It returns once the Queue is stopped or the context is done.
// Each job's returned error is stored in the records map.
func (q *Queue) astart(ctx context.Context) error {
q.running.Store(true)
for {
select {
case j := <-q.jobsChan:
go func() {
q.recorder(j.getID(), ErrRunning)
q.recorder(j.getID(), j.Run())
}()
case <-ctx.Done():
q.running.Store(false)
return ctx.Err()
default:
if !q.running.Load() {
return nil
}
}
}
}

// Push is a non-blocking method that adds a job to the queue for processing.
// It allows a maximum of 10ms to enqueue a job. On success, it returns the
// unique job id (int64) and a nil error. In synchronous mode, it also records
// an initial job result state of ErrPending into the cache.
// If the queue service is stopped, it returns ErrQueueClosed. If the job is
// not enqueued within 10ms, it returns ErrTimeout. If the context is done or
// the job fails to enqueue, it ensures the job id is not cached.
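//
// An illustrative call (task stands for any Runner implementation):
//
//	id, err := q.Push(ctx, task)
//	switch err {
//	case nil:
//		// queued; id can later be handed to Fetch
//	case gorqs.ErrQueueClosed:
//		// the queue service was stopped
//	case gorqs.ErrTimeout:
//		// enqueueing did not succeed within 10ms
//	}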
func (q *Queue) Push(ctx context.Context, r Runner) (int64, error) {
if !q.running.Load() {
return -1, ErrQueueClosed
}
recorded := false
id := q.counter.Add(1)
if q.mode == SyncMode {
q.recorder(id, ErrPending)
recorded = true
}
timer := time.NewTimer(10 * time.Millisecond)
defer timer.Stop()
select {
case <-ctx.Done():
if recorded {
q.records.Delete(id)
}
return -1, ctx.Err()
case q.jobsChan <- &job{id, r}:
return id, nil
case <-timer.C:
if recorded {
q.records.Delete(id)
}
return -1, ErrTimeout
}
}

// Stop closes the queue so that no more jobs can be added.
func (q *Queue) Stop(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
q.running.Store(false)
return nil
}
}

// Clear removes all recorded job results from the cache.
func (q *Queue) Clear() {
q.records.Range(func(key interface{}, value interface{}) bool {
q.records.Delete(key)
return true
})
}

// IsRunning returns the current status of the queue service.
func (q *Queue) IsRunning() bool {
return q.running.Load()
}

// Fetch provides the result `error` of a given Job Runner based on its ID.
// If the job id was found and the job has completed, it deletes the record
// from the map. It returns ErrNotFound if the `id` does not exist, ErrPending
// if the job runner has not started yet, or ErrRunning if it was picked up
// but is still being processed. ErrNotImplemented is returned if the tracking
// feature was not enabled.
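//
// An illustrative polling loop (the 10ms interval is arbitrary, not prescribed
// by the package):
//
//	for {
//		err := q.Fetch(ctx, id)
//		if err != gorqs.ErrPending && err != gorqs.ErrRunning {
//			return err // nil, the job's own error, ErrNotFound or ErrNotImplemented
//		}
//		time.Sleep(10 * time.Millisecond)
//	}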
func (q *Queue) Fetch(ctx context.Context, id int64) error {
return q.fetcher(ctx, id)
}

// fetch is the internal function invoked to retrieve a given job execution result
// based on its id, if this feature was enabled during the Queue initialization.
// It deletes the record from the cache once the job execution has completed.
func (q *Queue) fetch(_ context.Context, id int64) error {
v, found := q.records.Load(id)
if !found {
return ErrNotFound
}
	if v == nil {
		// a nil value means the job completed successfully; drop its record like any other completed job
		q.records.Delete(id)
		return nil
	}
if v == ErrPending || v == ErrRunning {
return v.(error)
}
err, found := v.(error)
q.records.Delete(id)
if !found {
return ErrInvalid
}
return err
}