Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

providers: run datastore GC concurrently #326

Merged
merged 1 commit into from
Apr 20, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 79 additions & 44 deletions providers/providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,65 +182,51 @@ func (pm *ProviderManager) addProv(k cid.Cid, p peer.ID) error {
return writeProviderEntry(pm.dstore, k, p, now)
}

func mkProvKeyFor(k cid.Cid, p peer.ID) string {
return mkProvKey(k) + "/" + base32.RawStdEncoding.EncodeToString([]byte(p))
}

func writeProviderEntry(dstore ds.Datastore, k cid.Cid, p peer.ID, t time.Time) error {
dsk := mkProvKey(k) + "/" + base32.RawStdEncoding.EncodeToString([]byte(p))
dsk := mkProvKeyFor(k, p)

buf := make([]byte, 16)
n := binary.PutVarint(buf, t.UnixNano())

return dstore.Put(ds.NewKey(dsk), buf[:n])
}

func (pm *ProviderManager) gc() {
res, err := pm.dstore.Query(dsq.Query{
Prefix: providersKeyPrefix,
})
if err != nil {
log.Error("error garbage collecting provider records: ", err)
return
}
defer res.Close()

now := time.Now()
for {
e, ok := res.NextSync()
if !ok {
return
}

if e.Error != nil {
log.Error("got an error: ", e.Error)
continue
func (pm *ProviderManager) run(proc goprocess.Process) {
var (
gcQuery dsq.Results
gcQueryRes <-chan dsq.Result
gcSkip map[string]struct{}
gcTime time.Time
gcTimer = time.NewTimer(pm.cleanupInterval)
)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've tried breaking this out into a separate struct but it really doesn't help.


defer func() {
gcTimer.Stop()
if gcQuery != nil {
// don't really care if this fails.
_ = gcQuery.Close()
}

// check expiration time
t, err := readTimeValue(e.Value)
switch {
case err != nil:
// couldn't parse the time
log.Warning("parsing providers record from disk: ", err)
fallthrough
case now.Sub(t) > ProvideValidity:
// or just expired
err = pm.dstore.Delete(ds.RawKey(e.Key))
if err != nil {
log.Warning("failed to remove provider record from disk: ", err)
}
if err := pm.dstore.Flush(); err != nil {
log.Error("failed to flush datastore: ", err)
}
}
}

func (pm *ProviderManager) run(proc goprocess.Process) {
tick := time.NewTicker(pm.cleanupInterval)
defer tick.Stop()
defer pm.dstore.Flush()
}()

for {
select {
case np := <-pm.newprovs:
err := pm.addProv(np.k, np.val)
if err != nil {
log.Error("error adding new providers: ", err)
continue
}
if gcSkip != nil {
// we have an gc, tell it to skip this provider
// as we've updated it since the GC started.
gcSkip[mkProvKeyFor(np.k, np.val)] = struct{}{}
}
case gp := <-pm.getprovs:
provs, err := pm.providersForKey(gp.k)
Expand All @@ -250,13 +236,62 @@ func (pm *ProviderManager) run(proc goprocess.Process) {

// set the cap so the user can't append to this.
gp.resp <- provs[0:len(provs):len(provs)]
case <-tick.C:
case res, ok := <-gcQueryRes:
if !ok {
if err := gcQuery.Close(); err != nil {
log.Error("failed to close provider GC query: ", err)
}
gcTimer.Reset(pm.cleanupInterval)

// cleanup GC round
gcQueryRes = nil
gcSkip = nil
gcQuery = nil
continue
}
if res.Error != nil {
log.Error("got error from GC query: ", res.Error)
continue
}
if _, ok := gcSkip[res.Key]; ok {
// We've updated this record since starting the
// GC round, skip it.
continue
}

// check expiration time
t, err := readTimeValue(res.Value)
switch {
case err != nil:
// couldn't parse the time
log.Warning("parsing providers record from disk: ", err)
fallthrough
case gcTime.Sub(t) > ProvideValidity:
// or expired
err = pm.dstore.Delete(ds.RawKey(res.Key))
if err != nil && err != ds.ErrNotFound {
log.Warning("failed to remove provider record from disk: ", err)
}
}

case gcTime = <-gcTimer.C:
// You know the wonderful thing about caches? You can
// drop them.
//
// Much faster than GCing.
pm.providers.Purge()
pm.gc()

// Now, kick off a GC of the datastore.
q, err := pm.dstore.Query(dsq.Query{
Prefix: providersKeyPrefix,
})
if err != nil {
log.Error("provider record GC query failed: ", err)
continue
}
gcQuery = q
gcQueryRes = q.Next()
gcSkip = make(map[string]struct{})
case <-proc.Closing():
return
}
Expand Down