From f20683eb531e8e798b8f5f6d09067567a6e58fe2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 1 Aug 2017 02:57:21 +0200 Subject: [PATCH 01/10] Reprovider strategies MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- core/builder.go | 6 +++ core/core.go | 55 +++++++++++++++++------- exchange/reprovide/providers.go | 62 ++++++++++++++++++++++++++++ exchange/reprovide/reprovide.go | 17 ++++---- exchange/reprovide/reprovide_test.go | 3 +- repo/config/init.go | 1 + repo/config/reprovider.go | 1 + 7 files changed, 120 insertions(+), 25 deletions(-) create mode 100644 exchange/reprovide/providers.go diff --git a/core/builder.go b/core/builder.go index 28a5a283b92..065c9cbd87e 100644 --- a/core/builder.go +++ b/core/builder.go @@ -231,5 +231,11 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error { } n.Resolver = path.NewBasicResolver(n.DAG) + if cfg.Online { + if err := n.startLateOnlineServices(ctx); err != nil { + return err + } + } + return n.loadFilesRoot() } diff --git a/core/core.go b/core/core.go index 8afd615db35..33a6b3b5081 100644 --- a/core/core.go +++ b/core/core.go @@ -237,22 +237,6 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin return err } - n.Reprovider = rp.NewReprovider(n.Routing, n.Blockstore) - - if cfg.Reprovider.Interval != "0" { - interval := kReprovideFrequency - if cfg.Reprovider.Interval != "" { - dur, err := time.ParseDuration(cfg.Reprovider.Interval) - if err != nil { - return err - } - - interval = dur - } - - go n.Reprovider.ProvideEvery(ctx, interval) - } - if pubsub { n.Floodsub = floodsub.NewFloodSub(ctx, peerhost) } @@ -273,6 +257,45 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin return n.Bootstrap(DefaultBootstrapConfig) } +func (n *IpfsNode) startLateOnlineServices(ctx context.Context) error { + cfg, err := n.Repo.Config() + if err != nil { + return err + } + + var keyProvider func(context.Context) (<-chan *cid.Cid, error) + + switch cfg.Reprovider.Strategy { + case "all": + fallthrough + case "": + keyProvider = rp.NewBlockstoreProvider(n.Blockstore) + case "roots": + keyProvider = rp.NewPinnedProvider(n.Pinning, n.DAG, true) + case "pinned": + keyProvider = rp.NewPinnedProvider(n.Pinning, n.DAG, false) + default: + return fmt.Errorf("unknown reprovider strtaegy '%s'", cfg.Reprovider.Strategy) + } + n.Reprovider = rp.NewReprovider(n.Routing, keyProvider) + + if cfg.Reprovider.Interval != "0" { + interval := kReprovideFrequency + if cfg.Reprovider.Interval != "" { + dur, err := time.ParseDuration(cfg.Reprovider.Interval) + if err != nil { + return err + } + + interval = dur + } + + go n.Reprovider.ProvideEvery(ctx, interval) + } + + return nil +} + func makeAddrsFactory(cfg config.Addresses) (p2pbhost.AddrsFactory, error) { var annAddrs []ma.Multiaddr for _, addr := range cfg.Announce { diff --git a/exchange/reprovide/providers.go b/exchange/reprovide/providers.go new file mode 100644 index 00000000000..27be34855d8 --- /dev/null +++ b/exchange/reprovide/providers.go @@ -0,0 +1,62 @@ +package reprovide + +import ( + "context" + "errors" + "fmt" + + blocks "github.com/ipfs/go-ipfs/blocks/blockstore" + merkledag "github.com/ipfs/go-ipfs/merkledag" + pin "github.com/ipfs/go-ipfs/pin" + + cid "gx/ipfs/QmTprEaAA2A9bst5XH7exuyi5KzNMK3SEDNN8rBDnKWcUS/go-cid" +) + +func NewBlockstoreProvider(bstore blocks.Blockstore) KeyChanFunc { + return func(ctx context.Context) (<-chan *cid.Cid, error) { + return bstore.AllKeysChan(ctx) + } +} + +func NewPinnedProvider(pinning pin.Pinner, dag merkledag.DAGService, onlyRoots bool) KeyChanFunc { + return func(ctx context.Context) (<-chan *cid.Cid, error) { + set, err := pinSet(ctx, pinning, dag, onlyRoots) + if err != nil { + return nil, err + } + + outCh := make(chan *cid.Cid) + go func() { + set.ForEach(func(c *cid.Cid) error { + select { + case <-ctx.Done(): + return errors.New("context cancelled") + case outCh <- c: + } + return nil + }) + }() + + return outCh, nil + } +} + +func pinSet(ctx context.Context, pinning pin.Pinner, dag merkledag.DAGService, onlyRoots bool) (*cid.Set, error) { + set := cid.NewSet() + for _, key := range pinning.DirectKeys() { + set.Add(key) + } + + for _, key := range pinning.RecursiveKeys() { + set.Add(key) + + if !onlyRoots { + err := merkledag.EnumerateChildren(ctx, dag.GetLinks, key, set.Visit) + if err != nil { + return nil, err + } + } + } + + return set, nil +} diff --git a/exchange/reprovide/reprovide.go b/exchange/reprovide/reprovide.go index afa66016f62..3d42c843985 100644 --- a/exchange/reprovide/reprovide.go +++ b/exchange/reprovide/reprovide.go @@ -5,26 +5,27 @@ import ( "fmt" "time" - blocks "github.com/ipfs/go-ipfs/blocks/blockstore" backoff "gx/ipfs/QmPJUtEJsm5YLUWhF6imvyCH8KZXRJa9Wup7FDMwTy5Ufz/backoff" routing "gx/ipfs/QmPjTrrSfE6TzLv6ya6VWhGcCgPrUAdcgrDcQyRDX2VyW1/go-libp2p-routing" logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" + cid "gx/ipfs/QmTprEaAA2A9bst5XH7exuyi5KzNMK3SEDNN8rBDnKWcUS/go-cid" ) var log = logging.Logger("reprovider") +type KeyChanFunc func(context.Context) (<-chan *cid.Cid, error) + type Reprovider struct { // The routing system to provide values through rsys routing.ContentRouting - // The backing store for blocks to be provided - bstore blocks.Blockstore + keyProvider KeyChanFunc } -func NewReprovider(rsys routing.ContentRouting, bstore blocks.Blockstore) *Reprovider { +func NewReprovider(rsys routing.ContentRouting, keyProvider KeyChanFunc) *Reprovider { return &Reprovider{ - rsys: rsys, - bstore: bstore, + rsys: rsys, + keyProvider: keyProvider, } } @@ -48,9 +49,9 @@ func (rp *Reprovider) ProvideEvery(ctx context.Context, tick time.Duration) { } func (rp *Reprovider) Reprovide(ctx context.Context) error { - keychan, err := rp.bstore.AllKeysChan(ctx) + keychan, err := rp.keyProvider(ctx) if err != nil { - return fmt.Errorf("Failed to get key chan from blockstore: %s", err) + return fmt.Errorf("Failed to get key chan: %s", err) } for c := range keychan { op := func() error { diff --git a/exchange/reprovide/reprovide_test.go b/exchange/reprovide/reprovide_test.go index 2d755e52606..8a613700f1c 100644 --- a/exchange/reprovide/reprovide_test.go +++ b/exchange/reprovide/reprovide_test.go @@ -32,7 +32,8 @@ func TestReprovide(t *testing.T) { blk := blocks.NewBlock([]byte("this is a test")) bstore.Put(blk) - reprov := NewReprovider(clA, bstore) + keyProvider := NewBlockstoreProvider(bstore) + reprov := NewReprovider(clA, keyProvider) err := reprov.Reprovide(ctx) if err != nil { t.Fatal(err) diff --git a/repo/config/init.go b/repo/config/init.go index aa129d97e12..f31edd42b33 100644 --- a/repo/config/init.go +++ b/repo/config/init.go @@ -72,6 +72,7 @@ func Init(out io.Writer, nBitsForKeypair int) (*Config, error) { }, Reprovider: Reprovider{ Interval: "12h", + Strategy: "all", }, } diff --git a/repo/config/reprovider.go b/repo/config/reprovider.go index 53cf293ab61..fa029c2fc21 100644 --- a/repo/config/reprovider.go +++ b/repo/config/reprovider.go @@ -2,4 +2,5 @@ package config type Reprovider struct { Interval string // Time period to reprovide locally stored objects to the network + Strategy string // Which keys to announce } From bb7aee57291d9e4da198972659b4809f09489003 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 1 Aug 2017 09:55:08 +0200 Subject: [PATCH 02/10] reprovider: Fix build MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- exchange/reprovide/providers.go | 1 - 1 file changed, 1 deletion(-) diff --git a/exchange/reprovide/providers.go b/exchange/reprovide/providers.go index 27be34855d8..b96807d371d 100644 --- a/exchange/reprovide/providers.go +++ b/exchange/reprovide/providers.go @@ -3,7 +3,6 @@ package reprovide import ( "context" "errors" - "fmt" blocks "github.com/ipfs/go-ipfs/blocks/blockstore" merkledag "github.com/ipfs/go-ipfs/merkledag" From 17ae331be2be53358445d91f6356185f28465887 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 1 Aug 2017 17:45:15 +0200 Subject: [PATCH 03/10] reprovider: Implement 'bitswap reprovide' command MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- core/commands/bitswap.go | 36 ++++++++++++++++++++++++--- core/core.go | 4 +-- exchange/reprovide/providers.go | 1 + exchange/reprovide/reprovide.go | 44 ++++++++++++++++++++++++++------- 4 files changed, 70 insertions(+), 15 deletions(-) diff --git a/core/commands/bitswap.go b/core/commands/bitswap.go index 867cb49c232..a769e6b0a7a 100644 --- a/core/commands/bitswap.go +++ b/core/commands/bitswap.go @@ -21,10 +21,11 @@ var BitswapCmd = &cmds.Command{ ShortDescription: ``, }, Subcommands: map[string]*cmds.Command{ - "wantlist": showWantlistCmd, - "stat": bitswapStatCmd, - "unwant": unwantCmd, - "ledger": ledgerCmd, + "wantlist": showWantlistCmd, + "stat": bitswapStatCmd, + "unwant": unwantCmd, + "ledger": ledgerCmd, + "reprovide": reprovideCmd, }, } @@ -242,3 +243,30 @@ prints the ledger associated with a given peer. }, }, } + +var reprovideCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "Trigger reprovider.", + ShortDescription: ` +Trigger reprovider to announce our data to network. +`, + }, + Run: func(req cmds.Request, res cmds.Response) { + nd, err := req.InvocContext().GetNode() + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + + if !nd.OnlineMode() { + res.SetError(errNotOnline, cmds.ErrClient) + return + } + + err = nd.Reprovider.Trigger(req.Context()) + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + }, +} diff --git a/core/core.go b/core/core.go index 33a6b3b5081..faf64ff5dd8 100644 --- a/core/core.go +++ b/core/core.go @@ -277,7 +277,7 @@ func (n *IpfsNode) startLateOnlineServices(ctx context.Context) error { default: return fmt.Errorf("unknown reprovider strtaegy '%s'", cfg.Reprovider.Strategy) } - n.Reprovider = rp.NewReprovider(n.Routing, keyProvider) + n.Reprovider = rp.NewReprovider(ctx, n.Routing, keyProvider) if cfg.Reprovider.Interval != "0" { interval := kReprovideFrequency @@ -290,7 +290,7 @@ func (n *IpfsNode) startLateOnlineServices(ctx context.Context) error { interval = dur } - go n.Reprovider.ProvideEvery(ctx, interval) + go n.Reprovider.ProvideEvery(interval) } return nil diff --git a/exchange/reprovide/providers.go b/exchange/reprovide/providers.go index b96807d371d..07ddd99ccd0 100644 --- a/exchange/reprovide/providers.go +++ b/exchange/reprovide/providers.go @@ -26,6 +26,7 @@ func NewPinnedProvider(pinning pin.Pinner, dag merkledag.DAGService, onlyRoots b outCh := make(chan *cid.Cid) go func() { + defer close(outCh) set.ForEach(func(c *cid.Cid) error { select { case <-ctx.Done(): diff --git a/exchange/reprovide/reprovide.go b/exchange/reprovide/reprovide.go index 3d42c843985..d84d753e04c 100644 --- a/exchange/reprovide/reprovide.go +++ b/exchange/reprovide/reprovide.go @@ -16,35 +16,48 @@ var log = logging.Logger("reprovider") type KeyChanFunc func(context.Context) (<-chan *cid.Cid, error) type Reprovider struct { + ctx context.Context + trigger chan context.CancelFunc + // The routing system to provide values through rsys routing.ContentRouting keyProvider KeyChanFunc } -func NewReprovider(rsys routing.ContentRouting, keyProvider KeyChanFunc) *Reprovider { +func NewReprovider(ctx context.Context, rsys routing.ContentRouting, keyProvider KeyChanFunc) *Reprovider { return &Reprovider{ - rsys: rsys, + ctx: ctx, + trigger: make(chan context.CancelFunc), + + rsys: rsys, keyProvider: keyProvider, } } -func (rp *Reprovider) ProvideEvery(ctx context.Context, tick time.Duration) { +func (rp *Reprovider) ProvideEvery(tick time.Duration) { // dont reprovide immediately. // may have just started the daemon and shutting it down immediately. // probability( up another minute | uptime ) increases with uptime. after := time.After(time.Minute) + var done context.CancelFunc for { select { - case <-ctx.Done(): + case <-rp.ctx.Done(): return + case done = <-rp.trigger: case <-after: - err := rp.Reprovide(ctx) - if err != nil { - log.Debug(err) - } - after = time.After(tick) } + + err := rp.Reprovide(rp.ctx) + if err != nil { + log.Debug(err) + } + + if done != nil { + done() + } + after = time.After(tick) } } @@ -72,3 +85,16 @@ func (rp *Reprovider) Reprovide(ctx context.Context) error { } return nil } + +func (rp *Reprovider) Trigger(ctx context.Context) error { + progressCtx, done := context.WithCancel(ctx) + select { + case <-rp.ctx.Done(): + return context.Canceled + case <-ctx.Done(): + return context.Canceled + case rp.trigger <- done: + <-progressCtx.Done() + return nil + } +} From 279a5606606bb0e7c8816feced68f13bac92865d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 1 Aug 2017 17:46:06 +0200 Subject: [PATCH 04/10] reprovider: strategy tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- test/sharness/t0175-reprovider.sh | 128 ++++++++++++++++++++++++++++++ 1 file changed, 128 insertions(+) create mode 100755 test/sharness/t0175-reprovider.sh diff --git a/test/sharness/t0175-reprovider.sh b/test/sharness/t0175-reprovider.sh new file mode 100755 index 00000000000..b4a2cfeb6cc --- /dev/null +++ b/test/sharness/t0175-reprovider.sh @@ -0,0 +1,128 @@ +#!/bin/sh + +test_description="Test reprovider" + +. lib/test-lib.sh + +init_strategy() { + NUM_NODES=2 + test_expect_success 'init iptb' ' + iptb init -f -n $NUM_NODES --bootstrap=none --port=0 + ' + + test_expect_success 'peer ids' ' + PEERID_0=$(iptb get id 0) && + PEERID_1=$(iptb get id 1) + ' + + test_expect_success 'use pinning startegy for reprovider' ' + ipfsi 0 config Reprovider.Strategy '$1' + ' + + test_expect_success 'start peers' ' + iptb start 0 && + iptb start 1 && + iptb connect 0 1 + ' +} + +findprovs_empty() { + test_expect_success 'findprovs succeeds' ' + ipfsi 1 dht findprovs -n 1 '$1' > findprovsOut + ' + + test_expect_success "findprovs output is empty" ' + test_must_be_empty findprovsOut + ' +} + +findprovs_expect() { + test_expect_success 'findprovs succeeds' ' + ipfsi 1 dht findprovs -n 1 '$1' > findprovsOut && + echo '$2' > expected + ' + + test_expect_success "findprovs output looks good" ' + test_cmp findprovsOut expected + ' +} + +reprovide() { + test_expect_success 'reprovide' ' + # TODO: this hangs, though only after reprovision was done + ipfsi 0 bitswap reprovide + ' +} + +test_expect_success 'stop peer 1' ' + iptb stop 1 +' + +# Test 'all' strategy +init_strategy 'all' + +test_expect_success 'add test object' ' + HASH_0=$(echo "foo" | ipfsi 0 add -q --local) +' + +findprovs_empty '$HASH_0' +reprovide +findprovs_expect '$HASH_0' '$PEERID_0' + +# Test 'pinned' strategy +init_strategy 'pinned' + +test_expect_success 'prepare test files' ' + echo foo > f1 && + echo bar > f2 +' + +test_expect_success 'add test objects' ' + HASH_FOO=$(ipfsi 0 add -q --local --pin=false f1) && + HASH_BAR=$(ipfsi 0 add -q --local --pin=false f2) && + HASH_BAR_DIR=$(ipfsi 0 add -q --local -w f2) +' + +findprovs_empty '$HASH_FOO' +findprovs_empty '$HASH_BAR' +findprovs_empty '$HASH_BAR_DIR' + +reprovide + +findprovs_empty '$HASH_FOO' +findprovs_expect '$HASH_BAR' '$PEERID_0' +findprovs_expect '$HASH_BAR_DIR' '$PEERID_0' + +test_expect_success 'stop peer 1' ' + iptb stop 1 +' + +# Test 'roots' strategy +init_strategy 'roots' + +test_expect_success 'prepare test files' ' + echo foo > f1 && + echo bar > f2 +' + +test_expect_success 'add test objects' ' + HASH_FOO=$(ipfsi 0 add -q --local --pin=false f1) && + HASH_BAR=$(ipfsi 0 add -q --local --pin=false f2) && + HASH_BAR_DIR=$(ipfsi 0 add -q --local -w f2) +' + +findprovs_empty '$HASH_FOO' +findprovs_empty '$HASH_BAR' +findprovs_empty '$HASH_BAR_DIR' + +reprovide + +findprovs_empty '$HASH_FOO' +findprovs_empty '$HASH_BAR' +findprovs_expect '$HASH_BAR_DIR' '$PEERID_0' + +test_expect_success 'stop peer 1' ' + iptb stop 1 +' + +test_done From 3917d4dd18d4efd2e06189e07ffe57e7b86662a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 1 Aug 2017 18:21:36 +0200 Subject: [PATCH 05/10] reprovider: Make codeclimate happier MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- exchange/reprovide/providers.go | 6 ++++-- exchange/reprovide/reprovide.go | 21 +++++++++++++-------- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/exchange/reprovide/providers.go b/exchange/reprovide/providers.go index 07ddd99ccd0..928b979ac2e 100644 --- a/exchange/reprovide/providers.go +++ b/exchange/reprovide/providers.go @@ -11,13 +11,15 @@ import ( cid "gx/ipfs/QmTprEaAA2A9bst5XH7exuyi5KzNMK3SEDNN8rBDnKWcUS/go-cid" ) -func NewBlockstoreProvider(bstore blocks.Blockstore) KeyChanFunc { +// NewBlockstoreProvider returns key provider using bstore.AllKeysChan +func NewBlockstoreProvider(bstore blocks.Blockstore) keyChanFunc { return func(ctx context.Context) (<-chan *cid.Cid, error) { return bstore.AllKeysChan(ctx) } } -func NewPinnedProvider(pinning pin.Pinner, dag merkledag.DAGService, onlyRoots bool) KeyChanFunc { +// NewPinnedProvider returns provider supplying pinned keys +func NewPinnedProvider(pinning pin.Pinner, dag merkledag.DAGService, onlyRoots bool) keyChanFunc { return func(ctx context.Context) (<-chan *cid.Cid, error) { set, err := pinSet(ctx, pinning, dag, onlyRoots) if err != nil { diff --git a/exchange/reprovide/reprovide.go b/exchange/reprovide/reprovide.go index d84d753e04c..852eb512da9 100644 --- a/exchange/reprovide/reprovide.go +++ b/exchange/reprovide/reprovide.go @@ -13,7 +13,7 @@ import ( var log = logging.Logger("reprovider") -type KeyChanFunc func(context.Context) (<-chan *cid.Cid, error) +type keyChanFunc func(context.Context) (<-chan *cid.Cid, error) type Reprovider struct { ctx context.Context @@ -22,19 +22,21 @@ type Reprovider struct { // The routing system to provide values through rsys routing.ContentRouting - keyProvider KeyChanFunc + keyProvider keyChanFunc } -func NewReprovider(ctx context.Context, rsys routing.ContentRouting, keyProvider KeyChanFunc) *Reprovider { +// NewReprovider creates new Reprovider instance. +func NewReprovider(ctx context.Context, rsys routing.ContentRouting, keyProvider keyChanFunc) *Reprovider { return &Reprovider{ ctx: ctx, trigger: make(chan context.CancelFunc), - rsys: rsys, + rsys: rsys, keyProvider: keyProvider, } } +// ProvideEvery re-provides keys with 'tick' interval func (rp *Reprovider) ProvideEvery(tick time.Duration) { // dont reprovide immediately. // may have just started the daemon and shutting it down immediately. @@ -49,7 +51,7 @@ func (rp *Reprovider) ProvideEvery(tick time.Duration) { case <-after: } - err := rp.Reprovide(rp.ctx) + err := rp.Reprovide() if err != nil { log.Debug(err) } @@ -61,14 +63,15 @@ func (rp *Reprovider) ProvideEvery(tick time.Duration) { } } -func (rp *Reprovider) Reprovide(ctx context.Context) error { - keychan, err := rp.keyProvider(ctx) +// Reprovide registers all keys given by rp.keyProvider to libp2p content routing +func (rp *Reprovider) Reprovide() error { + keychan, err := rp.keyProvider(rp.ctx) if err != nil { return fmt.Errorf("Failed to get key chan: %s", err) } for c := range keychan { op := func() error { - err := rp.rsys.Provide(ctx, c, true) + err := rp.rsys.Provide(rp.ctx, c, true) if err != nil { log.Debugf("Failed to provide key: %s", err) } @@ -86,8 +89,10 @@ func (rp *Reprovider) Reprovide(ctx context.Context) error { return nil } +// Trigger starts reprovision process in rp.ProvideEvery and waits for it func (rp *Reprovider) Trigger(ctx context.Context) error { progressCtx, done := context.WithCancel(ctx) + select { case <-rp.ctx.Done(): return context.Canceled From 7e5e0cba032682bfe413c56f0c0ad7b1c417b055 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 3 Aug 2017 10:23:43 +0200 Subject: [PATCH 06/10] reprovider: Fix sharness tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- exchange/reprovide/reprovide_test.go | 4 ++-- test/sharness/t0175-reprovider.sh | 23 +++++++++++------------ 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/exchange/reprovide/reprovide_test.go b/exchange/reprovide/reprovide_test.go index 8a613700f1c..af5e0d880aa 100644 --- a/exchange/reprovide/reprovide_test.go +++ b/exchange/reprovide/reprovide_test.go @@ -33,8 +33,8 @@ func TestReprovide(t *testing.T) { bstore.Put(blk) keyProvider := NewBlockstoreProvider(bstore) - reprov := NewReprovider(clA, keyProvider) - err := reprov.Reprovide(ctx) + reprov := NewReprovider(ctx, clA, keyProvider) + err := reprov.Reprovide() if err != nil { t.Fatal(err) } diff --git a/test/sharness/t0175-reprovider.sh b/test/sharness/t0175-reprovider.sh index b4a2cfeb6cc..986c6202ee2 100755 --- a/test/sharness/t0175-reprovider.sh +++ b/test/sharness/t0175-reprovider.sh @@ -5,7 +5,7 @@ test_description="Test reprovider" . lib/test-lib.sh init_strategy() { - NUM_NODES=2 + NUM_NODES=6 test_expect_success 'init iptb' ' iptb init -f -n $NUM_NODES --bootstrap=none --port=0 ' @@ -19,30 +19,26 @@ init_strategy() { ipfsi 0 config Reprovider.Strategy '$1' ' - test_expect_success 'start peers' ' - iptb start 0 && - iptb start 1 && - iptb connect 0 1 - ' + startup_cluster 6 --debug } findprovs_empty() { - test_expect_success 'findprovs succeeds' ' + test_expect_success 'findprovs '$1' succeeds' ' ipfsi 1 dht findprovs -n 1 '$1' > findprovsOut ' - test_expect_success "findprovs output is empty" ' + test_expect_success "findprovs $1 output is empty" ' test_must_be_empty findprovsOut ' } findprovs_expect() { - test_expect_success 'findprovs succeeds' ' + test_expect_success 'findprovs '$1' succeeds' ' ipfsi 1 dht findprovs -n 1 '$1' > findprovsOut && echo '$2' > expected ' - test_expect_success "findprovs output looks good" ' + test_expect_success "findprovs $1 output looks good" ' test_cmp findprovsOut expected ' } @@ -102,13 +98,15 @@ init_strategy 'roots' test_expect_success 'prepare test files' ' echo foo > f1 && - echo bar > f2 + echo bar > f2 && + echo baz > f3 ' test_expect_success 'add test objects' ' HASH_FOO=$(ipfsi 0 add -q --local --pin=false f1) && HASH_BAR=$(ipfsi 0 add -q --local --pin=false f2) && - HASH_BAR_DIR=$(ipfsi 0 add -q --local -w f2) + HASH_BAZ=$(ipfsi 0 add -q --local f3) && + HASH_BAR_DIR=$(ipfsi 0 add -q --local -w f2 | tail -1) ' findprovs_empty '$HASH_FOO' @@ -119,6 +117,7 @@ reprovide findprovs_empty '$HASH_FOO' findprovs_empty '$HASH_BAR' +findprovs_expect '$HASH_BAZ' '$PEERID_0' findprovs_expect '$HASH_BAR_DIR' '$PEERID_0' test_expect_success 'stop peer 1' ' From 0f692baffaf6f0de934d8f239d70af307a0ff524 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 3 Aug 2017 14:51:05 +0200 Subject: [PATCH 07/10] reprovider: reduce pinned strategy i/o overhead MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- exchange/reprovide/providers.go | 69 +++++++++++++++++++++++++-------- 1 file changed, 52 insertions(+), 17 deletions(-) diff --git a/exchange/reprovide/providers.go b/exchange/reprovide/providers.go index 928b979ac2e..ca8ae5d8146 100644 --- a/exchange/reprovide/providers.go +++ b/exchange/reprovide/providers.go @@ -2,7 +2,6 @@ package reprovide import ( "context" - "errors" blocks "github.com/ipfs/go-ipfs/blocks/blockstore" merkledag "github.com/ipfs/go-ipfs/merkledag" @@ -29,36 +28,72 @@ func NewPinnedProvider(pinning pin.Pinner, dag merkledag.DAGService, onlyRoots b outCh := make(chan *cid.Cid) go func() { defer close(outCh) - set.ForEach(func(c *cid.Cid) error { + for c := range set.new { select { case <-ctx.Done(): - return errors.New("context cancelled") + return case outCh <- c: } - return nil - }) + } + }() return outCh, nil } } -func pinSet(ctx context.Context, pinning pin.Pinner, dag merkledag.DAGService, onlyRoots bool) (*cid.Set, error) { - set := cid.NewSet() - for _, key := range pinning.DirectKeys() { - set.Add(key) - } +func pinSet(ctx context.Context, pinning pin.Pinner, dag merkledag.DAGService, onlyRoots bool) (*streamingSet, error) { + set := newStreamingSet() + + go func() { + for _, key := range pinning.DirectKeys() { + set.add(key) + } - for _, key := range pinning.RecursiveKeys() { - set.Add(key) + for _, key := range pinning.RecursiveKeys() { + set.add(key) - if !onlyRoots { - err := merkledag.EnumerateChildren(ctx, dag.GetLinks, key, set.Visit) - if err != nil { - return nil, err + if !onlyRoots { + err := merkledag.EnumerateChildren(ctx, dag.GetLinks, key, set.add) + if err != nil { + return //TODO: propagate to chan / log? + } } } - } + + close(set.new) + }() return set, nil } + +type streamingSet struct { + set map[string]struct{} + new chan *cid.Cid +} + +// NewSet initializes and returns a new Set. +func newStreamingSet() *streamingSet { + return &streamingSet{ + set: make(map[string]struct{}), + new: make(chan *cid.Cid), + } +} + +// has returns if the Set contains a given Cid. +func (s *streamingSet) has(c *cid.Cid) bool { + _, ok := s.set[string(c.Bytes())] + return ok +} + +// add adds a Cid to the set only if it is +// not in it already. +func (s *streamingSet) add(c *cid.Cid) bool { + if !s.has(c) { + s.set[string(c.Bytes())] = struct{}{} + s.new <- c + return true + } + + return false +} From a865fde21d2fd3c5a763c0d70bac9bb190296f8c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 3 Aug 2017 15:13:30 +0200 Subject: [PATCH 08/10] reprovider: make reprovide cmd error if reprovider is active MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- exchange/reprovide/reprovide.go | 41 ++++++++++++++++++++++++++++----- 1 file changed, 35 insertions(+), 6 deletions(-) diff --git a/exchange/reprovide/reprovide.go b/exchange/reprovide/reprovide.go index 852eb512da9..c779245abde 100644 --- a/exchange/reprovide/reprovide.go +++ b/exchange/reprovide/reprovide.go @@ -14,10 +14,11 @@ import ( var log = logging.Logger("reprovider") type keyChanFunc func(context.Context) (<-chan *cid.Cid, error) +type doneFunc func(error) type Reprovider struct { ctx context.Context - trigger chan context.CancelFunc + trigger chan doneFunc // The routing system to provide values through rsys routing.ContentRouting @@ -29,7 +30,7 @@ type Reprovider struct { func NewReprovider(ctx context.Context, rsys routing.ContentRouting, keyProvider keyChanFunc) *Reprovider { return &Reprovider{ ctx: ctx, - trigger: make(chan context.CancelFunc), + trigger: make(chan doneFunc), rsys: rsys, keyProvider: keyProvider, @@ -42,7 +43,7 @@ func (rp *Reprovider) ProvideEvery(tick time.Duration) { // may have just started the daemon and shutting it down immediately. // probability( up another minute | uptime ) increases with uptime. after := time.After(time.Minute) - var done context.CancelFunc + var done doneFunc for { select { case <-rp.ctx.Done(): @@ -51,14 +52,19 @@ func (rp *Reprovider) ProvideEvery(tick time.Duration) { case <-after: } + unmute := rp.muteTrigger() + err := rp.Reprovide() if err != nil { log.Debug(err) } if done != nil { - done() + done(err) } + + unmute() + after = time.After(tick) } } @@ -93,13 +99,36 @@ func (rp *Reprovider) Reprovide() error { func (rp *Reprovider) Trigger(ctx context.Context) error { progressCtx, done := context.WithCancel(ctx) + var err error + df := func(e error) { + err = e + done() + } + select { case <-rp.ctx.Done(): return context.Canceled case <-ctx.Done(): return context.Canceled - case rp.trigger <- done: + case rp.trigger <- df: <-progressCtx.Done() - return nil + return err } } + +func (rp *Reprovider) muteTrigger() context.CancelFunc { + ctx, cf := context.WithCancel(rp.ctx) + go func() { + defer cf() + for { + select { + case <-ctx.Done(): + return + case done := <-rp.trigger: + done(fmt.Errorf("reprovider is already running")) + } + } + }() + + return cf +} From 048debeffd9fec6a899c1a38a3b5784224820647 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 4 Aug 2017 23:56:12 +0200 Subject: [PATCH 09/10] reprovider: strategy docs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- docs/config.md | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/docs/config.md b/docs/config.md index c922fc3983d..02071e821b0 100644 --- a/docs/config.md +++ b/docs/config.md @@ -15,7 +15,7 @@ a running daemon do not read the config file at runtime. - [`Identity`](#identity) - [`Ipns`](#ipns) - [`Mounts`](#mounts) -- [`ReproviderInterval`](#reproviderinterval) +- [`Reproviderl`](#reprovider) - [`SupernodeRouting`](#supernoderouting) - [`Swarm`](#swarm) - [`Tour`](#tour) @@ -193,7 +193,9 @@ Mountpoint for `/ipns/`. - `FuseAllowOther` Sets the FUSE allow other option on the mountpoint. -## `ReproviderInterval` +## `Reprovider` + +- `Interval` Sets the time between rounds of reproviding local content to the routing system. If unset, it defaults to 12 hours. If set to the value `"0"` it will disable content reproviding. @@ -203,6 +205,12 @@ not being able to discover that you have the objects that you have. If you want to have this disabled and keep the network aware of what you have, you must manually announce your content periodically. +- `Strategy` +Tells reprovider what should be announced. Valid strategies are: + - "all" (default) - announce all stored data + - "pinned" - only announce pinned data + - "roots" - only announce directly pinned keys and root keys of recursive pins + ## `SupernodeRouting` Deprecated. From 4a5b93a1df25dd6694f514a60a0c1e94ba10a74d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 15 Aug 2017 23:03:50 +0200 Subject: [PATCH 10/10] reprovider: apply review suggestions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- core/core.go | 4 ++-- docs/config.md | 2 +- exchange/reprovide/providers.go | 24 +++++++++--------------- exchange/reprovide/reprovide.go | 9 ++++++--- 4 files changed, 18 insertions(+), 21 deletions(-) diff --git a/core/core.go b/core/core.go index faf64ff5dd8..28ec138310a 100644 --- a/core/core.go +++ b/core/core.go @@ -263,7 +263,7 @@ func (n *IpfsNode) startLateOnlineServices(ctx context.Context) error { return err } - var keyProvider func(context.Context) (<-chan *cid.Cid, error) + var keyProvider rp.KeyChanFunc switch cfg.Reprovider.Strategy { case "all": @@ -275,7 +275,7 @@ func (n *IpfsNode) startLateOnlineServices(ctx context.Context) error { case "pinned": keyProvider = rp.NewPinnedProvider(n.Pinning, n.DAG, false) default: - return fmt.Errorf("unknown reprovider strtaegy '%s'", cfg.Reprovider.Strategy) + return fmt.Errorf("unknown reprovider strategy '%s'", cfg.Reprovider.Strategy) } n.Reprovider = rp.NewReprovider(ctx, n.Routing, keyProvider) diff --git a/docs/config.md b/docs/config.md index 02071e821b0..815bcf416b3 100644 --- a/docs/config.md +++ b/docs/config.md @@ -15,7 +15,7 @@ a running daemon do not read the config file at runtime. - [`Identity`](#identity) - [`Ipns`](#ipns) - [`Mounts`](#mounts) -- [`Reproviderl`](#reprovider) +- [`Reprovider`](#reprovider) - [`SupernodeRouting`](#supernoderouting) - [`Swarm`](#swarm) - [`Tour`](#tour) diff --git a/exchange/reprovide/providers.go b/exchange/reprovide/providers.go index ca8ae5d8146..5335e17651c 100644 --- a/exchange/reprovide/providers.go +++ b/exchange/reprovide/providers.go @@ -11,14 +11,14 @@ import ( ) // NewBlockstoreProvider returns key provider using bstore.AllKeysChan -func NewBlockstoreProvider(bstore blocks.Blockstore) keyChanFunc { +func NewBlockstoreProvider(bstore blocks.Blockstore) KeyChanFunc { return func(ctx context.Context) (<-chan *cid.Cid, error) { return bstore.AllKeysChan(ctx) } } // NewPinnedProvider returns provider supplying pinned keys -func NewPinnedProvider(pinning pin.Pinner, dag merkledag.DAGService, onlyRoots bool) keyChanFunc { +func NewPinnedProvider(pinning pin.Pinner, dag merkledag.DAGService, onlyRoots bool) KeyChanFunc { return func(ctx context.Context) (<-chan *cid.Cid, error) { set, err := pinSet(ctx, pinning, dag, onlyRoots) if err != nil { @@ -46,6 +46,8 @@ func pinSet(ctx context.Context, pinning pin.Pinner, dag merkledag.DAGService, o set := newStreamingSet() go func() { + defer close(set.new) + for _, key := range pinning.DirectKeys() { set.add(key) } @@ -56,41 +58,33 @@ func pinSet(ctx context.Context, pinning pin.Pinner, dag merkledag.DAGService, o if !onlyRoots { err := merkledag.EnumerateChildren(ctx, dag.GetLinks, key, set.add) if err != nil { - return //TODO: propagate to chan / log? + log.Errorf("reprovide indirect pins: %s", err) + return } } } - - close(set.new) }() return set, nil } type streamingSet struct { - set map[string]struct{} + set *cid.Set new chan *cid.Cid } // NewSet initializes and returns a new Set. func newStreamingSet() *streamingSet { return &streamingSet{ - set: make(map[string]struct{}), + set: cid.NewSet(), new: make(chan *cid.Cid), } } -// has returns if the Set contains a given Cid. -func (s *streamingSet) has(c *cid.Cid) bool { - _, ok := s.set[string(c.Bytes())] - return ok -} - // add adds a Cid to the set only if it is // not in it already. func (s *streamingSet) add(c *cid.Cid) bool { - if !s.has(c) { - s.set[string(c.Bytes())] = struct{}{} + if s.set.Visit(c) { s.new <- c return true } diff --git a/exchange/reprovide/reprovide.go b/exchange/reprovide/reprovide.go index c779245abde..a56220c6e34 100644 --- a/exchange/reprovide/reprovide.go +++ b/exchange/reprovide/reprovide.go @@ -13,7 +13,8 @@ import ( var log = logging.Logger("reprovider") -type keyChanFunc func(context.Context) (<-chan *cid.Cid, error) +//KeyChanFunc is function streaming CIDs to pass to content routing +type KeyChanFunc func(context.Context) (<-chan *cid.Cid, error) type doneFunc func(error) type Reprovider struct { @@ -23,11 +24,11 @@ type Reprovider struct { // The routing system to provide values through rsys routing.ContentRouting - keyProvider keyChanFunc + keyProvider KeyChanFunc } // NewReprovider creates new Reprovider instance. -func NewReprovider(ctx context.Context, rsys routing.ContentRouting, keyProvider keyChanFunc) *Reprovider { +func NewReprovider(ctx context.Context, rsys routing.ContentRouting, keyProvider KeyChanFunc) *Reprovider { return &Reprovider{ ctx: ctx, trigger: make(chan doneFunc), @@ -52,6 +53,8 @@ func (rp *Reprovider) ProvideEvery(tick time.Duration) { case <-after: } + //'mute' the trigger channel so when `ipfs bitswap reprovide` is called + //a 'reprovider is already running' error is returned unmute := rp.muteTrigger() err := rp.Reprovide()