Skip to content

Commit

Permalink
Merge pull request #4113 from ipfs/feat/reprovider-starts
Browse files Browse the repository at this point in the history
Reprovider strategies
  • Loading branch information
whyrusleeping authored Aug 16, 2017
2 parents 9ea02e9 + 4a5b93a commit dea24ae
Show file tree
Hide file tree
Showing 10 changed files with 393 additions and 41 deletions.
6 changes: 6 additions & 0 deletions core/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,5 +231,11 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
}
n.Resolver = path.NewBasicResolver(n.DAG)

if cfg.Online {
if err := n.startLateOnlineServices(ctx); err != nil {
return err
}
}

return n.loadFilesRoot()
}
36 changes: 32 additions & 4 deletions core/commands/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ var BitswapCmd = &cmds.Command{
ShortDescription: ``,
},
Subcommands: map[string]*cmds.Command{
"wantlist": showWantlistCmd,
"stat": bitswapStatCmd,
"unwant": unwantCmd,
"ledger": ledgerCmd,
"wantlist": showWantlistCmd,
"stat": bitswapStatCmd,
"unwant": unwantCmd,
"ledger": ledgerCmd,
"reprovide": reprovideCmd,
},
}

Expand Down Expand Up @@ -242,3 +243,30 @@ prints the ledger associated with a given peer.
},
},
}

var reprovideCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Trigger reprovider.",
ShortDescription: `
Trigger reprovider to announce our data to network.
`,
},
Run: func(req cmds.Request, res cmds.Response) {
nd, err := req.InvocContext().GetNode()
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}

if !nd.OnlineMode() {
res.SetError(errNotOnline, cmds.ErrClient)
return
}

err = nd.Reprovider.Trigger(req.Context())
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
},
}
55 changes: 39 additions & 16 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,22 +237,6 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin
return err
}

n.Reprovider = rp.NewReprovider(n.Routing, n.Blockstore)

if cfg.Reprovider.Interval != "0" {
interval := kReprovideFrequency
if cfg.Reprovider.Interval != "" {
dur, err := time.ParseDuration(cfg.Reprovider.Interval)
if err != nil {
return err
}

interval = dur
}

go n.Reprovider.ProvideEvery(ctx, interval)
}

if pubsub {
n.Floodsub = floodsub.NewFloodSub(ctx, peerhost)
}
Expand All @@ -273,6 +257,45 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin
return n.Bootstrap(DefaultBootstrapConfig)
}

func (n *IpfsNode) startLateOnlineServices(ctx context.Context) error {
cfg, err := n.Repo.Config()
if err != nil {
return err
}

var keyProvider rp.KeyChanFunc

switch cfg.Reprovider.Strategy {
case "all":
fallthrough
case "":
keyProvider = rp.NewBlockstoreProvider(n.Blockstore)
case "roots":
keyProvider = rp.NewPinnedProvider(n.Pinning, n.DAG, true)
case "pinned":
keyProvider = rp.NewPinnedProvider(n.Pinning, n.DAG, false)
default:
return fmt.Errorf("unknown reprovider strategy '%s'", cfg.Reprovider.Strategy)
}
n.Reprovider = rp.NewReprovider(ctx, n.Routing, keyProvider)

if cfg.Reprovider.Interval != "0" {
interval := kReprovideFrequency
if cfg.Reprovider.Interval != "" {
dur, err := time.ParseDuration(cfg.Reprovider.Interval)
if err != nil {
return err
}

interval = dur
}

go n.Reprovider.ProvideEvery(interval)
}

return nil
}

func makeAddrsFactory(cfg config.Addresses) (p2pbhost.AddrsFactory, error) {
var annAddrs []ma.Multiaddr
for _, addr := range cfg.Announce {
Expand Down
12 changes: 10 additions & 2 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ a running daemon do not read the config file at runtime.
- [`Identity`](#identity)
- [`Ipns`](#ipns)
- [`Mounts`](#mounts)
- [`ReproviderInterval`](#reproviderinterval)
- [`Reprovider`](#reprovider)
- [`SupernodeRouting`](#supernoderouting)
- [`Swarm`](#swarm)

Expand Down Expand Up @@ -192,7 +192,9 @@ Mountpoint for `/ipns/`.
- `FuseAllowOther`
Sets the FUSE allow other option on the mountpoint.

## `ReproviderInterval`
## `Reprovider`

- `Interval`
Sets the time between rounds of reproviding local content to the routing
system. If unset, it defaults to 12 hours. If set to the value `"0"` it will
disable content reproviding.
Expand All @@ -202,6 +204,12 @@ not being able to discover that you have the objects that you have. If you want
to have this disabled and keep the network aware of what you have, you must
manually announce your content periodically.

- `Strategy`
Tells reprovider what should be announced. Valid strategies are:
- "all" (default) - announce all stored data
- "pinned" - only announce pinned data
- "roots" - only announce directly pinned keys and root keys of recursive pins

## `SupernodeRouting`
Deprecated.

Expand Down
93 changes: 93 additions & 0 deletions exchange/reprovide/providers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package reprovide

import (
"context"

blocks "github.com/ipfs/go-ipfs/blocks/blockstore"
merkledag "github.com/ipfs/go-ipfs/merkledag"
pin "github.com/ipfs/go-ipfs/pin"

cid "gx/ipfs/QmTprEaAA2A9bst5XH7exuyi5KzNMK3SEDNN8rBDnKWcUS/go-cid"
)

// NewBlockstoreProvider returns key provider using bstore.AllKeysChan
func NewBlockstoreProvider(bstore blocks.Blockstore) KeyChanFunc {
return func(ctx context.Context) (<-chan *cid.Cid, error) {
return bstore.AllKeysChan(ctx)
}
}

// NewPinnedProvider returns provider supplying pinned keys
func NewPinnedProvider(pinning pin.Pinner, dag merkledag.DAGService, onlyRoots bool) KeyChanFunc {
return func(ctx context.Context) (<-chan *cid.Cid, error) {
set, err := pinSet(ctx, pinning, dag, onlyRoots)
if err != nil {
return nil, err
}

outCh := make(chan *cid.Cid)
go func() {
defer close(outCh)
for c := range set.new {
select {
case <-ctx.Done():
return
case outCh <- c:
}
}

}()

return outCh, nil
}
}

func pinSet(ctx context.Context, pinning pin.Pinner, dag merkledag.DAGService, onlyRoots bool) (*streamingSet, error) {
set := newStreamingSet()

go func() {
defer close(set.new)

for _, key := range pinning.DirectKeys() {
set.add(key)
}

for _, key := range pinning.RecursiveKeys() {
set.add(key)

if !onlyRoots {
err := merkledag.EnumerateChildren(ctx, dag.GetLinks, key, set.add)
if err != nil {
log.Errorf("reprovide indirect pins: %s", err)
return
}
}
}
}()

return set, nil
}

type streamingSet struct {
set *cid.Set
new chan *cid.Cid
}

// NewSet initializes and returns a new Set.
func newStreamingSet() *streamingSet {
return &streamingSet{
set: cid.NewSet(),
new: make(chan *cid.Cid),
}
}

// add adds a Cid to the set only if it is
// not in it already.
func (s *streamingSet) add(c *cid.Cid) bool {
if s.set.Visit(c) {
s.new <- c
return true
}

return false
}
98 changes: 81 additions & 17 deletions exchange/reprovide/reprovide.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,56 +5,82 @@ import (
"fmt"
"time"

blocks "github.com/ipfs/go-ipfs/blocks/blockstore"
backoff "gx/ipfs/QmPJUtEJsm5YLUWhF6imvyCH8KZXRJa9Wup7FDMwTy5Ufz/backoff"
routing "gx/ipfs/QmPjTrrSfE6TzLv6ya6VWhGcCgPrUAdcgrDcQyRDX2VyW1/go-libp2p-routing"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
cid "gx/ipfs/QmTprEaAA2A9bst5XH7exuyi5KzNMK3SEDNN8rBDnKWcUS/go-cid"
)

var log = logging.Logger("reprovider")

//KeyChanFunc is function streaming CIDs to pass to content routing
type KeyChanFunc func(context.Context) (<-chan *cid.Cid, error)
type doneFunc func(error)

type Reprovider struct {
ctx context.Context
trigger chan doneFunc

// The routing system to provide values through
rsys routing.ContentRouting

// The backing store for blocks to be provided
bstore blocks.Blockstore
keyProvider KeyChanFunc
}

func NewReprovider(rsys routing.ContentRouting, bstore blocks.Blockstore) *Reprovider {
// NewReprovider creates new Reprovider instance.
func NewReprovider(ctx context.Context, rsys routing.ContentRouting, keyProvider KeyChanFunc) *Reprovider {
return &Reprovider{
rsys: rsys,
bstore: bstore,
ctx: ctx,
trigger: make(chan doneFunc),

rsys: rsys,
keyProvider: keyProvider,
}
}

func (rp *Reprovider) ProvideEvery(ctx context.Context, tick time.Duration) {
// ProvideEvery re-provides keys with 'tick' interval
func (rp *Reprovider) ProvideEvery(tick time.Duration) {
// dont reprovide immediately.
// may have just started the daemon and shutting it down immediately.
// probability( up another minute | uptime ) increases with uptime.
after := time.After(time.Minute)
var done doneFunc
for {
select {
case <-ctx.Done():
case <-rp.ctx.Done():
return
case done = <-rp.trigger:
case <-after:
err := rp.Reprovide(ctx)
if err != nil {
log.Debug(err)
}
after = time.After(tick)
}

//'mute' the trigger channel so when `ipfs bitswap reprovide` is called
//a 'reprovider is already running' error is returned
unmute := rp.muteTrigger()

err := rp.Reprovide()
if err != nil {
log.Debug(err)
}

if done != nil {
done(err)
}

unmute()

after = time.After(tick)
}
}

func (rp *Reprovider) Reprovide(ctx context.Context) error {
keychan, err := rp.bstore.AllKeysChan(ctx)
// Reprovide registers all keys given by rp.keyProvider to libp2p content routing
func (rp *Reprovider) Reprovide() error {
keychan, err := rp.keyProvider(rp.ctx)
if err != nil {
return fmt.Errorf("Failed to get key chan from blockstore: %s", err)
return fmt.Errorf("Failed to get key chan: %s", err)
}
for c := range keychan {
op := func() error {
err := rp.rsys.Provide(ctx, c, true)
err := rp.rsys.Provide(rp.ctx, c, true)
if err != nil {
log.Debugf("Failed to provide key: %s", err)
}
Expand All @@ -71,3 +97,41 @@ func (rp *Reprovider) Reprovide(ctx context.Context) error {
}
return nil
}

// Trigger starts reprovision process in rp.ProvideEvery and waits for it
func (rp *Reprovider) Trigger(ctx context.Context) error {
progressCtx, done := context.WithCancel(ctx)

var err error
df := func(e error) {
err = e
done()
}

select {
case <-rp.ctx.Done():
return context.Canceled
case <-ctx.Done():
return context.Canceled
case rp.trigger <- df:
<-progressCtx.Done()
return err
}
}

func (rp *Reprovider) muteTrigger() context.CancelFunc {
ctx, cf := context.WithCancel(rp.ctx)
go func() {
defer cf()
for {
select {
case <-ctx.Done():
return
case done := <-rp.trigger:
done(fmt.Errorf("reprovider is already running"))
}
}
}()

return cf
}
Loading

0 comments on commit dea24ae

Please sign in to comment.