diff --git a/providers/providers.go b/providers/providers.go index c6ab64ce0..6d77f989f 100644 --- a/providers/providers.go +++ b/providers/providers.go @@ -182,8 +182,12 @@ func (pm *ProviderManager) addProv(k cid.Cid, p peer.ID) error { return writeProviderEntry(pm.dstore, k, p, now) } +func mkProvKeyFor(k cid.Cid, p peer.ID) string { + return mkProvKey(k) + "/" + base32.RawStdEncoding.EncodeToString([]byte(p)) +} + func writeProviderEntry(dstore ds.Datastore, k cid.Cid, p peer.ID, t time.Time) error { - dsk := mkProvKey(k) + "/" + base32.RawStdEncoding.EncodeToString([]byte(p)) + dsk := mkProvKeyFor(k, p) buf := make([]byte, 16) n := binary.PutVarint(buf, t.UnixNano()) @@ -191,49 +195,25 @@ func writeProviderEntry(dstore ds.Datastore, k cid.Cid, p peer.ID, t time.Time) return dstore.Put(ds.NewKey(dsk), buf[:n]) } -func (pm *ProviderManager) gc() { - res, err := pm.dstore.Query(dsq.Query{ - Prefix: providersKeyPrefix, - }) - if err != nil { - log.Error("error garbage collecting provider records: ", err) - return - } - defer res.Close() - - now := time.Now() - for { - e, ok := res.NextSync() - if !ok { - return - } - - if e.Error != nil { - log.Error("got an error: ", e.Error) - continue +func (pm *ProviderManager) run(proc goprocess.Process) { + var ( + gcQuery dsq.Results + gcQueryRes <-chan dsq.Result + gcSkip map[string]struct{} + gcTime time.Time + gcTimer = time.NewTimer(pm.cleanupInterval) + ) + + defer func() { + gcTimer.Stop() + if gcQuery != nil { + // don't really care if this fails. + _ = gcQuery.Close() } - - // check expiration time - t, err := readTimeValue(e.Value) - switch { - case err != nil: - // couldn't parse the time - log.Warning("parsing providers record from disk: ", err) - fallthrough - case now.Sub(t) > ProvideValidity: - // or just expired - err = pm.dstore.Delete(ds.RawKey(e.Key)) - if err != nil { - log.Warning("failed to remove provider record from disk: ", err) - } + if err := pm.dstore.Flush(); err != nil { + log.Error("failed to flush datastore: ", err) } - } -} - -func (pm *ProviderManager) run(proc goprocess.Process) { - tick := time.NewTicker(pm.cleanupInterval) - defer tick.Stop() - defer pm.dstore.Flush() + }() for { select { @@ -241,6 +221,12 @@ func (pm *ProviderManager) run(proc goprocess.Process) { err := pm.addProv(np.k, np.val) if err != nil { log.Error("error adding new providers: ", err) + continue + } + if gcSkip != nil { + // we have an gc, tell it to skip this provider + // as we've updated it since the GC started. + gcSkip[mkProvKeyFor(np.k, np.val)] = struct{}{} } case gp := <-pm.getprovs: provs, err := pm.providersForKey(gp.k) @@ -250,13 +236,62 @@ func (pm *ProviderManager) run(proc goprocess.Process) { // set the cap so the user can't append to this. gp.resp <- provs[0:len(provs):len(provs)] - case <-tick.C: + case res, ok := <-gcQueryRes: + if !ok { + // cleanup GC round + gcQueryRes = nil + gcSkip = nil + err := gcQuery.Close() + gcQuery = nil + gcTimer.Reset(pm.cleanupInterval) + if err != nil { + log.Error("failed to close provider GC query: ", err) + } + continue + } + if res.Error != nil { + log.Error("got error from GC query: ", res.Error) + continue + } + if _, ok := gcSkip[res.Key]; ok { + // We've updated this record since starting the + // GC round, skip it. + continue + } + + // check expiration time + t, err := readTimeValue(res.Value) + switch { + case err != nil: + // couldn't parse the time + log.Warning("parsing providers record from disk: ", err) + fallthrough + case gcTime.Sub(t) > ProvideValidity: + // or expired + err = pm.dstore.Delete(ds.RawKey(res.Key)) + if err != nil && err != ds.ErrNotFound { + log.Warning("failed to remove provider record from disk: ", err) + } + } + + case gcTime = <-gcTimer.C: // You know the wonderful thing about caches? You can // drop them. // // Much faster than GCing. pm.providers.Purge() - pm.gc() + + // Now, kick off a GC of the datastore. + q, err := pm.dstore.Query(dsq.Query{ + Prefix: providersKeyPrefix, + }) + if err != nil { + log.Error("provider record GC query failed: ", err) + continue + } + gcQuery = q + gcQueryRes = q.Next() + gcSkip = make(map[string]struct{}) case <-proc.Closing(): return }