Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: set provider manager options #593

Merged
merged 3 commits into from
Apr 21, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,11 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
// the DHT context should be done when the process is closed
dht.ctx = goprocessctx.WithProcessClosing(ctxTags, dht.proc)

dht.ProviderManager = providers.NewProviderManager(dht.ctx, h.ID(), cfg.datastore)
pm, err := providers.NewProviderManager(dht.ctx, h.ID(), cfg.datastore, cfg.providersOptions...)
if err != nil {
return nil, err
}
dht.ProviderManager = pm

return dht, nil
}
Expand Down
10 changes: 10 additions & 0 deletions dht_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-kad-dht/providers"
record "github.com/libp2p/go-libp2p-record"
)

Expand Down Expand Up @@ -45,6 +46,7 @@ type config struct {
maxRecordAge time.Duration
enableProviders bool
enableValues bool
providersOptions []providers.Option
queryPeerFilter QueryFilterFunc

routingTable struct {
Expand Down Expand Up @@ -348,6 +350,14 @@ func DisableValues() Option {
}
}

// ProvidersOptions are options passed directly to the provider manager.
func ProvidersOptions(opts []providers.Option) Option {
alanshaw marked this conversation as resolved.
Show resolved Hide resolved
return func(c *config) error {
c.providersOptions = opts
return nil
}
}

// QueryFilter sets a function that approves which peers may be dialed in a query
func QueryFilter(filter QueryFilterFunc) Option {
return func(c *config) error {
Expand Down
65 changes: 54 additions & 11 deletions providers/providers_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ var log = logging.Logger("providers")
type ProviderManager struct {
// all non channel fields are meant to be accessed only within
// the run method
cache *lru.LRU
cache lru.LRUCache
dstore *autobatch.Datastore

newprovs chan *addProv
Expand All @@ -45,6 +45,51 @@ type ProviderManager struct {
cleanupInterval time.Duration
}

type options struct {
cleanupInterval time.Duration
cache lru.LRUCache
}

// Option is a function that sets a provider manager option.
type Option func(*options) error
alanshaw marked this conversation as resolved.
Show resolved Hide resolved

func (c *options) apply(opts ...Option) error {
for i, opt := range opts {
if err := opt(c); err != nil {
return fmt.Errorf("provider manager option %d failed: %s", i, err)
}
}
return nil
}

var defaults = func(o *options) error {
o.cleanupInterval = defaultCleanupInterval
cache, err := lru.NewLRU(lruCacheSize, nil)
if err != nil {
return err
}
o.cache = cache
return nil
}

// CleanupInterval sets the time between GC runs.
// Defaults to 1h.
func CleanupInterval(d time.Duration) Option {
return func(o *options) error {
o.cleanupInterval = d
return nil
}
}

// Cache sets the LRU cache implementation.
// Defaults to a simple LRU cache.
func Cache(c lru.LRUCache) Option {
return func(o *options) error {
o.cache = c
return nil
}
}

type addProv struct {
key []byte
val peer.ID
Expand All @@ -56,22 +101,20 @@ type getProv struct {
}

// NewProviderManager constructor
func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching) *ProviderManager {
func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching, opts ...Option) (*ProviderManager, error) {
var cfg options
if err := cfg.apply(append([]Option{defaults}, opts...)...); err != nil {
return nil, err
}
pm := new(ProviderManager)
pm.getprovs = make(chan *getProv)
pm.newprovs = make(chan *addProv)
pm.dstore = autobatch.NewAutoBatching(dstore, batchBufferSize)
cache, err := lru.NewLRU(lruCacheSize, nil)
if err != nil {
panic(err) //only happens if negative value is passed to lru constructor
}
pm.cache = cache

pm.cache = cfg.cache
pm.proc = goprocessctx.WithContext(ctx)
pm.cleanupInterval = defaultCleanupInterval
pm.cleanupInterval = cfg.cleanupInterval
pm.proc.Go(pm.run)

return pm
return pm, nil
}

// Process returns the ProviderManager process
Expand Down
30 changes: 24 additions & 6 deletions providers/providers_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ func TestProviderManager(t *testing.T) {
defer cancel()

mid := peer.ID("testing")
p := NewProviderManager(ctx, mid, dssync.MutexWrap(ds.NewMapDatastore()))
p, err := NewProviderManager(ctx, mid, dssync.MutexWrap(ds.NewMapDatastore()))
if err != nil {
t.Fatal(err)
}
a := u.Hash([]byte("test"))
p.AddProvider(ctx, a, peer.ID("testingprovider"))

Expand Down Expand Up @@ -64,7 +67,10 @@ func TestProvidersDatastore(t *testing.T) {
defer cancel()

mid := peer.ID("testing")
p := NewProviderManager(ctx, mid, dssync.MutexWrap(ds.NewMapDatastore()))
p, err := NewProviderManager(ctx, mid, dssync.MutexWrap(ds.NewMapDatastore()))
if err != nil {
t.Fatal(err)
}
defer p.proc.Close()

friend := peer.ID("friend")
Expand Down Expand Up @@ -144,7 +150,10 @@ func TestProvidesExpire(t *testing.T) {

ds := dssync.MutexWrap(ds.NewMapDatastore())
mid := peer.ID("testing")
p := NewProviderManager(ctx, mid, ds)
p, err := NewProviderManager(ctx, mid, ds)
if err != nil {
t.Fatal(err)
}

peers := []peer.ID{"a", "b"}
var mhs []mh.Multihash
Expand Down Expand Up @@ -249,7 +258,10 @@ func TestLargeProvidersSet(t *testing.T) {
}

mid := peer.ID("myself")
p := NewProviderManager(ctx, mid, dstore)
p, err := NewProviderManager(ctx, mid, dstore)
if err != nil {
t.Fatal(err)
}
defer p.proc.Close()

var mhs []mh.Multihash
Expand Down Expand Up @@ -281,7 +293,10 @@ func TestUponCacheMissProvidersAreReadFromDatastore(t *testing.T) {
p1, p2 := peer.ID("a"), peer.ID("b")
h1 := u.Hash([]byte("1"))
h2 := u.Hash([]byte("2"))
pm := NewProviderManager(ctx, p1, dssync.MutexWrap(ds.NewMapDatastore()))
pm, err := NewProviderManager(ctx, p1, dssync.MutexWrap(ds.NewMapDatastore()))
if err != nil {
t.Fatal(err)
}

// add provider
pm.AddProvider(ctx, h1, p1)
Expand All @@ -302,7 +317,10 @@ func TestWriteUpdatesCache(t *testing.T) {

p1, p2 := peer.ID("a"), peer.ID("b")
h1 := u.Hash([]byte("1"))
pm := NewProviderManager(ctx, p1, dssync.MutexWrap(ds.NewMapDatastore()))
pm, err := NewProviderManager(ctx, p1, dssync.MutexWrap(ds.NewMapDatastore()))
if err != nil {
t.Fatal(err)
}

// add provider
pm.AddProvider(ctx, h1, p1)
Expand Down