diff --git a/provider/queue.go b/provider/queue.go index 1220779547b..a3268e10933 100644 --- a/provider/queue.go +++ b/provider/queue.go @@ -2,13 +2,14 @@ package provider import ( "context" - "github.com/ipfs/go-cid" - "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/namespace" - "github.com/ipfs/go-datastore/query" "math" "strconv" "strings" + + cid "github.com/ipfs/go-cid" + datastore "github.com/ipfs/go-datastore" + namespace "github.com/ipfs/go-datastore/namespace" + query "github.com/ipfs/go-datastore/query" ) // Queue provides a durable, FIFO interface to the datastore for storing cids @@ -100,11 +101,16 @@ func (q *Queue) nextEntry() (datastore.Key, cid.Cid) { // Run dequeues and enqueues when available. func (q *Queue) work() { go func() { + var k datastore.Key = datastore.Key{} + var c cid.Cid = cid.Undef for { - k, c := q.nextEntry() - var dequeue chan cid.Cid + if c == cid.Undef { + k, c = q.nextEntry() + } + // If c != cid.Undef set dequeue and attempt write, otherwise wait for enqueue + var dequeue chan cid.Cid if c != cid.Undef { dequeue = q.dequeue } @@ -126,7 +132,7 @@ func (q *Queue) work() { log.Errorf("Failed to delete queued cid %s with key %s: %s", c, k, err) continue } - + c = cid.Undef q.head++ case <-q.ctx.Done(): return