Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix node construction queue error #6480

Merged
merged 3 commits into from
Jul 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 30 additions & 93 deletions provider/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ package queue
import (
"context"
"fmt"
"strconv"
"strings"
"time"

cid "github.com/ipfs/go-cid"
datastore "github.com/ipfs/go-datastore"
Expand All @@ -25,8 +24,6 @@ type Queue struct {
// e.g. provider vs reprovider
name string
ctx context.Context
tail uint64
head uint64
ds datastore.Datastore // Must be threadsafe
dequeue chan cid.Cid
enqueue chan cid.Cid
Expand All @@ -37,16 +34,10 @@ type Queue struct {
// NewQueue creates a queue for cids
func NewQueue(ctx context.Context, name string, ds datastore.Datastore) (*Queue, error) {
namespaced := namespace.Wrap(ds, datastore.NewKey("/"+name+"/queue/"))
head, tail, err := getQueueHeadTail(ctx, namespaced)
if err != nil {
return nil, err
}
cancelCtx, cancel := context.WithCancel(ctx)
q := &Queue{
name: name,
ctx: cancelCtx,
head: head,
tail: tail,
ds: namespaced,
dequeue: make(chan cid.Cid),
enqueue: make(chan cid.Cid),
Expand Down Expand Up @@ -77,41 +68,6 @@ func (q *Queue) Dequeue() <-chan cid.Cid {
return q.dequeue
}

// Look for next Cid in the queue and return it. Skip over gaps and mangled data
func (q *Queue) nextEntry() (datastore.Key, cid.Cid) {
for {
if q.head >= q.tail {
return datastore.Key{}, cid.Undef
}

key := q.queueKey(q.head)
value, err := q.ds.Get(key)

if err != nil {
if err == datastore.ErrNotFound {
log.Warningf("Error missing entry in queue: %s", key)
} else {
log.Errorf("Error fetching from queue: %s", err)
}
q.head++ // move on
continue
}

c, err := cid.Parse(value)
if err != nil {
log.Warningf("Error marshalling Cid from queue: ", err)
q.head++
err = q.ds.Delete(key)
if err != nil {
log.Warningf("Provider queue failed to delete: %s", key)
}
continue
}

return key, c
}
}

// Run dequeues and enqueues when available.
func (q *Queue) work() {
go func() {
Expand All @@ -124,7 +80,26 @@ func (q *Queue) work() {

for {
if c == cid.Undef {
k, c = q.nextEntry()
head, e := q.getQueueHead()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally, we'd:

  1. Open a query.
  2. Iterate over the query to the end.
  3. Open a new query when we get the next CID.

(but we can punt on that)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got you. I'll keep that in mind.


if e != nil {
log.Errorf("error querying for head of queue: %s, stopping provider", e)
return
michaelavila marked this conversation as resolved.
Show resolved Hide resolved
} else if head != nil {
k = datastore.NewKey(head.Key)
c, e = cid.Parse(head.Value)
if e != nil {
log.Warningf("error parsing queue entry cid with key (%s), removing it from queue: %s", head.Key, e)
err := q.ds.Delete(k)
if err != nil {
log.Errorf("error deleting queue entry with key (%s), due to error (%s), stopping provider", head.Key, err)
return
michaelavila marked this conversation as resolved.
Show resolved Hide resolved
}
continue
}
} else {
c = cid.Undef
}
}

// If c != cid.Undef set dequeue and attempt write, otherwise wait for enqueue
Expand All @@ -135,14 +110,13 @@ func (q *Queue) work() {

select {
case toQueue := <-q.enqueue:
nextKey := q.queueKey(q.tail)
keyPath := fmt.Sprintf("%d/%s", time.Now().UnixNano(), c.String())
michaelavila marked this conversation as resolved.
Show resolved Hide resolved
nextKey := datastore.NewKey(keyPath)

if err := q.ds.Put(nextKey, toQueue.Bytes()); err != nil {
log.Errorf("Failed to enqueue cid: %s", err)
continue
}

q.tail++
case dequeue <- c:
err := q.ds.Delete(k)

Expand All @@ -151,61 +125,24 @@ func (q *Queue) work() {
continue
}
c = cid.Undef
q.head++
case <-q.ctx.Done():
return
}
}
}()
}

func (q *Queue) queueKey(id uint64) datastore.Key {
s := fmt.Sprintf("%016X", id)
return datastore.NewKey(s)
}

func getQueueHeadTail(ctx context.Context, datastore datastore.Datastore) (uint64, uint64, error) {
head, err := getQueueHead(datastore)
if err != nil {
return 0, 0, err
}
tail, err := getQueueTail(datastore)
if err != nil {
return 0, 0, err
}
return head, tail, nil
}

func getQueueHead(ds datastore.Datastore) (uint64, error) {
return getFirstIDByOrder(ds, query.OrderByKey{})
}

func getQueueTail(ds datastore.Datastore) (uint64, error) {
tail, err := getFirstIDByOrder(ds, query.OrderByKeyDescending{})
func (q *Queue) getQueueHead() (*query.Result, error) {
qry := query.Query{Orders: []query.Order{query.OrderByKey{}}, Limit: 1}
results, err := q.ds.Query(qry)
if err != nil {
return 0, err
}
if tail > 0 {
tail++
}
return tail, nil
}

func getFirstIDByOrder(ds datastore.Datastore, order query.Order) (uint64, error) {
q := query.Query{Orders: []query.Order{order}}
results, err := ds.Query(q)
if err != nil {
return 0, err
return nil, err
}
defer results.Close()
r, ok := results.NextSync()
if !ok {
return 0, nil
}
trimmed := strings.TrimPrefix(r.Key, "/")
id, err := strconv.ParseUint(trimmed, 16, 64)
if err != nil {
return 0, err
return nil, nil
}
return id, nil

return &r, nil
}
44 changes: 8 additions & 36 deletions provider/queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import (
"testing"
"time"

cid "github.com/ipfs/go-cid"
datastore "github.com/ipfs/go-datastore"
sync "github.com/ipfs/go-datastore/sync"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/sync"
"github.com/ipfs/go-ipfs-blocksutil"
)

Expand Down Expand Up @@ -55,36 +55,6 @@ func TestBasicOperation(t *testing.T) {
assertOrdered(cids, queue, t)
}

func TestSparseDatastore(t *testing.T) {
ctx := context.Background()
defer ctx.Done()

ds := sync.MutexWrap(datastore.NewMapDatastore())
queue, err := NewQueue(ctx, "test", ds)
if err != nil {
t.Fatal(err)
}

cids := makeCids(10)
for _, c := range cids {
queue.Enqueue(c)
}

// remove entries in the middle
err = queue.ds.Delete(queue.queueKey(5))
if err != nil {
t.Fatal(err)
}

err = queue.ds.Delete(queue.queueKey(6))
if err != nil {
t.Fatal(err)
}

expected := append(cids[:5], cids[7:]...)
assertOrdered(expected, queue, t)
}

func TestMangledData(t *testing.T) {
ctx := context.Background()
defer ctx.Done()
Expand All @@ -100,13 +70,15 @@ func TestMangledData(t *testing.T) {
queue.Enqueue(c)
}

// remove entries in the middle
err = queue.ds.Put(queue.queueKey(5), []byte("borked"))
// put bad data in the queue
queueKey := datastore.NewKey("/test/0")
err = queue.ds.Put(queueKey, []byte("borked"))
if err != nil {
t.Fatal(err)
}

expected := append(cids[:5], cids[6:]...)
// expect to only see the valid cids we entered
expected := cids
assertOrdered(expected, queue, t)
}

Expand Down