Skip to content

Commit

Permalink
providers: run datastore GC concurrently
Browse files Browse the repository at this point in the history
Motivation: Walking the datastore can take time and currently blocks
adding/removing providers.

We need to do this in the same goroutine to avoid some logical races.
  • Loading branch information
Stebalien committed Apr 19, 2019
1 parent 5b6bc7e commit 79aec7e
Showing 1 changed file with 79 additions and 44 deletions.
123 changes: 79 additions & 44 deletions providers/providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,65 +182,51 @@ func (pm *ProviderManager) addProv(k cid.Cid, p peer.ID) error {
return writeProviderEntry(pm.dstore, k, p, now)
}

func mkProvKeyFor(k cid.Cid, p peer.ID) string {
return mkProvKey(k) + "/" + base32.RawStdEncoding.EncodeToString([]byte(p))
}

func writeProviderEntry(dstore ds.Datastore, k cid.Cid, p peer.ID, t time.Time) error {
dsk := mkProvKey(k) + "/" + base32.RawStdEncoding.EncodeToString([]byte(p))
dsk := mkProvKeyFor(k, p)

buf := make([]byte, 16)
n := binary.PutVarint(buf, t.UnixNano())

return dstore.Put(ds.NewKey(dsk), buf[:n])
}

func (pm *ProviderManager) gc() {
res, err := pm.dstore.Query(dsq.Query{
Prefix: providersKeyPrefix,
})
if err != nil {
log.Error("error garbage collecting provider records: ", err)
return
}
defer res.Close()

now := time.Now()
for {
e, ok := res.NextSync()
if !ok {
return
}

if e.Error != nil {
log.Error("got an error: ", e.Error)
continue
func (pm *ProviderManager) run(proc goprocess.Process) {
var (
gcQuery dsq.Results
gcQueryRes <-chan dsq.Result
gcSkip map[string]struct{}
gcTime time.Time
gcTimer = time.NewTimer(pm.cleanupInterval)
)

defer func() {
gcTimer.Stop()
if gcQuery != nil {
// don't really care if this fails.
_ = gcQuery.Close()
}

// check expiration time
t, err := readTimeValue(e.Value)
switch {
case err != nil:
// couldn't parse the time
log.Warning("parsing providers record from disk: ", err)
fallthrough
case now.Sub(t) > ProvideValidity:
// or just expired
err = pm.dstore.Delete(ds.RawKey(e.Key))
if err != nil {
log.Warning("failed to remove provider record from disk: ", err)
}
if err := pm.dstore.Flush(); err != nil {
log.Error("failed to flush datastore: ", err)
}
}
}

func (pm *ProviderManager) run(proc goprocess.Process) {
tick := time.NewTicker(pm.cleanupInterval)
defer tick.Stop()
defer pm.dstore.Flush()
}()

for {
select {
case np := <-pm.newprovs:
err := pm.addProv(np.k, np.val)
if err != nil {
log.Error("error adding new providers: ", err)
continue
}
if gcSkip != nil {
// we have an gc, tell it to skip this provider
// as we've updated it since the GC started.
gcSkip[mkProvKeyFor(np.k, np.val)] = struct{}{}
}
case gp := <-pm.getprovs:
provs, err := pm.providersForKey(gp.k)
Expand All @@ -250,13 +236,62 @@ func (pm *ProviderManager) run(proc goprocess.Process) {

// set the cap so the user can't append to this.
gp.resp <- provs[0:len(provs):len(provs)]
case <-tick.C:
case res, ok := <-gcQueryRes:
if !ok {
// cleanup GC round
gcQueryRes = nil
gcSkip = nil
err := gcQuery.Close()
gcQuery = nil
gcTimer.Reset(pm.cleanupInterval)
if err != nil {
log.Error("failed to close provider GC query: ", err)
}
continue
}
if res.Error != nil {
log.Error("got error from GC query: ", res.Error)
continue
}
if _, ok := gcSkip[res.Key]; ok {
// We've updated this record since starting the
// GC round, skip it.
continue
}

// check expiration time
t, err := readTimeValue(res.Value)
switch {
case err != nil:
// couldn't parse the time
log.Warning("parsing providers record from disk: ", err)
fallthrough
case gcTime.Sub(t) > ProvideValidity:
// or expired
err = pm.dstore.Delete(ds.RawKey(res.Key))
if err != nil && err != ds.ErrNotFound {
log.Warning("failed to remove provider record from disk: ", err)
}
}

case gcTime = <-gcTimer.C:
// You know the wonderful thing about caches? You can
// drop them.
//
// Much faster than GCing.
pm.providers.Purge()
pm.gc()

// Now, kick off a GC of the datastore.
q, err := pm.dstore.Query(dsq.Query{
Prefix: providersKeyPrefix,
})
if err != nil {
log.Error("provider record GC query failed: ", err)
continue
}
gcQuery = q
gcQueryRes = q.Next()
gcSkip = make(map[string]struct{})
case <-proc.Closing():
return
}
Expand Down

0 comments on commit 79aec7e

Please sign in to comment.