Skip to content

Commit

Permalink
providers: optimize GC
Browse files Browse the repository at this point in the history
1. Don't be n^2.
2. Don't bother walking the cache, just drop it.
  • Loading branch information
Stebalien committed Apr 13, 2019
1 parent a3b9767 commit fbb29ea
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 97 deletions.
125 changes: 37 additions & 88 deletions providers/providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching)

pm.proc = goprocessctx.WithContext(ctx)
pm.cleanupInterval = defaultCleanupInterval
pm.proc.Go(func(p goprocess.Process) { pm.run() })
pm.proc.Go(pm.run)

return pm
}
Expand Down Expand Up @@ -191,69 +191,49 @@ func writeProviderEntry(dstore ds.Datastore, k cid.Cid, p peer.ID, t time.Time)
return dstore.Put(ds.NewKey(dsk), buf[:n])
}

func (pm *ProviderManager) deleteProvSet(k cid.Cid) error {
pm.providers.Remove(k)

func (pm *ProviderManager) gc() {
res, err := pm.dstore.Query(dsq.Query{
KeysOnly: true,
Prefix: mkProvKey(k),
Prefix: providersKeyPrefix,
})
if err != nil {
return err
}

entries, err := res.Rest()
if err != nil {
return err
log.Error("error garbage collecting provider records: ", err)
return
}
defer res.Close()

for _, e := range entries {
err := pm.dstore.Delete(ds.NewKey(e.Key))
if err != nil {
log.Error("deleting provider set: ", err)
now := time.Now()
for {
e, ok := res.NextSync()
if !ok {
return
}
}
return nil
}

func (pm *ProviderManager) getProvKeys() (func() (cid.Cid, bool), error) {
res, err := pm.dstore.Query(dsq.Query{
KeysOnly: true,
Prefix: providersKeyPrefix,
})
if err != nil {
return nil, err
}

iter := func() (cid.Cid, bool) {
for e := range res.Next() {
parts := strings.Split(e.Key, "/")
if len(parts) != 4 {
log.Warningf("incorrectly formatted provider entry in datastore: %s", e.Key)
continue
}
decoded, err := base32.RawStdEncoding.DecodeString(parts[2])
if err != nil {
log.Warning("error decoding base32 provider key: %s: %s", parts[2], err)
continue
}
if e.Error != nil {
log.Error("got an error: ", e.Error)
continue
}

c, err := cid.Cast(decoded)
// check expiration time
t, err := readTimeValue(e.Value)
switch {
case err != nil:
// couldn't parse the time
log.Warning("parsing providers record from disk: ", err)
fallthrough
case now.Sub(t) > ProvideValidity:
// or just expired
err = pm.dstore.Delete(ds.RawKey(e.Key))
if err != nil {
log.Warning("error casting key to cid from datastore key: %s", err)
continue
log.Warning("failed to remove provider record from disk: ", err)
}

return c, true
}
return cid.Cid{}, false
}

return iter, nil
}

func (pm *ProviderManager) run() {
func (pm *ProviderManager) run(proc goprocess.Process) {
tick := time.NewTicker(pm.cleanupInterval)
defer tick.Stop()
for {
select {
case np := <-pm.newprovs:
Expand All @@ -267,47 +247,16 @@ func (pm *ProviderManager) run() {
log.Error("error reading providers: ", err)
}

gp.resp <- provs
// set the cap so the user can't append to this.
gp.resp <- provs[0:len(provs):len(provs)]
case <-tick.C:
keys, err := pm.getProvKeys()
if err != nil {
log.Error("Error loading provider keys: ", err)
continue
}
now := time.Now()
for {
k, ok := keys()
if !ok {
break
}

provs, err := pm.getProvSet(k)
if err != nil {
log.Error("error loading known provset: ", err)
continue
}
for p, t := range provs.set {
if now.Sub(t) > ProvideValidity {
delete(provs.set, p)
}
}
// have we run out of providers?
if len(provs.set) == 0 {
provs.providers = nil
err := pm.deleteProvSet(k)
if err != nil {
log.Error("error deleting provider set: ", err)
}
} else if len(provs.set) < len(provs.providers) {
// We must have modified the providers set, recompute.
provs.providers = make([]peer.ID, 0, len(provs.set))
for p := range provs.set {
provs.providers = append(provs.providers, p)
}
}
}
case <-pm.proc.Closing():
tick.Stop()
// You know the wonderful thing about caches? You can
// drop them.
//
// Much faster than GCing.
pm.providers.Purge()
pm.gc()
case <-proc.Closing():
return
}
}
Expand Down
22 changes: 13 additions & 9 deletions providers/providers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (

cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
dsq "github.com/ipfs/go-datastore/query"
dssync "github.com/ipfs/go-datastore/sync"
u "github.com/ipfs/go-ipfs-util"
peer "github.com/libp2p/go-libp2p-peer"
//
Expand All @@ -22,7 +24,7 @@ func TestProviderManager(t *testing.T) {
defer cancel()

mid := peer.ID("testing")
p := NewProviderManager(ctx, mid, ds.NewMapDatastore())
p := NewProviderManager(ctx, mid, dssync.MutexWrap(ds.NewMapDatastore()))
a := cid.NewCidV0(u.Hash([]byte("test")))
p.AddProvider(ctx, a, peer.ID("testingprovider"))
resp := p.GetProviders(ctx, a)
Expand All @@ -41,7 +43,7 @@ func TestProvidersDatastore(t *testing.T) {
defer cancel()

mid := peer.ID("testing")
p := NewProviderManager(ctx, mid, ds.NewMapDatastore())
p := NewProviderManager(ctx, mid, dssync.MutexWrap(ds.NewMapDatastore()))
defer p.proc.Close()

friend := peer.ID("friend")
Expand All @@ -64,7 +66,7 @@ func TestProvidersDatastore(t *testing.T) {
}

func TestProvidersSerialization(t *testing.T) {
dstore := ds.NewMapDatastore()
dstore := dssync.MutexWrap(ds.NewMapDatastore())

k := cid.NewCidV0(u.Hash(([]byte("my key!"))))
p1 := peer.ID("peer one")
Expand Down Expand Up @@ -120,7 +122,7 @@ func TestProvidesExpire(t *testing.T) {
defer cancel()

mid := peer.ID("testing")
p := NewProviderManager(ctx, mid, ds.NewMapDatastore())
p := NewProviderManager(ctx, mid, dssync.MutexWrap(ds.NewMapDatastore()))

peers := []peer.ID{"a", "b"}
var cids []cid.Cid
Expand Down Expand Up @@ -150,13 +152,15 @@ func TestProvidesExpire(t *testing.T) {
t.Fatal("providers map not cleaned up")
}

proviter, err := p.getProvKeys()
res, err := p.dstore.Query(dsq.Query{Prefix: providersKeyPrefix})
if err != nil {
t.Fatal(err)
}

_, ok := proviter()
if ok {
rest, err := res.Rest()
if err != nil {
t.Fatal(err)
}
if len(rest) > 0 {
t.Fatal("expected everything to be cleaned out of the datastore")
}
}
Expand Down Expand Up @@ -229,7 +233,7 @@ func TestUponCacheMissProvidersAreReadFromDatastore(t *testing.T) {
p1, p2 := peer.ID("a"), peer.ID("b")
c1 := cid.NewCidV1(cid.DagCBOR, u.Hash([]byte("1")))
c2 := cid.NewCidV1(cid.DagCBOR, u.Hash([]byte("2")))
pm := NewProviderManager(ctx, p1, ds.NewMapDatastore())
pm := NewProviderManager(ctx, p1, dssync.MutexWrap(ds.NewMapDatastore()))

pm.AddProvider(ctx, c1, p1)
// make the cached provider for c1 go to datastore
Expand Down

0 comments on commit fbb29ea

Please sign in to comment.