From da8a30f21860fd9080abc6c18a7a13d88f5527dd Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Mon, 20 Apr 2020 15:13:07 +0100 Subject: [PATCH 1/3] feat: set provider manager options --- dht.go | 6 ++- dht_options.go | 10 +++++ providers/providers_manager.go | 65 ++++++++++++++++++++++++----- providers/providers_manager_test.go | 30 ++++++++++--- 4 files changed, 93 insertions(+), 18 deletions(-) diff --git a/dht.go b/dht.go index 7bfb027b9..7abb42370 100644 --- a/dht.go +++ b/dht.go @@ -275,7 +275,11 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) { // the DHT context should be done when the process is closed dht.ctx = goprocessctx.WithProcessClosing(ctxTags, dht.proc) - dht.ProviderManager = providers.NewProviderManager(dht.ctx, h.ID(), cfg.datastore) + pm, err := providers.NewProviderManager(dht.ctx, h.ID(), cfg.datastore, cfg.providersOptions...) + if err != nil { + return nil, err + } + dht.ProviderManager = pm return dht, nil } diff --git a/dht_options.go b/dht_options.go index 3637c2136..3b9533e04 100644 --- a/dht_options.go +++ b/dht_options.go @@ -11,6 +11,7 @@ import ( "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" + "github.com/libp2p/go-libp2p-kad-dht/providers" record "github.com/libp2p/go-libp2p-record" ) @@ -45,6 +46,7 @@ type config struct { maxRecordAge time.Duration enableProviders bool enableValues bool + providersOptions []providers.Option queryPeerFilter QueryFilterFunc routingTable struct { @@ -348,6 +350,14 @@ func DisableValues() Option { } } +// ProvidersOptions are options passed directly to the provider manager. 
+func ProvidersOptions(opts []providers.Option) Option { + return func(c *config) error { + c.providersOptions = opts + return nil + } +} + // QueryFilter sets a function that approves which peers may be dialed in a query func QueryFilter(filter QueryFilterFunc) Option { return func(c *config) error { diff --git a/providers/providers_manager.go b/providers/providers_manager.go index 2963f4cfb..fc324e087 100644 --- a/providers/providers_manager.go +++ b/providers/providers_manager.go @@ -35,7 +35,7 @@ var log = logging.Logger("providers") type ProviderManager struct { // all non channel fields are meant to be accessed only within // the run method - cache *lru.LRU + cache lru.LRUCache dstore *autobatch.Datastore newprovs chan *addProv @@ -45,6 +45,51 @@ type ProviderManager struct { cleanupInterval time.Duration } +type options struct { + cleanupInterval time.Duration + cache lru.LRUCache +} + +// Option is a function that sets a provider manager option. +type Option func(*options) error + +func (c *options) apply(opts ...Option) error { + for i, opt := range opts { + if err := opt(c); err != nil { + return fmt.Errorf("provider manager option %d failed: %s", i, err) + } + } + return nil +} + +var defaults = func(o *options) error { + o.cleanupInterval = defaultCleanupInterval + cache, err := lru.NewLRU(lruCacheSize, nil) + if err != nil { + return err + } + o.cache = cache + return nil +} + +// CleanupInterval sets the time between GC runs. +// Defaults to 1h. +func CleanupInterval(d time.Duration) Option { + return func(o *options) error { + o.cleanupInterval = d + return nil + } +} + +// Cache sets the LRU cache implementation. +// Defaults to a simple LRU cache. 
+func Cache(c lru.LRUCache) Option { + return func(o *options) error { + o.cache = c + return nil + } +} + type addProv struct { key []byte val peer.ID @@ -56,22 +101,20 @@ type getProv struct { } // NewProviderManager constructor -func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching) *ProviderManager { +func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching, opts ...Option) (*ProviderManager, error) { + var options options + if err := options.apply(append([]Option{defaults}, opts...)...); err != nil { + return nil, err + } pm := new(ProviderManager) pm.getprovs = make(chan *getProv) pm.newprovs = make(chan *addProv) pm.dstore = autobatch.NewAutoBatching(dstore, batchBufferSize) - cache, err := lru.NewLRU(lruCacheSize, nil) - if err != nil { - panic(err) //only happens if negative value is passed to lru constructor - } - pm.cache = cache - + pm.cache = options.cache pm.proc = goprocessctx.WithContext(ctx) - pm.cleanupInterval = defaultCleanupInterval + pm.cleanupInterval = options.cleanupInterval pm.proc.Go(pm.run) - - return pm + return pm, nil } // Process returns the ProviderManager process diff --git a/providers/providers_manager_test.go b/providers/providers_manager_test.go index 108ee8712..e7e385b22 100644 --- a/providers/providers_manager_test.go +++ b/providers/providers_manager_test.go @@ -26,7 +26,10 @@ func TestProviderManager(t *testing.T) { defer cancel() mid := peer.ID("testing") - p := NewProviderManager(ctx, mid, dssync.MutexWrap(ds.NewMapDatastore())) + p, err := NewProviderManager(ctx, mid, dssync.MutexWrap(ds.NewMapDatastore())) + if err != nil { + t.Fatal(err) + } a := u.Hash([]byte("test")) p.AddProvider(ctx, a, peer.ID("testingprovider")) @@ -64,7 +67,10 @@ func TestProvidersDatastore(t *testing.T) { defer cancel() mid := peer.ID("testing") - p := NewProviderManager(ctx, mid, dssync.MutexWrap(ds.NewMapDatastore())) + p, err := NewProviderManager(ctx, mid, dssync.MutexWrap(ds.NewMapDatastore())) 
+ if err != nil { + t.Fatal(err) + } defer p.proc.Close() friend := peer.ID("friend") @@ -144,7 +150,10 @@ func TestProvidesExpire(t *testing.T) { ds := dssync.MutexWrap(ds.NewMapDatastore()) mid := peer.ID("testing") - p := NewProviderManager(ctx, mid, ds) + p, err := NewProviderManager(ctx, mid, ds) + if err != nil { + t.Fatal(err) + } peers := []peer.ID{"a", "b"} var mhs []mh.Multihash @@ -249,7 +258,10 @@ func TestLargeProvidersSet(t *testing.T) { } mid := peer.ID("myself") - p := NewProviderManager(ctx, mid, dstore) + p, err := NewProviderManager(ctx, mid, dstore) + if err != nil { + t.Fatal(err) + } defer p.proc.Close() var mhs []mh.Multihash @@ -281,7 +293,10 @@ func TestUponCacheMissProvidersAreReadFromDatastore(t *testing.T) { p1, p2 := peer.ID("a"), peer.ID("b") h1 := u.Hash([]byte("1")) h2 := u.Hash([]byte("2")) - pm := NewProviderManager(ctx, p1, dssync.MutexWrap(ds.NewMapDatastore())) + pm, err := NewProviderManager(ctx, p1, dssync.MutexWrap(ds.NewMapDatastore())) + if err != nil { + t.Fatal(err) + } // add provider pm.AddProvider(ctx, h1, p1) @@ -302,7 +317,10 @@ func TestWriteUpdatesCache(t *testing.T) { p1, p2 := peer.ID("a"), peer.ID("b") h1 := u.Hash([]byte("1")) - pm := NewProviderManager(ctx, p1, dssync.MutexWrap(ds.NewMapDatastore())) + pm, err := NewProviderManager(ctx, p1, dssync.MutexWrap(ds.NewMapDatastore())) + if err != nil { + t.Fatal(err) + } // add provider pm.AddProvider(ctx, h1, p1) From e01dc8e3f07bf19540364b5fc1ab8304ebd46388 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Mon, 20 Apr 2020 15:14:11 +0100 Subject: [PATCH 2/3] refactor: rename to cfg --- providers/providers_manager.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/providers/providers_manager.go b/providers/providers_manager.go index fc324e087..1c14c6da5 100644 --- a/providers/providers_manager.go +++ b/providers/providers_manager.go @@ -102,17 +102,17 @@ type getProv struct { // NewProviderManager constructor func NewProviderManager(ctx 
context.Context, local peer.ID, dstore ds.Batching, opts ...Option) (*ProviderManager, error) { - var options options - if err := options.apply(append([]Option{defaults}, opts...)...); err != nil { + var cfg options + if err := cfg.apply(append([]Option{defaults}, opts...)...); err != nil { return nil, err } pm := new(ProviderManager) pm.getprovs = make(chan *getProv) pm.newprovs = make(chan *addProv) pm.dstore = autobatch.NewAutoBatching(dstore, batchBufferSize) - pm.cache = options.cache + pm.cache = cfg.cache pm.proc = goprocessctx.WithContext(ctx) - pm.cleanupInterval = options.cleanupInterval + pm.cleanupInterval = cfg.cleanupInterval pm.proc.Go(pm.run) return pm, nil } From 1d696d1ea20168351b6065c317ebf0dbb8be9b53 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Tue, 21 Apr 2020 09:53:13 +0100 Subject: [PATCH 3/3] refactor: implement feedback from review --- dht_options.go | 4 ++++ providers/providers_manager.go | 44 +++++++++++++--------------------- 2 files changed, 20 insertions(+), 28 deletions(-) diff --git a/dht_options.go b/dht_options.go index 3b9533e04..bd3523d67 100644 --- a/dht_options.go +++ b/dht_options.go @@ -351,6 +351,10 @@ func DisableValues() Option { } } // ProvidersOptions are options passed directly to the provider manager. +// +// The provider manager adds and gets provider records from the datastore, caching +// them in between. These options are passed to the provider manager allowing +// customisation of things like the GC interval and cache implementation. 
func ProvidersOptions(opts []providers.Option) Option { return func(c *config) error { c.providersOptions = opts diff --git a/providers/providers_manager.go b/providers/providers_manager.go index 1c14c6da5..20927d2e8 100644 --- a/providers/providers_manager.go +++ b/providers/providers_manager.go @@ -45,38 +45,23 @@ type ProviderManager struct { cleanupInterval time.Duration } -type options struct { - cleanupInterval time.Duration - cache lru.LRUCache -} - // Option is a function that sets a provider manager option. -type Option func(*options) error +type Option func(*ProviderManager) error -func (c *options) apply(opts ...Option) error { +func (pm *ProviderManager) applyOptions(opts ...Option) error { for i, opt := range opts { - if err := opt(c); err != nil { + if err := opt(pm); err != nil { return fmt.Errorf("provider manager option %d failed: %s", i, err) } } return nil } -var defaults = func(o *options) error { - o.cleanupInterval = defaultCleanupInterval - cache, err := lru.NewLRU(lruCacheSize, nil) - if err != nil { - return err - } - o.cache = cache - return nil -} - // CleanupInterval sets the time between GC runs. // Defaults to 1h. func CleanupInterval(d time.Duration) Option { - return func(o *options) error { - o.cleanupInterval = d + return func(pm *ProviderManager) error { + pm.cleanupInterval = d return nil } } @@ -84,8 +69,8 @@ func CleanupInterval(d time.Duration) Option { // Cache sets the LRU cache implementation. // Defaults to a simple LRU cache. 
func Cache(c lru.LRUCache) Option { - return func(o *options) error { - o.cache = c + return func(pm *ProviderManager) error { + pm.cache = c return nil } } @@ -102,17 +87,20 @@ type getProv struct { // NewProviderManager constructor func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching, opts ...Option) (*ProviderManager, error) { - var cfg options - if err := cfg.apply(append([]Option{defaults}, opts...)...); err != nil { - return nil, err - } pm := new(ProviderManager) pm.getprovs = make(chan *getProv) pm.newprovs = make(chan *addProv) pm.dstore = autobatch.NewAutoBatching(dstore, batchBufferSize) - pm.cache = cfg.cache + cache, err := lru.NewLRU(lruCacheSize, nil) + if err != nil { + return nil, err + } + pm.cache = cache + pm.cleanupInterval = defaultCleanupInterval + if err := pm.applyOptions(opts...); err != nil { + return nil, err + } pm.proc = goprocessctx.WithContext(ctx) - pm.cleanupInterval = cfg.cleanupInterval pm.proc.Go(pm.run) return pm, nil }