From 498dd816993abbdda2b65bd419a0e63f52d61744 Mon Sep 17 00:00:00 2001
From: Steven Allen
Date: Thu, 18 Apr 2019 16:10:29 -0700
Subject: [PATCH 1/3] fix(providers): gc

Otherwise, we'll delete everything.
---
 providers/providers.go | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/providers/providers.go b/providers/providers.go
index 885598024..6440fce59 100644
--- a/providers/providers.go
+++ b/providers/providers.go
@@ -193,8 +193,7 @@ func writeProviderEntry(dstore ds.Datastore, k cid.Cid, p peer.ID, t time.Time)
 
 func (pm *ProviderManager) gc() {
 	res, err := pm.dstore.Query(dsq.Query{
-		KeysOnly: true,
-		Prefix:   providersKeyPrefix,
+		Prefix: providersKeyPrefix,
 	})
 	if err != nil {
 		log.Error("error garbage collecting provider records: ", err)

From e4d739c96dd0f882e46b03cd70893b6f74b6bf7d Mon Sep 17 00:00:00 2001
From: Steven Allen
Date: Thu, 18 Apr 2019 16:22:35 -0700
Subject: [PATCH 2/3] providers: flush batch on process close

---
 providers/providers.go | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/providers/providers.go b/providers/providers.go
index 6440fce59..c6ab64ce0 100644
--- a/providers/providers.go
+++ b/providers/providers.go
@@ -31,7 +31,7 @@ type ProviderManager struct {
 	// all non channel fields are meant to be accessed only within
 	// the run method
 	providers *lru.LRU
-	dstore    ds.Datastore
+	dstore    *autobatch.Datastore
 
 	newprovs chan *addProv
 	getprovs chan *getProv
@@ -233,6 +233,8 @@ func (pm *ProviderManager) gc() {
 func (pm *ProviderManager) run(proc goprocess.Process) {
 	tick := time.NewTicker(pm.cleanupInterval)
 	defer tick.Stop()
+	defer pm.dstore.Flush()
+
 	for {
 		select {
 		case np := <-pm.newprovs:

From 5bb50b8d682cd1a60461149b0f2d22dac11b1017 Mon Sep 17 00:00:00 2001
From: Steven Allen
Date: Thu, 18 Apr 2019 16:31:05 -0700
Subject: [PATCH 3/3] providers: test partial GC

---
 providers/providers_test.go | 38 +++++++++++++++++++++++++++++++-------
 1 file changed, 31 insertions(+), 7 deletions(-)

diff --git a/providers/providers_test.go b/providers/providers_test.go
index e3786672d..aacb2d960 100644
--- a/providers/providers_test.go
+++ b/providers/providers_test.go
@@ -129,38 +129,62 @@ func TestProvidesExpire(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
+	ds := dssync.MutexWrap(ds.NewMapDatastore())
 	mid := peer.ID("testing")
-	p := NewProviderManager(ctx, mid, dssync.MutexWrap(ds.NewMapDatastore()))
+	p := NewProviderManager(ctx, mid, ds)
 
 	peers := []peer.ID{"a", "b"}
 	var cids []cid.Cid
 	for i := 0; i < 10; i++ {
 		c := cid.NewCidV0(u.Hash([]byte(fmt.Sprint(i))))
 		cids = append(cids, c)
+	}
+
+	for _, c := range cids[:5] {
 		p.AddProvider(ctx, c, peers[0])
 		p.AddProvider(ctx, c, peers[1])
 	}
 
-	for i := 0; i < 10; i++ {
-		out := p.GetProviders(ctx, cids[i])
+	time.Sleep(time.Second / 4)
+
+	for _, c := range cids[5:] {
+		p.AddProvider(ctx, c, peers[0])
+		p.AddProvider(ctx, c, peers[1])
+	}
+
+	for _, c := range cids {
+		out := p.GetProviders(ctx, c)
 		if len(out) != 2 {
 			t.Fatal("expected providers to still be there")
 		}
 	}
 
-	time.Sleep(time.Second)
-	for i := 0; i < 10; i++ {
-		out := p.GetProviders(ctx, cids[i])
+	time.Sleep(3 * time.Second / 8)
+
+	for _, c := range cids[:5] {
+		out := p.GetProviders(ctx, c)
 		if len(out) > 0 {
 			t.Fatal("expected providers to be cleaned up, got: ", out)
 		}
 	}
 
+	for _, c := range cids[5:] {
+		out := p.GetProviders(ctx, c)
+		if len(out) != 2 {
+			t.Fatal("expected providers to still be there")
+		}
+	}
+
+	time.Sleep(time.Second / 2)
+
+	// Stop to prevent data races
+	p.Process().Close()
+
 	if p.providers.Len() != 0 {
 		t.Fatal("providers map not cleaned up")
 	}
 
-	res, err := p.dstore.Query(dsq.Query{Prefix: providersKeyPrefix})
+	res, err := ds.Query(dsq.Query{Prefix: providersKeyPrefix})
 	if err != nil {
 		t.Fatal(err)
 	}
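
A note on [PATCH 1/3], since the commit message is terse: gc() can only decide
whether a record has expired by reading the timestamp that writeProviderEntry
stored as the entry's value, and a KeysOnly query returns keys without values,
so every record looked unreadable and was collected. The sketch below is a
minimal stand-alone illustration of that failure mode, not the real code: the
entry type, shouldCollect, provideValidity, and the RFC3339 value encoding are
all made up for the example (the actual record format differs).

    package main

    import (
    	"fmt"
    	"time"
    )

    // entry mirrors the shape of a datastore query result: a key plus the
    // stored value bytes. With KeysOnly set, Value comes back empty.
    type entry struct {
    	Key   string
    	Value []byte
    }

    // provideValidity is a stand-in for the real expiry window.
    const provideValidity = 24 * time.Hour

    // shouldCollect mimics the GC decision: parse the provide time from the
    // record's value and collect the record if it is unreadable or expired.
    func shouldCollect(e entry, now time.Time) bool {
    	t, err := time.Parse(time.RFC3339, string(e.Value))
    	if err != nil {
    		// Unreadable records are treated as garbage. This is the branch
    		// every record hits when the query was made with KeysOnly.
    		return true
    	}
    	return now.Sub(t) > provideValidity
    }

    func main() {
    	now := time.Now()
    	fresh := now.Add(-time.Hour).Format(time.RFC3339)

    	withValue := entry{Key: "/providers/x/peerA", Value: []byte(fresh)}
    	keysOnly := entry{Key: "/providers/x/peerA"} // what a KeysOnly query yields

    	fmt.Println("with value: collect =", shouldCollect(withValue, now)) // false
    	fmt.Println("keys only:  collect =", shouldCollect(keysOnly, now))  // true: everything goes
    }

On [PATCH 2/3] and [PATCH 3/3]: narrowing dstore to *autobatch.Datastore lets
the run loop flush buffered writes when the process shuts down, which the new
test appears to rely on, since it closes the process and then queries the raw
backing datastore directly.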