diff --git a/core/builder.go b/core/builder.go index 3ddc58f3c332..2d6d8b664b83 100644 --- a/core/builder.go +++ b/core/builder.go @@ -5,6 +5,7 @@ import ( "crypto/rand" "encoding/base64" "errors" + "github.com/ipfs/go-ipfs/provider" "os" "syscall" "time" @@ -275,6 +276,13 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error { } n.Resolver = resolver.NewBasicResolver(n.DAG) + // Provider + queue, err := provider.NewQueue("provider-v1", ctx, n.Repo.Datastore()) + if err != nil { + return err + } + n.Provider = provider.NewProvider(ctx, queue, n.Routing) + if cfg.Online { if err := n.startLateOnlineServices(ctx); err != nil { return err diff --git a/core/core.go b/core/core.go index 0817118b3710..28cf6047a62e 100644 --- a/core/core.go +++ b/core/core.go @@ -14,6 +14,7 @@ import ( "context" "errors" "fmt" + "github.com/ipfs/go-ipfs/provider" "io" "io/ioutil" "os" @@ -124,6 +125,7 @@ type IpfsNode struct { Routing routing.IpfsRouting // the routing system. recommend ipfs-dht Exchange exchange.Interface // the block exchange + strategy (bitswap) Namesys namesys.NameSystem // the name system, resolves paths to hashes + Provider *provider.Provider // the value provider system Reprovider *rp.Reprovider // the value reprovider system IpnsRepub *ipnsrp.Republisher @@ -324,6 +326,12 @@ func (n *IpfsNode) startLateOnlineServices(ctx context.Context) error { return err } + // Provider + + n.Provider.Run() + + // Reprovider + var keyProvider rp.KeyChanFunc switch cfg.Reprovider.Strategy { diff --git a/core/coreapi/coreapi.go b/core/coreapi/coreapi.go index c5ba1b566722..a10af96e9334 100644 --- a/core/coreapi/coreapi.go +++ b/core/coreapi/coreapi.go @@ -20,6 +20,7 @@ import ( "github.com/ipfs/go-ipfs/core" "github.com/ipfs/go-ipfs/namesys" + "github.com/ipfs/go-ipfs/provider" "github.com/ipfs/go-ipfs/pin" "github.com/ipfs/go-ipfs/repo" @@ -66,6 +67,8 @@ type CoreAPI struct { namesys namesys.NameSystem routing routing.IpfsRouting + provider *provider.Provider + pubSub *pubsub.PubSub checkPublishAllowed func() error @@ -174,6 +177,8 @@ func (api *CoreAPI) WithOptions(opts ...options.ApiOption) (coreiface.CoreAPI, e exchange: n.Exchange, routing: n.Routing, + provider: n.Provider, + pubSub: n.PubSub, nd: n, diff --git a/core/coreapi/pin.go b/core/coreapi/pin.go index 1976517dd90a..df478732c162 100644 --- a/core/coreapi/pin.go +++ b/core/coreapi/pin.go @@ -32,6 +32,10 @@ func (api *PinAPI) Add(ctx context.Context, p coreiface.Path, opts ...caopts.Pin return fmt.Errorf("pin: %s", err) } + if err := api.provider.Provide(dagNode.Cid()); err != nil { + return err + } + return api.pinning.Flush() } diff --git a/core/coreapi/provider.go b/core/coreapi/provider.go new file mode 100644 index 000000000000..b22a3811c742 --- /dev/null +++ b/core/coreapi/provider.go @@ -0,0 +1,11 @@ +package coreapi + +import ( + cid "github.com/ipfs/go-cid" +) + +type ProviderAPI CoreAPI + +func (api *ProviderAPI) Provide(root cid.Cid) error { + return api.provider.Provide(root) +} diff --git a/core/coreapi/unixfs.go b/core/coreapi/unixfs.go index e26c755b935f..c840280bbca3 100644 --- a/core/coreapi/unixfs.go +++ b/core/coreapi/unixfs.go @@ -129,6 +129,11 @@ func (api *UnixfsAPI) Add(ctx context.Context, files files.Node, opts ...options if err != nil { return nil, err } + + if err := api.provider.Provide(nd.Cid()); err != nil { + return nil, err + } + return coreiface.IpfsPath(nd.Cid()), nil } diff --git a/provider/provider.go b/provider/provider.go new file mode 100644 index 000000000000..4cc0cc093014 --- /dev/null +++ b/provider/provider.go @@ -0,0 +1,90 @@ +// Package provider implements structures and methods to provide blocks, +// keep track of which blocks are provided, and to allow those blocks to +// be reprovided. +package provider + +import ( + "context" + "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log" + "github.com/libp2p/go-libp2p-routing" + "time" +) + +var ( + log = logging.Logger("provider") +) + +const ( + provideOutgoingWorkerLimit = 8 + provideOutgoingTimeout = 15 * time.Second +) + +type Strategy func(context.Context, cid.Cid) <-chan cid.Cid + +// Provider announces blocks to the network, tracks which blocks are +// being provided, and untracks blocks when they're no longer in the blockstore. +type Provider struct { + ctx context.Context + // the CIDs for which provide announcements should be made + queue *Queue + // used to announce providing to the network + contentRouting routing.ContentRouting +} + +func NewProvider(ctx context.Context, queue *Queue, contentRouting routing.ContentRouting) *Provider { + return &Provider{ + ctx: ctx, + queue: queue, + contentRouting: contentRouting, + } +} + +// Start workers to handle provide requests. +func (p *Provider) Run() { + p.queue.Run() + p.handleAnnouncements() +} + +// Provide the given cid using specified strategy. +func (p *Provider) Provide(root cid.Cid) error { + return p.queue.Enqueue(root) +} + +// Handle all outgoing cids by providing (announcing) them +func (p *Provider) handleAnnouncements() { + for workers := 0; workers < provideOutgoingWorkerLimit; workers++ { + go func() { + for { + select { + case <-p.ctx.Done(): + return + case entry := <-p.queue.Dequeue(): + if err := doProvide(p.ctx, p.contentRouting, entry.cid); err != nil { + log.Warningf("Unable to provide entry: %s, %s", entry.cid, err) + } + + if err := entry.Complete(); err != nil { + log.Warningf("Unable to complete queue entry when providing: %s, %s", entry.cid, err) + } + } + } + }() + } +} + +// TODO: better document this provide logic +func doProvide(ctx context.Context, contentRouting routing.ContentRouting, key cid.Cid) error { + // announce + log.Info("announce - start - ", key) + ctx, cancel := context.WithTimeout(ctx, provideOutgoingTimeout) + if err := contentRouting.Provide(ctx, key, true); err != nil { + log.Warningf("Failed to provide cid: %s", err) + // TODO: Maybe put these failures onto a failures queue? + cancel() + return err + } + cancel() + log.Info("announce - end - ", key) + return nil +} diff --git a/provider/queue.go b/provider/queue.go new file mode 100644 index 000000000000..65656450a03b --- /dev/null +++ b/provider/queue.go @@ -0,0 +1,235 @@ +package provider + +import ( + "context" + "errors" + "github.com/ipfs/go-cid" + ds "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/namespace" + "github.com/ipfs/go-datastore/query" + "math" + "strconv" + "strings" + "sync" +) + +// Entry allows for the durability in the queue. When a cid is dequeued it is +// not removed from the datastore until you call Complete() on the entry you +// receive. +type Entry struct { + cid cid.Cid + key ds.Key + queue *Queue +} + +func (e *Entry) Complete() error { + return e.queue.remove(e.key) +} + +// Queue provides a durable, FIFO interface to the datastore for storing cids +// +// Durability just means that cids in the process of being provided when a +// crash or shutdown occurs will still be in the queue when the node is +// brought back online. +type Queue struct { + // used to differentiate queues in datastore + // e.g. provider vs reprovider + name string + + ctx context.Context + + tail uint64 + head uint64 + + lock sync.Mutex + datastore ds.Datastore + + dequeue chan *Entry + notEmpty chan struct{} + + isRunning bool +} + +func NewQueue(name string, ctx context.Context, datastore ds.Datastore) (*Queue, error) { + namespaced := namespace.Wrap(datastore, ds.NewKey("/" + name + "/queue/")) + head, tail, err := getQueueHeadTail(name, ctx, namespaced) + if err != nil { + return nil, err + } + q := &Queue{ + name: name, + ctx: ctx, + head: head, + tail: tail, + lock: sync.Mutex{}, + datastore: namespaced, + dequeue: make(chan *Entry), + notEmpty: make(chan struct{}), + isRunning: false, + } + return q, nil +} + +// Put a cid in the queue +func (q *Queue) Enqueue(cid cid.Cid) error { + q.lock.Lock() + defer q.lock.Unlock() + + wasEmpty := q.IsEmpty() + + nextKey := q.queueKey(q.tail) + + if err := q.datastore.Put(nextKey, cid.Bytes()); err != nil { + return err + } + + q.tail++ + + if q.isRunning && wasEmpty { + select { + case q.notEmpty <- struct{}{}: + case <-q.ctx.Done(): + } + } + + return nil +} + +// Remove an entry from the queue. +func (q *Queue) Dequeue() <-chan *Entry { + return q.dequeue +} + +func (q *Queue) IsEmpty() bool { + return (q.tail - q.head) == 0 +} + +func (q *Queue) remove(key ds.Key) error { + return q.datastore.Delete(key) +} + +// dequeue items when the dequeue channel is available to +// be written to +func (q *Queue) Run() { + q.isRunning = true + go func() { + for { + select { + case <-q.ctx.Done(): + return + default: + } + if q.IsEmpty() { + select { + case <-q.ctx.Done(): + return + // wait for a notEmpty message + case <-q.notEmpty: + } + } + + entry, err := q.next() + if err != nil { + log.Warningf("Error Dequeue()-ing: %s, %s", entry, err) + continue + } + + select { + case <-q.ctx.Done(): + return + case q.dequeue <- entry: + } + } + }() +} + +// Find the next item in the queue, crawl forward if an entry is not +// found in the next spot. +func (q *Queue) next() (*Entry, error) { + q.lock.Lock() + defer q.lock.Unlock() + + var nextKey ds.Key + var value []byte + var err error + for { + if q.head >= q.tail { + return nil, errors.New("no more entries in queue") + } + select { + case <-q.ctx.Done(): + return nil, nil + default: + } + nextKey = q.queueKey(q.head) + value, err = q.datastore.Get(nextKey) + if err == ds.ErrNotFound { + q.head++ + continue + } else if err != nil { + return nil, err + } else { + break + } + } + + id, err := cid.Parse(value) + if err != nil { + return nil, err + } + + entry := &Entry { + cid: id, + key: nextKey, + queue: q, + } + + q.head++ + + return entry, nil +} + +func (q *Queue) queueKey(id uint64) ds.Key { + return ds.NewKey(strconv.FormatUint(id, 10)) +} + +// crawl over the queue entries to find the head and tail +func getQueueHeadTail(name string, ctx context.Context, datastore ds.Datastore) (uint64, uint64, error) { + query := query.Query{} + results, err := datastore.Query(query) + if err != nil { + return 0, 0, err + } + + var tail uint64 = 0 + var head uint64 = math.MaxUint64 + for entry := range results.Next() { + select { + case <-ctx.Done(): + return 0, 0, nil + default: + } + trimmed := strings.TrimPrefix(entry.Key, "/") + id, err := strconv.ParseUint(trimmed, 10, 64) + if err != nil { + return 0, 0, err + } + + if id < head { + head = id + } + + if (id+1) > tail { + tail = (id+1) + } + } + if err := results.Close(); err != nil { + return 0, 0, err + } + if head == math.MaxUint64 { + head = 0 + } + + return head, tail, nil +} +