Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reprovider strategies #4113

Merged
merged 10 commits into from
Aug 16, 2017
6 changes: 6 additions & 0 deletions core/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,5 +231,11 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
}
n.Resolver = path.NewBasicResolver(n.DAG)

if cfg.Online {
if err := n.startLateOnlineServices(ctx); err != nil {
return err
}
}

return n.loadFilesRoot()
}
36 changes: 32 additions & 4 deletions core/commands/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ var BitswapCmd = &cmds.Command{
ShortDescription: ``,
},
Subcommands: map[string]*cmds.Command{
"wantlist": showWantlistCmd,
"stat": bitswapStatCmd,
"unwant": unwantCmd,
"ledger": ledgerCmd,
"wantlist": showWantlistCmd,
"stat": bitswapStatCmd,
"unwant": unwantCmd,
"ledger": ledgerCmd,
"reprovide": reprovideCmd,
},
}

Expand Down Expand Up @@ -242,3 +243,30 @@ prints the ledger associated with a given peer.
},
},
}

var reprovideCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Trigger reprovider.",
ShortDescription: `
Trigger reprovider to announce our data to network.
`,
},
Run: func(req cmds.Request, res cmds.Response) {
nd, err := req.InvocContext().GetNode()
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}

if !nd.OnlineMode() {
res.SetError(errNotOnline, cmds.ErrClient)
return
}

err = nd.Reprovider.Trigger(req.Context())
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
},
}
55 changes: 39 additions & 16 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,22 +237,6 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin
return err
}

n.Reprovider = rp.NewReprovider(n.Routing, n.Blockstore)

if cfg.Reprovider.Interval != "0" {
interval := kReprovideFrequency
if cfg.Reprovider.Interval != "" {
dur, err := time.ParseDuration(cfg.Reprovider.Interval)
if err != nil {
return err
}

interval = dur
}

go n.Reprovider.ProvideEvery(ctx, interval)
}

if pubsub {
n.Floodsub = floodsub.NewFloodSub(ctx, peerhost)
}
Expand All @@ -273,6 +257,45 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin
return n.Bootstrap(DefaultBootstrapConfig)
}

func (n *IpfsNode) startLateOnlineServices(ctx context.Context) error {
cfg, err := n.Repo.Config()
if err != nil {
return err
}

var keyProvider rp.KeyChanFunc

switch cfg.Reprovider.Strategy {
case "all":
fallthrough
case "":
keyProvider = rp.NewBlockstoreProvider(n.Blockstore)
case "roots":
keyProvider = rp.NewPinnedProvider(n.Pinning, n.DAG, true)
case "pinned":
keyProvider = rp.NewPinnedProvider(n.Pinning, n.DAG, false)
default:
return fmt.Errorf("unknown reprovider strategy '%s'", cfg.Reprovider.Strategy)
}
n.Reprovider = rp.NewReprovider(ctx, n.Routing, keyProvider)

if cfg.Reprovider.Interval != "0" {
interval := kReprovideFrequency
if cfg.Reprovider.Interval != "" {
dur, err := time.ParseDuration(cfg.Reprovider.Interval)
if err != nil {
return err
}

interval = dur
}

go n.Reprovider.ProvideEvery(interval)
}

return nil
}

func makeAddrsFactory(cfg config.Addresses) (p2pbhost.AddrsFactory, error) {
var annAddrs []ma.Multiaddr
for _, addr := range cfg.Announce {
Expand Down
12 changes: 10 additions & 2 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ a running daemon do not read the config file at runtime.
- [`Identity`](#identity)
- [`Ipns`](#ipns)
- [`Mounts`](#mounts)
- [`ReproviderInterval`](#reproviderinterval)
- [`Reprovider`](#reprovider)
- [`SupernodeRouting`](#supernoderouting)
- [`Swarm`](#swarm)
- [`Tour`](#tour)
Expand Down Expand Up @@ -193,7 +193,9 @@ Mountpoint for `/ipns/`.
- `FuseAllowOther`
Sets the FUSE allow other option on the mountpoint.

## `ReproviderInterval`
## `Reprovider`

- `Interval`
Sets the time between rounds of reproviding local content to the routing
system. If unset, it defaults to 12 hours. If set to the value `"0"` it will
disable content reproviding.
Expand All @@ -203,6 +205,12 @@ not being able to discover that you have the objects that you have. If you want
to have this disabled and keep the network aware of what you have, you must
manually announce your content periodically.

- `Strategy`
Tells reprovider what should be announced. Valid strategies are:
- "all" (default) - announce all stored data
- "pinned" - only announce pinned data
- "roots" - only announce directly pinned keys and root keys of recursive pins

## `SupernodeRouting`
Deprecated.

Expand Down
93 changes: 93 additions & 0 deletions exchange/reprovide/providers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package reprovide

import (
"context"

blocks "github.com/ipfs/go-ipfs/blocks/blockstore"
merkledag "github.com/ipfs/go-ipfs/merkledag"
pin "github.com/ipfs/go-ipfs/pin"

cid "gx/ipfs/QmTprEaAA2A9bst5XH7exuyi5KzNMK3SEDNN8rBDnKWcUS/go-cid"
)

// NewBlockstoreProvider returns key provider using bstore.AllKeysChan
func NewBlockstoreProvider(bstore blocks.Blockstore) KeyChanFunc {
return func(ctx context.Context) (<-chan *cid.Cid, error) {
return bstore.AllKeysChan(ctx)
}
}

// NewPinnedProvider returns provider supplying pinned keys
func NewPinnedProvider(pinning pin.Pinner, dag merkledag.DAGService, onlyRoots bool) KeyChanFunc {
return func(ctx context.Context) (<-chan *cid.Cid, error) {
set, err := pinSet(ctx, pinning, dag, onlyRoots)
if err != nil {
return nil, err
}

outCh := make(chan *cid.Cid)
go func() {
defer close(outCh)
for c := range set.new {
select {
case <-ctx.Done():
return
case outCh <- c:
}
}

}()

return outCh, nil
}
}

func pinSet(ctx context.Context, pinning pin.Pinner, dag merkledag.DAGService, onlyRoots bool) (*streamingSet, error) {
set := newStreamingSet()

go func() {
defer close(set.new)

for _, key := range pinning.DirectKeys() {
set.add(key)
}

for _, key := range pinning.RecursiveKeys() {
set.add(key)

if !onlyRoots {
err := merkledag.EnumerateChildren(ctx, dag.GetLinks, key, set.add)
if err != nil {
log.Errorf("reprovide indirect pins: %s", err)
return
}
}
}
}()

return set, nil
}

type streamingSet struct {
set *cid.Set
new chan *cid.Cid
}

// NewSet initializes and returns a new Set.
func newStreamingSet() *streamingSet {
return &streamingSet{
set: cid.NewSet(),
new: make(chan *cid.Cid),
}
}

// add adds a Cid to the set only if it is
// not in it already.
func (s *streamingSet) add(c *cid.Cid) bool {
if s.set.Visit(c) {
s.new <- c
return true
}

return false
}
98 changes: 81 additions & 17 deletions exchange/reprovide/reprovide.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,56 +5,82 @@ import (
"fmt"
"time"

blocks "github.com/ipfs/go-ipfs/blocks/blockstore"
backoff "gx/ipfs/QmPJUtEJsm5YLUWhF6imvyCH8KZXRJa9Wup7FDMwTy5Ufz/backoff"
routing "gx/ipfs/QmPjTrrSfE6TzLv6ya6VWhGcCgPrUAdcgrDcQyRDX2VyW1/go-libp2p-routing"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
cid "gx/ipfs/QmTprEaAA2A9bst5XH7exuyi5KzNMK3SEDNN8rBDnKWcUS/go-cid"
)

var log = logging.Logger("reprovider")

//KeyChanFunc is function streaming CIDs to pass to content routing
type KeyChanFunc func(context.Context) (<-chan *cid.Cid, error)
type doneFunc func(error)

type Reprovider struct {
ctx context.Context
trigger chan doneFunc

// The routing system to provide values through
rsys routing.ContentRouting

// The backing store for blocks to be provided
bstore blocks.Blockstore
keyProvider KeyChanFunc
}

func NewReprovider(rsys routing.ContentRouting, bstore blocks.Blockstore) *Reprovider {
// NewReprovider creates new Reprovider instance.
func NewReprovider(ctx context.Context, rsys routing.ContentRouting, keyProvider KeyChanFunc) *Reprovider {
return &Reprovider{
rsys: rsys,
bstore: bstore,
ctx: ctx,
trigger: make(chan doneFunc),

rsys: rsys,
keyProvider: keyProvider,
}
}

func (rp *Reprovider) ProvideEvery(ctx context.Context, tick time.Duration) {
// ProvideEvery re-provides keys with 'tick' interval
func (rp *Reprovider) ProvideEvery(tick time.Duration) {
// dont reprovide immediately.
// may have just started the daemon and shutting it down immediately.
// probability( up another minute | uptime ) increases with uptime.
after := time.After(time.Minute)
var done doneFunc
for {
select {
case <-ctx.Done():
case <-rp.ctx.Done():
return
case done = <-rp.trigger:
case <-after:
err := rp.Reprovide(ctx)
if err != nil {
log.Debug(err)
}
after = time.After(tick)
}

//'mute' the trigger channel so when `ipfs bitswap reprovide` is called
//a 'reprovider is already running' error is returned
unmute := rp.muteTrigger()

err := rp.Reprovide()
if err != nil {
log.Debug(err)
}

if done != nil {
done(err)
}

unmute()

after = time.After(tick)
}
}

func (rp *Reprovider) Reprovide(ctx context.Context) error {
keychan, err := rp.bstore.AllKeysChan(ctx)
// Reprovide registers all keys given by rp.keyProvider to libp2p content routing
func (rp *Reprovider) Reprovide() error {
keychan, err := rp.keyProvider(rp.ctx)
if err != nil {
return fmt.Errorf("Failed to get key chan from blockstore: %s", err)
return fmt.Errorf("Failed to get key chan: %s", err)
}
for c := range keychan {
op := func() error {
err := rp.rsys.Provide(ctx, c, true)
err := rp.rsys.Provide(rp.ctx, c, true)
if err != nil {
log.Debugf("Failed to provide key: %s", err)
}
Expand All @@ -71,3 +97,41 @@ func (rp *Reprovider) Reprovide(ctx context.Context) error {
}
return nil
}

// Trigger starts reprovision process in rp.ProvideEvery and waits for it
func (rp *Reprovider) Trigger(ctx context.Context) error {
progressCtx, done := context.WithCancel(ctx)

var err error
df := func(e error) {
err = e
done()
}

select {
case <-rp.ctx.Done():
return context.Canceled
case <-ctx.Done():
return context.Canceled
case rp.trigger <- df:
<-progressCtx.Done()
return err
}
}

func (rp *Reprovider) muteTrigger() context.CancelFunc {
ctx, cf := context.WithCancel(rp.ctx)
go func() {
defer cf()
for {
select {
case <-ctx.Done():
return
case done := <-rp.trigger:
done(fmt.Errorf("reprovider is already running"))
}
}
}()

return cf
}
Loading