Skip to content

Commit

Permalink
Provide root node immediately when add and pin add
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Michael Avila <[email protected]>
  • Loading branch information
michaelavila committed Mar 8, 2019
1 parent 245c40b commit 9301556
Show file tree
Hide file tree
Showing 8 changed files with 366 additions and 0 deletions.
8 changes: 8 additions & 0 deletions core/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/rand"
"encoding/base64"
"errors"
"github.com/ipfs/go-ipfs/provider"
"os"
"syscall"
"time"
Expand Down Expand Up @@ -275,6 +276,13 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
}
n.Resolver = resolver.NewBasicResolver(n.DAG)

// Provider
queue, err := provider.NewQueue("provider-v1", ctx, n.Repo.Datastore())
if err != nil {
return err
}
n.Provider = provider.NewProvider(ctx, queue, n.Routing)

if cfg.Online {
if err := n.startLateOnlineServices(ctx); err != nil {
return err
Expand Down
8 changes: 8 additions & 0 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"context"
"errors"
"fmt"
"github.com/ipfs/go-ipfs/provider"
"io"
"io/ioutil"
"os"
Expand Down Expand Up @@ -124,6 +125,7 @@ type IpfsNode struct {
Routing routing.IpfsRouting // the routing system. recommend ipfs-dht
Exchange exchange.Interface // the block exchange + strategy (bitswap)
Namesys namesys.NameSystem // the name system, resolves paths to hashes
Provider *provider.Provider // the value provider system
Reprovider *rp.Reprovider // the value reprovider system
IpnsRepub *ipnsrp.Republisher

Expand Down Expand Up @@ -324,6 +326,12 @@ func (n *IpfsNode) startLateOnlineServices(ctx context.Context) error {
return err
}

// Provider

n.Provider.Run()

// Reprovider

var keyProvider rp.KeyChanFunc

switch cfg.Reprovider.Strategy {
Expand Down
5 changes: 5 additions & 0 deletions core/coreapi/coreapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/ipfs/go-ipfs/core"
"github.com/ipfs/go-ipfs/namesys"
"github.com/ipfs/go-ipfs/provider"
"github.com/ipfs/go-ipfs/pin"
"github.com/ipfs/go-ipfs/repo"

Expand Down Expand Up @@ -66,6 +67,8 @@ type CoreAPI struct {
namesys namesys.NameSystem
routing routing.IpfsRouting

provider *provider.Provider

pubSub *pubsub.PubSub

checkPublishAllowed func() error
Expand Down Expand Up @@ -174,6 +177,8 @@ func (api *CoreAPI) WithOptions(opts ...options.ApiOption) (coreiface.CoreAPI, e
exchange: n.Exchange,
routing: n.Routing,

provider: n.Provider,

pubSub: n.PubSub,

nd: n,
Expand Down
4 changes: 4 additions & 0 deletions core/coreapi/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ func (api *PinAPI) Add(ctx context.Context, p coreiface.Path, opts ...caopts.Pin
return fmt.Errorf("pin: %s", err)
}

if err := api.provider.Provide(dagNode.Cid()); err != nil {
return err
}

return api.pinning.Flush()
}

Expand Down
11 changes: 11 additions & 0 deletions core/coreapi/provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package coreapi

import (
cid "github.com/ipfs/go-cid"
)

type ProviderAPI CoreAPI

func (api *ProviderAPI) Provide(root cid.Cid) error {
return api.provider.Provide(root)
}
5 changes: 5 additions & 0 deletions core/coreapi/unixfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ func (api *UnixfsAPI) Add(ctx context.Context, files files.Node, opts ...options
if err != nil {
return nil, err
}

if err := api.provider.Provide(nd.Cid()); err != nil {
return nil, err
}

return coreiface.IpfsPath(nd.Cid()), nil
}

Expand Down
90 changes: 90 additions & 0 deletions provider/provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Package provider implements structures and methods to provide blocks,
// keep track of which blocks are provided, and to allow those blocks to
// be reprovided.
package provider

import (
"context"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-routing"
"time"
)

var (
log = logging.Logger("provider")
)

const (
provideOutgoingWorkerLimit = 8
provideOutgoingTimeout = 15 * time.Second
)

type Strategy func(context.Context, cid.Cid) <-chan cid.Cid

// Provider announces blocks to the network, tracks which blocks are
// being provided, and untracks blocks when they're no longer in the blockstore.
type Provider struct {
ctx context.Context
// the CIDs for which provide announcements should be made
queue *Queue
// used to announce providing to the network
contentRouting routing.ContentRouting
}

func NewProvider(ctx context.Context, queue *Queue, contentRouting routing.ContentRouting) *Provider {
return &Provider{
ctx: ctx,
queue: queue,
contentRouting: contentRouting,
}
}

// Start workers to handle provide requests.
func (p *Provider) Run() {
p.queue.Run()
p.handleAnnouncements()
}

// Provide the given cid using specified strategy.
func (p *Provider) Provide(root cid.Cid) error {
return p.queue.Enqueue(root)
}

// Handle all outgoing cids by providing (announcing) them
func (p *Provider) handleAnnouncements() {
for workers := 0; workers < provideOutgoingWorkerLimit; workers++ {
go func() {
for {
select {
case <-p.ctx.Done():
return
case entry := <-p.queue.Dequeue():
if err := doProvide(p.ctx, p.contentRouting, entry.cid); err != nil {
log.Warningf("Unable to provide entry: %s, %s", entry.cid, err)
}

if err := entry.Complete(); err != nil {
log.Warningf("Unable to complete queue entry when providing: %s, %s", entry.cid, err)
}
}
}
}()
}
}

// TODO: better document this provide logic
func doProvide(ctx context.Context, contentRouting routing.ContentRouting, key cid.Cid) error {
// announce
log.Info("announce - start - ", key)
ctx, cancel := context.WithTimeout(ctx, provideOutgoingTimeout)
if err := contentRouting.Provide(ctx, key, true); err != nil {
log.Warningf("Failed to provide cid: %s", err)
// TODO: Maybe put these failures onto a failures queue?
cancel()
return err
}
cancel()
log.Info("announce - end - ", key)
return nil
}
Loading

0 comments on commit 9301556

Please sign in to comment.