diff --git a/providers/providers.go b/providers/providers.go index 7f88b1309..885598024 100644 --- a/providers/providers.go +++ b/providers/providers.go @@ -68,7 +68,7 @@ func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching) pm.proc = goprocessctx.WithContext(ctx) pm.cleanupInterval = defaultCleanupInterval - pm.proc.Go(func(p goprocess.Process) { pm.run() }) + pm.proc.Go(pm.run) return pm } @@ -191,69 +191,49 @@ func writeProviderEntry(dstore ds.Datastore, k cid.Cid, p peer.ID, t time.Time) return dstore.Put(ds.NewKey(dsk), buf[:n]) } -func (pm *ProviderManager) deleteProvSet(k cid.Cid) error { - pm.providers.Remove(k) - +func (pm *ProviderManager) gc() { res, err := pm.dstore.Query(dsq.Query{ KeysOnly: true, - Prefix: mkProvKey(k), + Prefix: providersKeyPrefix, }) if err != nil { - return err - } - - entries, err := res.Rest() - if err != nil { - return err + log.Error("error garbage collecting provider records: ", err) + return } + defer res.Close() - for _, e := range entries { - err := pm.dstore.Delete(ds.NewKey(e.Key)) - if err != nil { - log.Error("deleting provider set: ", err) + now := time.Now() + for { + e, ok := res.NextSync() + if !ok { + return } - } - return nil -} - -func (pm *ProviderManager) getProvKeys() (func() (cid.Cid, bool), error) { - res, err := pm.dstore.Query(dsq.Query{ - KeysOnly: true, - Prefix: providersKeyPrefix, - }) - if err != nil { - return nil, err - } - iter := func() (cid.Cid, bool) { - for e := range res.Next() { - parts := strings.Split(e.Key, "/") - if len(parts) != 4 { - log.Warningf("incorrectly formatted provider entry in datastore: %s", e.Key) - continue - } - decoded, err := base32.RawStdEncoding.DecodeString(parts[2]) - if err != nil { - log.Warning("error decoding base32 provider key: %s: %s", parts[2], err) - continue - } + if e.Error != nil { + log.Error("got an error: ", e.Error) + continue + } - c, err := cid.Cast(decoded) + // check expiration time + t, err := readTimeValue(e.Value) + switch { + case err != nil: + // couldn't parse the time + log.Warning("parsing providers record from disk: ", err) + fallthrough + case now.Sub(t) > ProvideValidity: + // or just expired + err = pm.dstore.Delete(ds.RawKey(e.Key)) if err != nil { - log.Warning("error casting key to cid from datastore key: %s", err) - continue + log.Warning("failed to remove provider record from disk: ", err) } - - return c, true } - return cid.Cid{}, false } - - return iter, nil } -func (pm *ProviderManager) run() { +func (pm *ProviderManager) run(proc goprocess.Process) { tick := time.NewTicker(pm.cleanupInterval) + defer tick.Stop() for { select { case np := <-pm.newprovs: @@ -267,47 +247,16 @@ func (pm *ProviderManager) run() { log.Error("error reading providers: ", err) } - gp.resp <- provs + // set the cap so the user can't append to this. + gp.resp <- provs[0:len(provs):len(provs)] case <-tick.C: - keys, err := pm.getProvKeys() - if err != nil { - log.Error("Error loading provider keys: ", err) - continue - } - now := time.Now() - for { - k, ok := keys() - if !ok { - break - } - - provs, err := pm.getProvSet(k) - if err != nil { - log.Error("error loading known provset: ", err) - continue - } - for p, t := range provs.set { - if now.Sub(t) > ProvideValidity { - delete(provs.set, p) - } - } - // have we run out of providers? - if len(provs.set) == 0 { - provs.providers = nil - err := pm.deleteProvSet(k) - if err != nil { - log.Error("error deleting provider set: ", err) - } - } else if len(provs.set) < len(provs.providers) { - // We must have modified the providers set, recompute. - provs.providers = make([]peer.ID, 0, len(provs.set)) - for p := range provs.set { - provs.providers = append(provs.providers, p) - } - } - } - case <-pm.proc.Closing(): - tick.Stop() + // You know the wonderful thing about caches? You can + // drop them. + // + // Much faster than GCing. + pm.providers.Purge() + pm.gc() + case <-proc.Closing(): return } } diff --git a/providers/providers_test.go b/providers/providers_test.go index c29b1cf8b..dafb1c247 100644 --- a/providers/providers_test.go +++ b/providers/providers_test.go @@ -10,6 +10,8 @@ import ( cid "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" + dsq "github.com/ipfs/go-datastore/query" + dssync "github.com/ipfs/go-datastore/sync" u "github.com/ipfs/go-ipfs-util" peer "github.com/libp2p/go-libp2p-peer" // @@ -22,7 +24,7 @@ func TestProviderManager(t *testing.T) { defer cancel() mid := peer.ID("testing") - p := NewProviderManager(ctx, mid, ds.NewMapDatastore()) + p := NewProviderManager(ctx, mid, dssync.MutexWrap(ds.NewMapDatastore())) a := cid.NewCidV0(u.Hash([]byte("test"))) p.AddProvider(ctx, a, peer.ID("testingprovider")) resp := p.GetProviders(ctx, a) @@ -41,7 +43,7 @@ func TestProvidersDatastore(t *testing.T) { defer cancel() mid := peer.ID("testing") - p := NewProviderManager(ctx, mid, ds.NewMapDatastore()) + p := NewProviderManager(ctx, mid, dssync.MutexWrap(ds.NewMapDatastore())) defer p.proc.Close() friend := peer.ID("friend") @@ -64,7 +66,7 @@ func TestProvidersDatastore(t *testing.T) { } func TestProvidersSerialization(t *testing.T) { - dstore := ds.NewMapDatastore() + dstore := dssync.MutexWrap(ds.NewMapDatastore()) k := cid.NewCidV0(u.Hash(([]byte("my key!")))) p1 := peer.ID("peer one") @@ -120,7 +122,7 @@ func TestProvidesExpire(t *testing.T) { defer cancel() mid := peer.ID("testing") - p := NewProviderManager(ctx, mid, ds.NewMapDatastore()) + p := NewProviderManager(ctx, mid, dssync.MutexWrap(ds.NewMapDatastore())) peers := []peer.ID{"a", "b"} var cids []cid.Cid @@ -150,13 +152,15 @@ func TestProvidesExpire(t *testing.T) { t.Fatal("providers map not cleaned up") } - proviter, err := p.getProvKeys() + res, err := p.dstore.Query(dsq.Query{Prefix: providersKeyPrefix}) if err != nil { t.Fatal(err) } - - _, ok := proviter() - if ok { + rest, err := res.Rest() + if err != nil { + t.Fatal(err) + } + if len(rest) > 0 { t.Fatal("expected everything to be cleaned out of the datastore") } } @@ -229,7 +233,7 @@ func TestUponCacheMissProvidersAreReadFromDatastore(t *testing.T) { p1, p2 := peer.ID("a"), peer.ID("b") c1 := cid.NewCidV1(cid.DagCBOR, u.Hash([]byte("1"))) c2 := cid.NewCidV1(cid.DagCBOR, u.Hash([]byte("2"))) - pm := NewProviderManager(ctx, p1, ds.NewMapDatastore()) + pm := NewProviderManager(ctx, p1, dssync.MutexWrap(ds.NewMapDatastore())) pm.AddProvider(ctx, c1, p1) // make the cached provider for c1 go to datastore