Skip to content

Commit

Permalink
Merge pull request #593 from alanshaw/feat/set-provider-mgr-options
Browse files Browse the repository at this point in the history
feat: set provider manager options
  • Loading branch information
Stebalien authored Apr 21, 2020
2 parents 21b38cc + 1d696d1 commit 523a9b8
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 14 deletions.
6 changes: 5 additions & 1 deletion dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,11 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
// the DHT context should be done when the process is closed
dht.ctx = goprocessctx.WithProcessClosing(ctxTags, dht.proc)

dht.ProviderManager = providers.NewProviderManager(dht.ctx, h.ID(), cfg.datastore)
pm, err := providers.NewProviderManager(dht.ctx, h.ID(), cfg.datastore, cfg.providersOptions...)
if err != nil {
return nil, err
}
dht.ProviderManager = pm

return dht, nil
}
Expand Down
14 changes: 14 additions & 0 deletions dht_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-kad-dht/providers"
record "github.com/libp2p/go-libp2p-record"
)

Expand Down Expand Up @@ -45,6 +46,7 @@ type config struct {
maxRecordAge time.Duration
enableProviders bool
enableValues bool
providersOptions []providers.Option
queryPeerFilter QueryFilterFunc

routingTable struct {
Expand Down Expand Up @@ -348,6 +350,18 @@ func DisableValues() Option {
}
}

// ProvidersOptions are options passed directly to the provider manager.
//
// The provider manager adds and gets provider records from the datastore, cahing
// them in between. These options are passed to the provider manager allowing
// customisation of things like the GC interval and cache implementation.
func ProvidersOptions(opts []providers.Option) Option {
return func(c *config) error {
c.providersOptions = opts
return nil
}
}

// QueryFilter sets a function that approves which peers may be dialed in a query
func QueryFilter(filter QueryFilterFunc) Option {
return func(c *config) error {
Expand Down
45 changes: 38 additions & 7 deletions providers/providers_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ var log = logging.Logger("providers")
type ProviderManager struct {
// all non channel fields are meant to be accessed only within
// the run method
cache *lru.LRU
cache lru.LRUCache
dstore *autobatch.Datastore

newprovs chan *addProv
Expand All @@ -45,6 +45,36 @@ type ProviderManager struct {
cleanupInterval time.Duration
}

// Option is a function that sets a provider manager option.
type Option func(*ProviderManager) error

func (pm *ProviderManager) applyOptions(opts ...Option) error {
for i, opt := range opts {
if err := opt(pm); err != nil {
return fmt.Errorf("provider manager option %d failed: %s", i, err)
}
}
return nil
}

// CleanupInterval sets the time between GC runs.
// Defaults to 1h.
func CleanupInterval(d time.Duration) Option {
return func(pm *ProviderManager) error {
pm.cleanupInterval = d
return nil
}
}

// Cache sets the LRU cache implementation.
// Defaults to a simple LRU cache.
func Cache(c lru.LRUCache) Option {
return func(pm *ProviderManager) error {
pm.cache = c
return nil
}
}

type addProv struct {
key []byte
val peer.ID
Expand All @@ -56,22 +86,23 @@ type getProv struct {
}

// NewProviderManager constructor
func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching) *ProviderManager {
func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching, opts ...Option) (*ProviderManager, error) {
pm := new(ProviderManager)
pm.getprovs = make(chan *getProv)
pm.newprovs = make(chan *addProv)
pm.dstore = autobatch.NewAutoBatching(dstore, batchBufferSize)
cache, err := lru.NewLRU(lruCacheSize, nil)
if err != nil {
panic(err) //only happens if negative value is passed to lru constructor
return nil, err
}
pm.cache = cache

pm.proc = goprocessctx.WithContext(ctx)
pm.cleanupInterval = defaultCleanupInterval
if err := pm.applyOptions(opts...); err != nil {
return nil, err
}
pm.proc = goprocessctx.WithContext(ctx)
pm.proc.Go(pm.run)

return pm
return pm, nil
}

// Process returns the ProviderManager process
Expand Down
30 changes: 24 additions & 6 deletions providers/providers_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ func TestProviderManager(t *testing.T) {
defer cancel()

mid := peer.ID("testing")
p := NewProviderManager(ctx, mid, dssync.MutexWrap(ds.NewMapDatastore()))
p, err := NewProviderManager(ctx, mid, dssync.MutexWrap(ds.NewMapDatastore()))
if err != nil {
t.Fatal(err)
}
a := u.Hash([]byte("test"))
p.AddProvider(ctx, a, peer.ID("testingprovider"))

Expand Down Expand Up @@ -64,7 +67,10 @@ func TestProvidersDatastore(t *testing.T) {
defer cancel()

mid := peer.ID("testing")
p := NewProviderManager(ctx, mid, dssync.MutexWrap(ds.NewMapDatastore()))
p, err := NewProviderManager(ctx, mid, dssync.MutexWrap(ds.NewMapDatastore()))
if err != nil {
t.Fatal(err)
}
defer p.proc.Close()

friend := peer.ID("friend")
Expand Down Expand Up @@ -144,7 +150,10 @@ func TestProvidesExpire(t *testing.T) {

ds := dssync.MutexWrap(ds.NewMapDatastore())
mid := peer.ID("testing")
p := NewProviderManager(ctx, mid, ds)
p, err := NewProviderManager(ctx, mid, ds)
if err != nil {
t.Fatal(err)
}

peers := []peer.ID{"a", "b"}
var mhs []mh.Multihash
Expand Down Expand Up @@ -249,7 +258,10 @@ func TestLargeProvidersSet(t *testing.T) {
}

mid := peer.ID("myself")
p := NewProviderManager(ctx, mid, dstore)
p, err := NewProviderManager(ctx, mid, dstore)
if err != nil {
t.Fatal(err)
}
defer p.proc.Close()

var mhs []mh.Multihash
Expand Down Expand Up @@ -281,7 +293,10 @@ func TestUponCacheMissProvidersAreReadFromDatastore(t *testing.T) {
p1, p2 := peer.ID("a"), peer.ID("b")
h1 := u.Hash([]byte("1"))
h2 := u.Hash([]byte("2"))
pm := NewProviderManager(ctx, p1, dssync.MutexWrap(ds.NewMapDatastore()))
pm, err := NewProviderManager(ctx, p1, dssync.MutexWrap(ds.NewMapDatastore()))
if err != nil {
t.Fatal(err)
}

// add provider
pm.AddProvider(ctx, h1, p1)
Expand All @@ -302,7 +317,10 @@ func TestWriteUpdatesCache(t *testing.T) {

p1, p2 := peer.ID("a"), peer.ID("b")
h1 := u.Hash([]byte("1"))
pm := NewProviderManager(ctx, p1, dssync.MutexWrap(ds.NewMapDatastore()))
pm, err := NewProviderManager(ctx, p1, dssync.MutexWrap(ds.NewMapDatastore()))
if err != nil {
t.Fatal(err)
}

// add provider
pm.AddProvider(ctx, h1, p1)
Expand Down

0 comments on commit 523a9b8

Please sign in to comment.