Skip to content

Commit

Permalink
Merge pull request #143 from jaroslaw-bochniak/queue-synchronization-…
Browse files Browse the repository at this point in the history
…methods

Implemented methods for SyncQueue and CancelSyncQueue
  • Loading branch information
michaelklishin authored Sep 23, 2019
2 parents ef44349 + ca0b503 commit cd63c64
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 5 deletions.
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,14 @@ resp, err := rmqc.DeleteQueue("/", "a.queue")
// purges all messages in queue
resp, err := rmqc.PurgeQueue("/", "a.queue")
// => *http.Response, err

// synchronises all messages in queue with the rest of mirrors in the cluster
resp, err := rmqc.SyncQueue("/", "a.queue")
// => *http.Response, err

// cancels queue synchronisation process
resp, err := rmqc.CancelSyncQueue("/", "a.queue")
// => *http.Response, err
```


Expand Down
36 changes: 36 additions & 0 deletions queues.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,3 +352,39 @@ func (c *Client) PurgeQueue(vhost, queue string) (res *http.Response, err error)

return res, nil
}

// queueAction represents an action that can be performed on a queue (sync/cancel_sync)
type queueAction struct {
Action string `json:"action"`
}

// SyncQueue synchronises queue contents with the mirrors remaining in the cluster.
func (c *Client) SyncQueue(vhost, queue string) (res *http.Response, err error) {
return c.sendQueueAction(vhost, queue, queueAction{"sync"})
}

// CancelSyncQueue cancels queue synchronisation process.
func (c *Client) CancelSyncQueue(vhost, queue string) (res *http.Response, err error) {
return c.sendQueueAction(vhost, queue, queueAction{"cancel_sync"})
}

//
// POST /api/queues/{vhost}/{name}/actions
//
func (c *Client) sendQueueAction(vhost string, queue string, action queueAction) (res *http.Response, err error) {
body, err := json.Marshal(action)
if err != nil {
return nil, err
}

req, err := newRequestWithBody(c, "POST", "queues/"+url.PathEscape(vhost)+"/"+url.PathEscape(queue)+"/actions", body)
if err != nil {
return nil, err
}

if res, err = executeRequest(c, req); err != nil {
return nil, err
}

return res, nil
}
42 changes: 37 additions & 5 deletions rabbithole_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -726,8 +726,8 @@ var _ = Describe("Rabbithole", func() {
Ω(err).Should(BeNil())

_, err = ch.Consume(
"", // queue
"", // consumer
"", // queue
"", // consumer
false, // auto ack
false, // exclusive
false, // no local
Expand Down Expand Up @@ -770,9 +770,9 @@ var _ = Describe("Rabbithole", func() {
Ω(err).Should(BeNil())

_, err = ch.Consume(
"", // queue
"", // consumer
true, // auto ack
"", // queue
"", // consumer
true, // auto ack
false, // exclusive
false, // no local
false, // no wait
Expand Down Expand Up @@ -1583,6 +1583,38 @@ var _ = Describe("Rabbithole", func() {
})
})

Context("POST /queues/{vhost}/{queue}/actions", func() {
It("synchronises queue", func() {
vh := "rabbit/hole"
qn := "temporary"

_, err := rmqc.DeclareQueue(vh, qn, QueueSettings{Durable: false})
Ω(err).Should(BeNil())
awaitEventPropagation()

// it would be better to test this in a cluster configuration
x, err := rmqc.SyncQueue(vh, qn)
Ω(err).Should(BeNil())
Ω(x.StatusCode).Should(Equal(204))
rmqc.DeleteQueue(vh, qn)
})

It("cancels queue synchronisation", func() {
vh := "rabbit/hole"
qn := "temporary"

_, err := rmqc.DeclareQueue(vh, qn, QueueSettings{Durable: false})
Ω(err).Should(BeNil())
awaitEventPropagation()

// it would be better to test this in a cluster configuration
x, err := rmqc.CancelSyncQueue(vh, qn)
Ω(err).Should(BeNil())
Ω(x.StatusCode).Should(Equal(204))
rmqc.DeleteQueue(vh, qn)
})
})

Context("GET /policies", func() {
Context("when policy exists", func() {
It("returns decoded response", func() {
Expand Down

0 comments on commit cd63c64

Please sign in to comment.