diff --git a/providers/providers.go b/providers/providers.go index 885598024..c6ab64ce0 100644 --- a/providers/providers.go +++ b/providers/providers.go @@ -31,7 +31,7 @@ type ProviderManager struct { // all non channel fields are meant to be accessed only within // the run method providers *lru.LRU - dstore ds.Datastore + dstore *autobatch.Datastore newprovs chan *addProv getprovs chan *getProv @@ -193,8 +193,7 @@ func writeProviderEntry(dstore ds.Datastore, k cid.Cid, p peer.ID, t time.Time) func (pm *ProviderManager) gc() { res, err := pm.dstore.Query(dsq.Query{ - KeysOnly: true, - Prefix: providersKeyPrefix, + Prefix: providersKeyPrefix, }) if err != nil { log.Error("error garbage collecting provider records: ", err) @@ -234,6 +233,8 @@ func (pm *ProviderManager) gc() { func (pm *ProviderManager) run(proc goprocess.Process) { tick := time.NewTicker(pm.cleanupInterval) defer tick.Stop() + defer pm.dstore.Flush() + for { select { case np := <-pm.newprovs: diff --git a/providers/providers_test.go b/providers/providers_test.go index e3786672d..aacb2d960 100644 --- a/providers/providers_test.go +++ b/providers/providers_test.go @@ -129,38 +129,62 @@ func TestProvidesExpire(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + ds := dssync.MutexWrap(ds.NewMapDatastore()) mid := peer.ID("testing") - p := NewProviderManager(ctx, mid, dssync.MutexWrap(ds.NewMapDatastore())) + p := NewProviderManager(ctx, mid, ds) peers := []peer.ID{"a", "b"} var cids []cid.Cid for i := 0; i < 10; i++ { c := cid.NewCidV0(u.Hash([]byte(fmt.Sprint(i)))) cids = append(cids, c) + } + + for _, c := range cids[:5] { p.AddProvider(ctx, c, peers[0]) p.AddProvider(ctx, c, peers[1]) } - for i := 0; i < 10; i++ { - out := p.GetProviders(ctx, cids[i]) + time.Sleep(time.Second / 4) + + for _, c := range cids[5:] { + p.AddProvider(ctx, c, peers[0]) + p.AddProvider(ctx, c, peers[1]) + } + + for _, c := range cids { + out := p.GetProviders(ctx, c) if len(out) != 2 { t.Fatal("expected providers to still be there") } } - time.Sleep(time.Second) - for i := 0; i < 10; i++ { - out := p.GetProviders(ctx, cids[i]) + time.Sleep(3 * time.Second / 8) + + for _, c := range cids[:5] { + out := p.GetProviders(ctx, c) if len(out) > 0 { t.Fatal("expected providers to be cleaned up, got: ", out) } } + for _, c := range cids[5:] { + out := p.GetProviders(ctx, c) + if len(out) != 2 { + t.Fatal("expected providers to still be there") + } + } + + time.Sleep(time.Second / 2) + + // Stop to prevent data races + p.Process().Close() + if p.providers.Len() != 0 { t.Fatal("providers map not cleaned up") } - res, err := p.dstore.Query(dsq.Query{Prefix: providersKeyPrefix}) + res, err := ds.Query(dsq.Query{Prefix: providersKeyPrefix}) if err != nil { t.Fatal(err) }