From fcaf2b9f8bfedbd1d3e0da9755ed51e98947a21d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 15 Aug 2017 23:03:50 +0200 Subject: [PATCH] reprovider: apply review suggestions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Ɓukasz Magiera --- core/core.go | 4 ++-- docs/config.md | 2 +- exchange/reprovide/providers.go | 24 +++++++++--------------- exchange/reprovide/reprovide.go | 9 ++++++--- 4 files changed, 18 insertions(+), 21 deletions(-) diff --git a/core/core.go b/core/core.go index faf64ff5dd8b..28ec138310ad 100644 --- a/core/core.go +++ b/core/core.go @@ -263,7 +263,7 @@ func (n *IpfsNode) startLateOnlineServices(ctx context.Context) error { return err } - var keyProvider func(context.Context) (<-chan *cid.Cid, error) + var keyProvider rp.KeyChanFunc switch cfg.Reprovider.Strategy { case "all": @@ -275,7 +275,7 @@ func (n *IpfsNode) startLateOnlineServices(ctx context.Context) error { case "pinned": keyProvider = rp.NewPinnedProvider(n.Pinning, n.DAG, false) default: - return fmt.Errorf("unknown reprovider strtaegy '%s'", cfg.Reprovider.Strategy) + return fmt.Errorf("unknown reprovider strategy '%s'", cfg.Reprovider.Strategy) } n.Reprovider = rp.NewReprovider(ctx, n.Routing, keyProvider) diff --git a/docs/config.md b/docs/config.md index 02071e821b02..815bcf416b38 100644 --- a/docs/config.md +++ b/docs/config.md @@ -15,7 +15,7 @@ a running daemon do not read the config file at runtime. - [`Identity`](#identity) - [`Ipns`](#ipns) - [`Mounts`](#mounts) -- [`Reproviderl`](#reprovider) +- [`Reprovider`](#reprovider) - [`SupernodeRouting`](#supernoderouting) - [`Swarm`](#swarm) - [`Tour`](#tour) diff --git a/exchange/reprovide/providers.go b/exchange/reprovide/providers.go index ca8ae5d8146e..5335e17651ce 100644 --- a/exchange/reprovide/providers.go +++ b/exchange/reprovide/providers.go @@ -11,14 +11,14 @@ import ( ) // NewBlockstoreProvider returns key provider using bstore.AllKeysChan -func NewBlockstoreProvider(bstore blocks.Blockstore) keyChanFunc { +func NewBlockstoreProvider(bstore blocks.Blockstore) KeyChanFunc { return func(ctx context.Context) (<-chan *cid.Cid, error) { return bstore.AllKeysChan(ctx) } } // NewPinnedProvider returns provider supplying pinned keys -func NewPinnedProvider(pinning pin.Pinner, dag merkledag.DAGService, onlyRoots bool) keyChanFunc { +func NewPinnedProvider(pinning pin.Pinner, dag merkledag.DAGService, onlyRoots bool) KeyChanFunc { return func(ctx context.Context) (<-chan *cid.Cid, error) { set, err := pinSet(ctx, pinning, dag, onlyRoots) if err != nil { @@ -46,6 +46,8 @@ func pinSet(ctx context.Context, pinning pin.Pinner, dag merkledag.DAGService, o set := newStreamingSet() go func() { + defer close(set.new) + for _, key := range pinning.DirectKeys() { set.add(key) } @@ -56,41 +58,33 @@ func pinSet(ctx context.Context, pinning pin.Pinner, dag merkledag.DAGService, o if !onlyRoots { err := merkledag.EnumerateChildren(ctx, dag.GetLinks, key, set.add) if err != nil { - return //TODO: propagate to chan / log? + log.Errorf("reprovide indirect pins: %s", err) + return } } } - - close(set.new) }() return set, nil } type streamingSet struct { - set map[string]struct{} + set *cid.Set new chan *cid.Cid } // NewSet initializes and returns a new Set. func newStreamingSet() *streamingSet { return &streamingSet{ - set: make(map[string]struct{}), + set: cid.NewSet(), new: make(chan *cid.Cid), } } -// has returns if the Set contains a given Cid. -func (s *streamingSet) has(c *cid.Cid) bool { - _, ok := s.set[string(c.Bytes())] - return ok -} - // add adds a Cid to the set only if it is // not in it already. func (s *streamingSet) add(c *cid.Cid) bool { - if !s.has(c) { - s.set[string(c.Bytes())] = struct{}{} + if s.set.Visit(c) { s.new <- c return true } diff --git a/exchange/reprovide/reprovide.go b/exchange/reprovide/reprovide.go index c779245abde6..a56220c6e348 100644 --- a/exchange/reprovide/reprovide.go +++ b/exchange/reprovide/reprovide.go @@ -13,7 +13,8 @@ import ( var log = logging.Logger("reprovider") -type keyChanFunc func(context.Context) (<-chan *cid.Cid, error) +//KeyChanFunc is function streaming CIDs to pass to content routing +type KeyChanFunc func(context.Context) (<-chan *cid.Cid, error) type doneFunc func(error) type Reprovider struct { @@ -23,11 +24,11 @@ type Reprovider struct { // The routing system to provide values through rsys routing.ContentRouting - keyProvider keyChanFunc + keyProvider KeyChanFunc } // NewReprovider creates new Reprovider instance. -func NewReprovider(ctx context.Context, rsys routing.ContentRouting, keyProvider keyChanFunc) *Reprovider { +func NewReprovider(ctx context.Context, rsys routing.ContentRouting, keyProvider KeyChanFunc) *Reprovider { return &Reprovider{ ctx: ctx, trigger: make(chan doneFunc), @@ -52,6 +53,8 @@ func (rp *Reprovider) ProvideEvery(tick time.Duration) { case <-after: } + //'mute' the trigger channel so when `ipfs bitswap reprovide` is called + //a 'reprovider is already running' error is returned unmute := rp.muteTrigger() err := rp.Reprovide()