-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpgq_subscribing_to_queue.go
70 lines (63 loc) · 1.79 KB
/
pgq_subscribing_to_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// TODO: Implement full API
package pgq
// RegisterConsumer subscribes a consumer to a queue by running the SQL
// function pgq.register_consumer(queue_name, consumer_name). From that moment
// on, the consumer sees all events in the queue.
//
// Parameters:
//   - queue_name: name of the queue.
//   - consumer_name: name of the consumer.
//
// Returns the integer produced by the SQL function (0 if the consumer was
// already registered, 1 for a new registration) together with any error
// reported while scanning the result row.
//
// On the database side the SQL function calls pgq.register_consumer_at(3)
// and manipulates no tables directly.
func (h *PGQHandle) RegisterConsumer(queue_name, consumer_name string) (out int, err error) {
	const query = "SELECT pgq.register_consumer($1, $2)"

	row := h.q.QueryRow(query, queue_name, consumer_name)
	err = row.Scan(&out)
	return out, err
}
// RegisterConsumerAt is the extended form of consumer registration that also
// lets the caller specify a tick ID. It runs the SQL function
// pgq.register_consumer_at(queue_name, consumer_name, tick_pos) and is meant
// for use in special situations.
//
// Parameters:
//   - queue_name: name of the queue.
//   - consumer_name: name of the consumer.
//   - tick_pos: tick ID.
//
// Returns the integer produced by the SQL function (0/1, indicating whether
// the consumer had already registered) together with any error reported
// while scanning the result row.
//
// On the database side the SQL function calls no other functions and
// updates/inserts into pgq.subscription.
func (h *PGQHandle) RegisterConsumerAt(queue_name, consumer_name string, tick_pos int) (out int, err error) {
	const query = "SELECT pgq.register_consumer_at($1, $2, $3)"

	row := h.q.QueryRow(query, queue_name, consumer_name, tick_pos)
	err = row.Scan(&out)
	return out, err
}
// UnregisterConsumer unsubscribes a consumer from a queue by running the SQL
// function pgq.unregister_consumer(queue_name, consumer_name). The consumer's
// retry events are deleted as well.
//
// Parameters:
//   - queue_name: name of the queue.
//   - consumer_name: name of the consumer.
//
// Returns the integer produced by the SQL function (the number of
// (sub)consumers unregistered) together with any error reported while
// scanning the result row.
//
// On the database side the SQL function calls no other functions and deletes
// from pgq.retry_queue and pgq.subscription.
func (h *PGQHandle) UnregisterConsumer(queue_name, consumer_name string) (out int, err error) {
	const query = "SELECT pgq.unregister_consumer($1, $2)"

	row := h.q.QueryRow(query, queue_name, consumer_name)
	err = row.Scan(&out)
	return out, err
}