-
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathenqueuer.go
152 lines (129 loc) · 4.42 KB
/
enqueuer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package asyncer
import (
"context"
"encoding/json"
"errors"
"time"
"github.com/hibiken/asynq"
"github.com/redis/go-redis/v9"
)
// Enqueuer is a helper for enqueuing tasks through an asynq client.
// It carries the defaults applied to every task it enqueues.
// Embed it in your own struct to add task-specific queue methods;
// see pkg/worker/_example/enqueuer.go for an example.
type Enqueuer struct {
	// client is the asynq client that tasks are enqueued through.
	client *asynq.Client
	// queueName is the queue that tasks are enqueued to.
	queueName string
	// taskDeadline is added to the enqueue time to form the task deadline
	// and is also used as the uniqueness TTL.
	taskDeadline time.Duration
	// maxRetry is the max retry count passed to asynq for each task.
	maxRetry int
}

// EnqueuerOption is a function that configures an Enqueuer.
type EnqueuerOption func(*Enqueuer)
// NewEnqueuerWithAsynqClient builds an Enqueuer on top of an existing Asynq client.
//
// It returns ErrMissedAsynqClient when client is nil. Otherwise the Enqueuer
// starts from the defaults below and every option in opt is applied, in order,
// on top of them:
//   - queue name: "default"
//   - task deadline: 1 minute
//   - max retry: 3
func NewEnqueuerWithAsynqClient(client *asynq.Client, opt ...EnqueuerOption) (*Enqueuer, error) {
	// Defaults used when no option overrides them.
	const (
		defaultQueueName    = "default"
		defaultTaskDeadline = time.Minute
		defaultMaxRetry     = 3
	)

	if client == nil {
		return nil, ErrMissedAsynqClient
	}

	enq := &Enqueuer{
		client:       client,
		queueName:    defaultQueueName,
		taskDeadline: defaultTaskDeadline,
		maxRetry:     defaultMaxRetry,
	}

	// Options run after the defaults are set so that they can override them.
	for i := range opt {
		opt[i](enq)
	}

	return enq, nil
}
// MustNewEnqueuerWithAsynqClient is like NewEnqueuerWithAsynqClient but panics
// with the returned error instead of handing it back to the caller.
func MustNewEnqueuerWithAsynqClient(client *asynq.Client, opt ...EnqueuerOption) *Enqueuer {
	enq, createErr := NewEnqueuerWithAsynqClient(client, opt...)
	if createErr != nil {
		panic(createErr)
	}

	return enq
}
// NewEnqueuer creates a new Enqueuer backed by the given Redis client.
//
// The Redis client is handed to NewClient to obtain the Asynq client; if that
// fails, the returned error joins ErrFailedToCreateEnqueuerWithClient with the
// underlying cause. On success the result of NewEnqueuerWithAsynqClient is
// returned, so default values are used for anything opt does not configure.
func NewEnqueuer(redisClient redis.UniversalClient, opt ...EnqueuerOption) (*Enqueuer, error) {
	asynqClient, clientErr := NewClient(redisClient)
	if clientErr != nil {
		return nil, errors.Join(ErrFailedToCreateEnqueuerWithClient, clientErr)
	}

	return NewEnqueuerWithAsynqClient(asynqClient, opt...)
}
// MustNewEnqueuer is like NewEnqueuer, taking a Redis client and options, but
// panics with the returned error instead of handing it back to the caller.
func MustNewEnqueuer(redisClient redis.UniversalClient, opt ...EnqueuerOption) *Enqueuer {
	enq, createErr := NewEnqueuer(redisClient, opt...)
	if createErr != nil {
		panic(createErr)
	}

	return enq
}
// EnqueueTask enqueues a task to be processed asynchronously.
//
// payload is marshaled to JSON and becomes the body of a task named taskName.
// The task is enqueued with the Enqueuer's queue name, a deadline of
// time.Now() plus the configured task deadline, the configured maximum retry
// count, and a uniqueness constraint whose TTL equals the task deadline.
// opts are appended after these defaults so that they can override them.
//
// ctx is passed to the Asynq client, so cancelling it or letting its deadline
// expire aborts the enqueue call. A nil ctx is treated as context.Background().
//
// The returned error joins ErrFailedToEnqueueTask with the underlying cause
// when the payload cannot be marshaled or the client fails to enqueue the task.
func (e *Enqueuer) EnqueueTask(ctx context.Context, taskName string, payload any, opts ...TaskOption) error {
	// Earlier versions ignored ctx entirely, so a nil ctx used to be harmless;
	// keep such callers working now that ctx is actually used.
	if ctx == nil {
		ctx = context.Background()
	}

	// Marshal payload to JSON bytes.
	jsonPayload, err := json.Marshal(payload)
	if err != nil {
		return errors.Join(ErrFailedToEnqueueTask, err)
	}

	// Set default options for enqueuing task.
	// These options can be overridden by the user provided options.
	defaultOptions := []asynq.Option{
		asynq.Queue(e.queueName),
		asynq.Deadline(time.Now().Add(e.taskDeadline)),
		asynq.MaxRetry(e.maxRetry),
		asynq.Unique(e.taskDeadline),
	}

	// Enqueue with the caller's context so that cancellation and deadlines
	// are honored instead of being silently dropped.
	if _, err := e.client.EnqueueContext(
		ctx,
		asynq.NewTask(taskName, jsonPayload),
		append(defaultOptions, opts...)...,
	); err != nil {
		return errors.Join(ErrFailedToEnqueueTask, err)
	}

	return nil
}
// Close closes the underlying Asynq client.
// When the client reports an error, the returned error joins
// ErrFailedToCloseEnqueuer with that error; otherwise nil is returned.
func (e *Enqueuer) Close() error {
	closeErr := e.client.Close()
	if closeErr == nil {
		return nil
	}

	return errors.Join(ErrFailedToCloseEnqueuer, closeErr)
}
// WithQueueNameEnq returns an option that sets the name of the queue
// that the Enqueuer enqueues tasks to.
func WithQueueNameEnq(name string) EnqueuerOption {
	return func(enq *Enqueuer) { enq.queueName = name }
}
// WithTaskDeadline returns an option that sets the task deadline.
// The duration is the time limit for a task to be processed: it is added to
// the enqueue time to form each task's deadline, and it is also used as the
// TTL of the task's uniqueness constraint.
func WithTaskDeadline(d time.Duration) EnqueuerOption {
	return func(enq *Enqueuer) { enq.taskDeadline = d }
}
// WithMaxRetry returns an option that sets the max retry count, i.e. the
// number of times a task will be retried if it fails.
func WithMaxRetry(n int) EnqueuerOption {
	return func(enq *Enqueuer) { enq.maxRetry = n }
}