Improve performance of data onboarding #888

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged · 23 commits · Apr 3, 2025
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -20,6 +20,7 @@ The following emojis are used to highlight certain changes:

- upgrade to `go-libp2p` [v0.41.1](https://github.com/libp2p/go-libp2p/releases/tag/v0.41.1)
- `bitswap/network`: Add a new `requests_in_flight` metric gauge that measures how many bitswap streams are being written or read at a given time.
- improve speed of data onboarding by batching/buffering provider queue writes [#888](https://github.com/ipfs/boxo/pull/888)

### Removed

300 changes: 222 additions & 78 deletions provider/internal/queue/queue.go
@@ -2,10 +2,13 @@ package queue

import (
"context"
"encoding/base64"
"errors"
"fmt"
"sync"
"time"

"github.com/gammazero/deque"
cid "github.com/ipfs/go-cid"
datastore "github.com/ipfs/go-datastore"
namespace "github.com/ipfs/go-datastore/namespace"
@@ -15,136 +18,251 @@ import (

var log = logging.Logger("provider.queue")

// Queue provides a best-effort durability, FIFO interface to the datastore for storing cids
const (
// batchSize is the limit on the number of CIDs kept in memory, at which point
// they are all written to the datastore.
batchSize = 16 * 1024
// idleWriteTime is the interval at which to check whether the queue has been idle
// (no input or output). If the queue has been idle since the last check,
// then write all buffered CIDs to the datastore.
idleWriteTime = time.Minute
// shutdownTimeout is the duration that Close waits to finish writing CIDs
// to the datastore.
shutdownTimeout = 10 * time.Second
)

// Queue provides a FIFO interface to the datastore for storing cids.
//
// CIDs in the process of being provided when a crash or shutdown occurs may be
// in the queue when the node is brought back online depending on whether they
// were fully written to the underlying datastore.
//
// Best-effort durability just means that cids in the process of being provided when a
// crash or shutdown occurs may be in the queue when the node is brought back online
// depending on whether the underlying datastore has synchronous or asynchronous writes.
// Input to the queue is buffered in memory. The contents of the buffer are
// written to the datastore when the input buffer contains batchSize items, or
// when idleWriteTime has elapsed since the previous batch write or dequeue. CIDs to
// dequeue are read, in order, from the input buffer if there are none in the
// datastore. Otherwise they are read from the datastore.
//
// If queued items are read from the input buffer before it reaches its limit,
// then queued items can remain in memory. When the queue is closed, any
// remaining items in memory are written to the datastore.
type Queue struct {
// used to differentiate queues in datastore
// e.g. provider vs reprovider
ctx context.Context
ds datastore.Datastore // Must be threadsafe
dequeue chan cid.Cid
enqueue chan cid.Cid
close context.CancelFunc
closed sync.WaitGroup

counter uint64
close context.CancelFunc
closed chan error
closeOnce sync.Once
dequeue chan cid.Cid
ds datastore.Batching
enqueue chan cid.Cid
}

// NewQueue creates a queue for cids
func NewQueue(ds datastore.Datastore) *Queue {
namespaced := namespace.Wrap(ds, datastore.NewKey("/queue"))
cancelCtx, cancel := context.WithCancel(context.Background())
// New creates a queue for cids.
func New(ds datastore.Batching) *Queue {
ctx, cancel := context.WithCancel(context.Background())

q := &Queue{
ctx: cancelCtx,
ds: namespaced,
close: cancel,
closed: make(chan error, 1),
dequeue: make(chan cid.Cid),
ds: namespace.Wrap(ds, datastore.NewKey("/queue")),
enqueue: make(chan cid.Cid),
close: cancel,
}
q.closed.Add(1)
go q.worker()

go q.worker(ctx)

return q
}
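
For orientation, a minimal usage sketch of the reworked API. This is hedged: the package lives under `provider/internal`, so it is only importable from within boxo (e.g. from a test alongside the package), and the in-memory MapDatastore is assumed to satisfy `datastore.Batching`, as it does in the package's tests.

```go
package queue_test

import (
	"fmt"

	cid "github.com/ipfs/go-cid"
	datastore "github.com/ipfs/go-datastore"
	dssync "github.com/ipfs/go-datastore/sync"

	"github.com/ipfs/boxo/provider/internal/queue"
)

func Example() {
	// Wrap an in-memory datastore; New now takes a datastore.Batching.
	ds := dssync.MutexWrap(datastore.NewMapDatastore())
	q := queue.New(ds) // formerly queue.NewQueue

	c, _ := cid.Decode("bafkqaaa") // tiny identity CID, used only for illustration
	if err := q.Enqueue(c); err != nil {
		fmt.Println("enqueue failed:", err)
		return
	}

	out := <-q.Dequeue() // blocks until a CID is available
	fmt.Println(out.Equals(c))

	_ = q.Close() // flushes any still-buffered CIDs to the datastore
	// Output: true
}
```
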

// Close stops the queue
// Close stops the queue.
func (q *Queue) Close() error {
q.close()
q.closed.Wait()
// We don't close dequeue because the provider that consumes it would get caught in
// an infinite loop dequeuing cid.Undef if we did that.
// The provider has its own select on top of dequeue and will handle this by itself.
return nil
var err error
q.closeOnce.Do(func() {
// Close input queue and wait for worker to finish reading it.
close(q.enqueue)
select {
case <-q.closed:
case <-time.After(shutdownTimeout):
q.close() // force immediate shutdown
err = <-q.closed
}
close(q.dequeue) // no more output from this queue
})
return err
}

// Enqueue puts a cid in the queue
func (q *Queue) Enqueue(cid cid.Cid) error {
select {
case q.enqueue <- cid:
return nil
case <-q.ctx.Done():
return errors.New("failed to enqueue CID: shutting down")
// Enqueue puts a cid in the queue.
func (q *Queue) Enqueue(c cid.Cid) (err error) {
if c == cid.Undef {
return
}
defer func() {
if r := recover(); r != nil {
err = errors.New("failed to enqueue CID: shutting down")
}
}()
q.enqueue <- c
return
}
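
Enqueue relies on recovering from a send on the closed enqueue channel instead of guarding the send with a lock. A standalone sketch of that pattern, with illustrative names that are not part of the diff:

```go
package main

import (
	"errors"
	"fmt"
)

// trySend converts the panic from sending on a closed channel into an error,
// letting producers race safely against a close() on the consumer side.
func trySend[T any](ch chan<- T, v T) (err error) {
	defer func() {
		if recover() != nil {
			err = errors.New("channel closed")
		}
	}()
	ch <- v
	return nil
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(trySend(ch, 1)) // <nil>
	close(ch)
	fmt.Println(trySend(ch, 2)) // channel closed
}
```
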

// Dequeue returns a channel that if listened to will remove entries from the queue
// Dequeue returns a channel for reading entries from the queue.
func (q *Queue) Dequeue() <-chan cid.Cid {
return q.dequeue
}

func makeCidString(c cid.Cid) string {
data := c.Bytes()
if len(data) > 4 {
data = data[len(data)-4:]
}
return base64.RawURLEncoding.EncodeToString(data)
}

func makeKey(c cid.Cid, counter uint64) datastore.Key {
return datastore.NewKey(fmt.Sprintf("%020d/%s", counter, makeCidString(c)))
}
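
To make the key layout concrete, a hedged illustration: the 6-character suffix depends on the CID's final bytes, the values below are hypothetical, and the `/queue` prefix is added by the namespace wrapper rather than by `makeKey`.

```go
// makeCidString(c) base64url-encodes the last 4 bytes of the CID, e.g. "0vOPyg".
// makeKey(c, 7) then yields a key like:
//
//	/00000000000000000007/0vOPyg
//
// The zero-padded counter keeps entries in FIFO order under query.OrderByKey,
// while the short CID tail makes individual entries easier to spot when debugging.
```
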

// worker processes enqueues and dequeues as entries become available.
func (q *Queue) worker() {
var k datastore.Key = datastore.Key{}
var c cid.Cid = cid.Undef
func (q *Queue) worker(ctx context.Context) {
defer close(q.closed)

var (
c cid.Cid
counter uint64
k datastore.Key = datastore.Key{}
inBuf deque.Deque[cid.Cid]
)

const baseCap = 1024
inBuf.SetBaseCap(baseCap)

defer func() {
if c != cid.Undef {
if err := q.ds.Put(ctx, k, c.Bytes()); err != nil {
log.Errorw("Failed to write cid to datastore", "err", err)
}
counter++
}
if inBuf.Len() != 0 {
err := q.commitInput(ctx, counter, &inBuf)
if err != nil && !errors.Is(err, context.Canceled) {
log.Error(err)
if inBuf.Len() != 0 {
q.closed <- fmt.Errorf("provider queue: %d cids not written to datastore", inBuf.Len())
}
}
}
}()

var (
commit bool
dsEmpty bool
err error
idle bool
)

readInBuf := q.enqueue

defer q.closed.Done()
defer q.close()
batchTimer := time.NewTimer(idleWriteTime)
defer batchTimer.Stop()

for {
if c == cid.Undef {
head, err := q.getQueueHead()

switch {
case err != nil:
log.Errorf("error querying for head of queue: %s, stopping provider", err)
return
case head != nil:
k = datastore.NewKey(head.Key)
c, err = cid.Parse(head.Value)
if !dsEmpty {
head, err := q.getQueueHead(ctx)
if err != nil {
log.Warnf("error parsing queue entry cid with key (%s), removing it from queue: %s", head.Key, err)
err = q.ds.Delete(q.ctx, k)
if err != nil {
log.Errorf("error deleting queue entry with key (%s), due to error (%s), stopping provider", head.Key, err)
log.Errorw("Error querying for head of queue, stopping provider", "err", err)
return
}
if head != nil {
k = datastore.NewKey(head.Key)
if err = q.ds.Delete(ctx, k); err != nil {
log.Errorw("Error deleting queue entry, stopping provider", "err", err, "key", head.Key)
return
}
continue
c, err = cid.Parse(head.Value)
if err != nil {
log.Warnw("Error parsing queue entry cid, removing it from queue", "err", err, "key", head.Key)
continue
}
} else {
dsEmpty = true
}
default:
c = cid.Undef
}
if dsEmpty && inBuf.Len() != 0 {
// There were no queued CIDs in the datastore, so read one from
// the input buffer.
c = inBuf.PopFront()
k = makeKey(c, counter)
}
}

// If c != cid.Undef set dequeue and attempt write, otherwise wait for enqueue
// If c != cid.Undef set dequeue and attempt write.
var dequeue chan cid.Cid
if c != cid.Undef {
dequeue = q.dequeue
}

select {
case toQueue := <-q.enqueue:
keyPath := fmt.Sprintf("%020d/%s", q.counter, c.String())
q.counter++
nextKey := datastore.NewKey(keyPath)
case toQueue, ok := <-readInBuf:
if !ok {
return
}
idle = false

if c == cid.Undef {
// fast path, skip rereading the datastore if we don't have anything in hand yet
// Use this CID as the next output since there was nothing in
// the datastore or buffer previously.
c = toQueue
k = nextKey
k = makeKey(c, counter)
continue
}

if err := q.ds.Put(q.ctx, nextKey, toQueue.Bytes()); err != nil {
log.Errorf("Failed to enqueue cid: %s", err)
continue
inBuf.PushBack(toQueue)
if inBuf.Len() >= batchSize {
commit = true
}
case dequeue <- c:
err := q.ds.Delete(q.ctx, k)
if err != nil {
log.Errorf("Failed to delete queued cid %s with key %s: %s", c, k, err)
continue
}
c = cid.Undef
case <-q.ctx.Done():
idle = false
case <-batchTimer.C:
if idle {
if inBuf.Len() != 0 {
commit = true
} else {
if inBuf.Cap() > baseCap {
inBuf = deque.Deque[cid.Cid]{}
inBuf.SetBaseCap(baseCap)
}
}
}
idle = true
batchTimer.Reset(idleWriteTime)

case <-ctx.Done():
return
}

if commit {
commit = false
n := inBuf.Len()
err = q.commitInput(ctx, counter, &inBuf)
if err != nil {
if !errors.Is(err, context.Canceled) {
log.Errorw("Error writing CIDs to datastore, stopping provider", "err", err)
}
return
}
counter += uint64(n)
dsEmpty = false
}
}
}

func (q *Queue) getQueueHead() (*query.Entry, error) {
qry := query.Query{Orders: []query.Order{query.OrderByKey{}}, Limit: 1}
results, err := q.ds.Query(q.ctx, qry)
func (q *Queue) getQueueHead(ctx context.Context) (*query.Entry, error) {
qry := query.Query{
Orders: []query.Order{query.OrderByKey{}},
Limit: 1,
}
results, err := q.ds.Query(ctx, qry)
if err != nil {
return nil, err
}
Expand All @@ -156,3 +274,29 @@ func (q *Queue) getQueueHead() (*query.Entry, error) {

return &r.Entry, r.Error
}

func (q *Queue) commitInput(ctx context.Context, counter uint64, cids *deque.Deque[cid.Cid]) error {
b, err := q.ds.Batch(ctx)
if err != nil {
return fmt.Errorf("failed to create batch: %w", err)
}

cstr := makeCidString(cids.Front())
n := cids.Len()
for i := 0; i < n; i++ {
c := cids.At(i)
key := datastore.NewKey(fmt.Sprintf("%020d/%s", counter, cstr))
if err = b.Put(ctx, key, c.Bytes()); err != nil {
log.Errorw("Failed to add cid to batch", "err", err)
continue
}
counter++
}
cids.Clear()

if err = b.Commit(ctx); err != nil {
return fmt.Errorf("failed to commit batch to datastore: %w", err)
}

return nil
}
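
commitInput leans on the go-datastore Batch API to turn many Puts into a single commit. A minimal hedged sketch of that API in isolation, assuming MapDatastore's basic batch support:

```go
package main

import (
	"context"
	"fmt"

	datastore "github.com/ipfs/go-datastore"
)

func main() {
	ctx := context.Background()
	ds := datastore.NewMapDatastore()

	b, err := ds.Batch(ctx)
	if err != nil {
		panic(err)
	}
	// Puts are staged in the batch and only reach the datastore on Commit,
	// which is what lets the queue amortize thousands of writes per flush.
	_ = b.Put(ctx, datastore.NewKey("/queue/00000000000000000000/aaaaaa"), []byte("cid-bytes"))
	_ = b.Put(ctx, datastore.NewKey("/queue/00000000000000000001/bbbbbb"), []byte("cid-bytes"))
	if err := b.Commit(ctx); err != nil {
		panic(err)
	}

	has, _ := ds.Has(ctx, datastore.NewKey("/queue/00000000000000000001/bbbbbb"))
	fmt.Println(has) // true
}
```
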