Skip to content

Commit

Permalink
feat: plumb through datastore contexts
Browse files Browse the repository at this point in the history
  • Loading branch information
guseggert committed Oct 28, 2021
1 parent 7724838 commit 60d8bde
Show file tree
Hide file tree
Showing 9 changed files with 62 additions and 56 deletions.
8 changes: 4 additions & 4 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,10 +549,10 @@ func (dht *IpfsDHT) persistRTPeersInPeerStore() {
//
// returns nil, nil when either nothing is found or the value found doesn't properly validate.
// returns nil, some_error when there's a *datastore* error (i.e., something goes very wrong)
func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) {
func (dht *IpfsDHT) getLocal(ctx context.Context, key string) (*recpb.Record, error) {
logger.Debugw("finding value in datastore", "key", internal.LoggableRecordKeyString(key))

rec, err := dht.getRecordFromDatastore(mkDsKey(key))
rec, err := dht.getRecordFromDatastore(ctx, mkDsKey(key))
if err != nil {
logger.Warnw("get local failed", "key", internal.LoggableRecordKeyString(key), "error", err)
return nil, err
Expand All @@ -568,14 +568,14 @@ func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) {
}

// putLocal stores the key value pair in the datastore
func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {
func (dht *IpfsDHT) putLocal(ctx context.Context, key string, rec *recpb.Record) error {
data, err := proto.Marshal(rec)
if err != nil {
logger.Warnw("failed to put marshal record for local put", "error", err, "key", internal.LoggableRecordKeyString(key))
return err
}

return dht.datastore.Put(mkDsKey(key), data)
return dht.datastore.Put(ctx, mkDsKey(key), data)
}

func (dht *IpfsDHT) rtPeerLoop(proc goprocess.Process) {
Expand Down
20 changes: 10 additions & 10 deletions fullrt/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ func (dht *FullRT) PutValue(ctx context.Context, key string, value []byte, opts
return err
}

old, err := dht.getLocal(key)
old, err := dht.getLocal(ctx, key)
if err != nil {
// Means something is wrong with the datastore.
return err
Expand All @@ -457,7 +457,7 @@ func (dht *FullRT) PutValue(ctx context.Context, key string, value []byte, opts

rec := record.MakePutRecord(key, value)
rec.TimeReceived = u.FormatRFC3339(time.Now())
err = dht.putLocal(key, rec)
err = dht.putLocal(ctx, key, rec)
if err != nil {
return err
}
Expand Down Expand Up @@ -645,7 +645,7 @@ func (dht *FullRT) updatePeerValues(ctx context.Context, key string, val []byte,
go func(p peer.ID) {
//TODO: Is this possible?
if p == dht.h.ID() {
err := dht.putLocal(key, fixupRec)
err := dht.putLocal(ctx, key, fixupRec)
if err != nil {
logger.Error("Error correcting local dht entry:", err)
}
Expand All @@ -671,7 +671,7 @@ func (dht *FullRT) getValues(ctx context.Context, key string, stopQuery chan str

logger.Debugw("finding value", "key", internal.LoggableRecordKeyString(key))

if rec, err := dht.getLocal(key); rec != nil && err == nil {
if rec, err := dht.getLocal(ctx, key); rec != nil && err == nil {
select {
case valCh <- RecvdVal{
Val: rec.GetValue(),
Expand Down Expand Up @@ -1398,10 +1398,10 @@ var _ routing.Routing = (*FullRT)(nil)
//
// returns nil, nil when either nothing is found or the value found doesn't properly validate.
// returns nil, some_error when there's a *datastore* error (i.e., something goes very wrong)
func (dht *FullRT) getLocal(key string) (*recpb.Record, error) {
func (dht *FullRT) getLocal(ctx context.Context, key string) (*recpb.Record, error) {
logger.Debugw("finding value in datastore", "key", internal.LoggableRecordKeyString(key))

rec, err := dht.getRecordFromDatastore(mkDsKey(key))
rec, err := dht.getRecordFromDatastore(ctx, mkDsKey(key))
if err != nil {
logger.Warnw("get local failed", "key", internal.LoggableRecordKeyString(key), "error", err)
return nil, err
Expand All @@ -1417,14 +1417,14 @@ func (dht *FullRT) getLocal(key string) (*recpb.Record, error) {
}

// putLocal stores the key value pair in the datastore
func (dht *FullRT) putLocal(key string, rec *recpb.Record) error {
func (dht *FullRT) putLocal(ctx context.Context, key string, rec *recpb.Record) error {
data, err := proto.Marshal(rec)
if err != nil {
logger.Warnw("failed to put marshal record for local put", "error", err, "key", internal.LoggableRecordKeyString(key))
return err
}

return dht.datastore.Put(mkDsKey(key), data)
return dht.datastore.Put(ctx, mkDsKey(key), data)
}

func mkDsKey(s string) ds.Key {
Expand All @@ -1433,8 +1433,8 @@ func mkDsKey(s string) ds.Key {

// returns nil, nil when either nothing is found or the value found doesn't properly validate.
// returns nil, some_error when there's a *datastore* error (i.e., something goes very wrong)
func (dht *FullRT) getRecordFromDatastore(dskey ds.Key) (*recpb.Record, error) {
buf, err := dht.datastore.Get(dskey)
func (dht *FullRT) getRecordFromDatastore(ctx context.Context, dskey ds.Key) (*recpb.Record, error) {
buf, err := dht.datastore.Get(ctx, dskey)
if err == ds.ErrNotFound {
return nil, nil
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/hashicorp/go-multierror v1.1.1
github.com/hashicorp/golang-lru v0.5.4
github.com/ipfs/go-cid v0.0.7
github.com/ipfs/go-datastore v0.4.5
github.com/ipfs/go-datastore v0.5.0
github.com/ipfs/go-detect-race v0.0.1
github.com/ipfs/go-ipfs-util v0.0.2
github.com/ipfs/go-ipns v0.1.2
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,9 @@ github.com/ipfs/go-datastore v0.1.1/go.mod h1:w38XXW9kVFNp57Zj5knbKWM2T+KOZCGDRV
github.com/ipfs/go-datastore v0.4.0/go.mod h1:SX/xMIKoCszPqp+z9JhPYCmoOoXTvaa13XEbGtsFUhA=
github.com/ipfs/go-datastore v0.4.1/go.mod h1:SX/xMIKoCszPqp+z9JhPYCmoOoXTvaa13XEbGtsFUhA=
github.com/ipfs/go-datastore v0.4.4/go.mod h1:SX/xMIKoCszPqp+z9JhPYCmoOoXTvaa13XEbGtsFUhA=
github.com/ipfs/go-datastore v0.4.5 h1:cwOUcGMLdLPWgu3SlrCckCMznaGADbPqE0r8h768/Dg=
github.com/ipfs/go-datastore v0.4.5/go.mod h1:eXTcaaiN6uOlVCLS9GjJUJtlvJfM3xk23w3fyfrmmJs=
github.com/ipfs/go-datastore v0.5.0 h1:rQicVCEacWyk4JZ6G5bD9TKR7lZEG1MWcG7UdWYrFAU=
github.com/ipfs/go-datastore v0.5.0/go.mod h1:9zhEApYMTl17C8YDp7JmU7sQZi2/wqiYh73hakZ90Bk=
github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk=
github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps=
github.com/ipfs/go-ds-badger v0.0.2/go.mod h1:Y3QpeSFWQf6MopLTiZD+VT6IC1yZqaGmjvRcKeSGij8=
Expand Down
16 changes: 8 additions & 8 deletions handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess
// setup response
resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())

rec, err := dht.checkLocalDatastore(k)
rec, err := dht.checkLocalDatastore(ctx, k)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -89,10 +89,10 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess
return resp, nil
}

func (dht *IpfsDHT) checkLocalDatastore(k []byte) (*recpb.Record, error) {
func (dht *IpfsDHT) checkLocalDatastore(ctx context.Context, k []byte) (*recpb.Record, error) {
logger.Debugf("%s handleGetValue looking into ds", dht.self)
dskey := convertToDsKey(k)
buf, err := dht.datastore.Get(dskey)
buf, err := dht.datastore.Get(ctx, dskey)
logger.Debugf("%s handleGetValue looking into ds GOT %v", dht.self, buf)

if err == ds.ErrNotFound {
Expand Down Expand Up @@ -131,7 +131,7 @@ func (dht *IpfsDHT) checkLocalDatastore(k []byte) (*recpb.Record, error) {
// may be computationally expensive

if recordIsBad {
err := dht.datastore.Delete(dskey)
err := dht.datastore.Delete(ctx, dskey)
if err != nil {
logger.Error("Failed to delete bad record from datastore: ", err)
}
Expand Down Expand Up @@ -187,7 +187,7 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess
// Make sure the new record is "better" than the record we have locally.
// This prevents a record with for example a lower sequence number from
// overwriting a record with a higher sequence number.
existing, err := dht.getRecordFromDatastore(dskey)
existing, err := dht.getRecordFromDatastore(ctx, dskey)
if err != nil {
return nil, err
}
Expand All @@ -213,14 +213,14 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess
return nil, err
}

err = dht.datastore.Put(dskey, data)
err = dht.datastore.Put(ctx, dskey, data)
return pmes, err
}

// returns nil, nil when either nothing is found or the value found doesn't properly validate.
// returns nil, some_error when there's a *datastore* error (i.e., something goes very wrong)
func (dht *IpfsDHT) getRecordFromDatastore(dskey ds.Key) (*recpb.Record, error) {
buf, err := dht.datastore.Get(dskey)
func (dht *IpfsDHT) getRecordFromDatastore(ctx context.Context, dskey ds.Key) (*recpb.Record, error) {
buf, err := dht.datastore.Get(ctx, dskey)
if err == ds.ErrNotFound {
return nil, nil
}
Expand Down
42 changes: 23 additions & 19 deletions providers/providers_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,13 @@ func Cache(c lru.LRUCache) Option {
}

type addProv struct {
ctx context.Context
key []byte
val peer.ID
}

type getProv struct {
ctx context.Context
key []byte
resp chan []peer.ID
}
Expand All @@ -115,7 +117,7 @@ func NewProviderManager(ctx context.Context, local peer.ID, ps peerstore.Peersto
return nil, err
}
pm.proc = goprocessctx.WithContext(ctx)
pm.proc.Go(pm.run)
pm.proc.Go(func(proc goprocess.Process) { pm.run(ctx, proc) })
return pm, nil
}

Expand All @@ -124,7 +126,7 @@ func (pm *ProviderManager) Process() goprocess.Process {
return pm.proc
}

func (pm *ProviderManager) run(proc goprocess.Process) {
func (pm *ProviderManager) run(ctx context.Context, proc goprocess.Process) {
var (
gcQuery dsq.Results
gcQueryRes <-chan dsq.Result
Expand All @@ -139,15 +141,15 @@ func (pm *ProviderManager) run(proc goprocess.Process) {
// don't really care if this fails.
_ = gcQuery.Close()
}
if err := pm.dstore.Flush(); err != nil {
if err := pm.dstore.Flush(ctx); err != nil {
log.Error("failed to flush datastore: ", err)
}
}()

for {
select {
case np := <-pm.newprovs:
err := pm.addProv(np.key, np.val)
err := pm.addProv(np.ctx, np.key, np.val)
if err != nil {
log.Error("error adding new providers: ", err)
continue
Expand All @@ -158,7 +160,7 @@ func (pm *ProviderManager) run(proc goprocess.Process) {
gcSkip[mkProvKeyFor(np.key, np.val)] = struct{}{}
}
case gp := <-pm.getprovs:
provs, err := pm.getProvidersForKey(gp.key)
provs, err := pm.getProvidersForKey(gp.ctx, gp.key)
if err != nil && err != ds.ErrNotFound {
log.Error("error reading providers: ", err)
}
Expand Down Expand Up @@ -197,7 +199,7 @@ func (pm *ProviderManager) run(proc goprocess.Process) {
fallthrough
case gcTime.Sub(t) > ProvideValidity:
// or expired
err = pm.dstore.Delete(ds.RawKey(res.Key))
err = pm.dstore.Delete(ctx, ds.RawKey(res.Key))
if err != nil && err != ds.ErrNotFound {
log.Error("failed to remove provider record from disk: ", err)
}
Expand All @@ -211,7 +213,7 @@ func (pm *ProviderManager) run(proc goprocess.Process) {
pm.cache.Purge()

// Now, kick off a GC of the datastore.
q, err := pm.dstore.Query(dsq.Query{
q, err := pm.dstore.Query(ctx, dsq.Query{
Prefix: ProvidersKeyPrefix,
})
if err != nil {
Expand All @@ -233,6 +235,7 @@ func (pm *ProviderManager) AddProvider(ctx context.Context, k []byte, provInfo p
pm.pstore.AddAddrs(provInfo.ID, provInfo.Addrs, peerstore.ProviderAddrTTL)
}
prov := &addProv{
ctx: ctx,
key: k,
val: provInfo.ID,
}
Expand All @@ -245,23 +248,23 @@ func (pm *ProviderManager) AddProvider(ctx context.Context, k []byte, provInfo p
}

// addProv updates the cache if needed
func (pm *ProviderManager) addProv(k []byte, p peer.ID) error {
func (pm *ProviderManager) addProv(ctx context.Context, k []byte, p peer.ID) error {
now := time.Now()
if provs, ok := pm.cache.Get(string(k)); ok {
provs.(*providerSet).setVal(p, now)
} // else not cached, just write through

return writeProviderEntry(pm.dstore, k, p, now)
return writeProviderEntry(ctx, pm.dstore, k, p, now)
}

// writeProviderEntry writes the provider into the datastore
func writeProviderEntry(dstore ds.Datastore, k []byte, p peer.ID, t time.Time) error {
func writeProviderEntry(ctx context.Context, dstore ds.Datastore, k []byte, p peer.ID, t time.Time) error {
dsk := mkProvKeyFor(k, p)

buf := make([]byte, 16)
n := binary.PutVarint(buf, t.UnixNano())

return dstore.Put(ds.NewKey(dsk), buf[:n])
return dstore.Put(ctx, ds.NewKey(dsk), buf[:n])
}

func mkProvKeyFor(k []byte, p peer.ID) string {
Expand All @@ -276,6 +279,7 @@ func mkProvKey(k []byte) string {
// This method _does not_ copy the set. Do not modify it.
func (pm *ProviderManager) GetProviders(ctx context.Context, k []byte) ([]peer.AddrInfo, error) {
gp := &getProv{
ctx: ctx,
key: k,
resp: make(chan []peer.ID, 1), // buffered to prevent sender from blocking
}
Expand All @@ -292,22 +296,22 @@ func (pm *ProviderManager) GetProviders(ctx context.Context, k []byte) ([]peer.A
}
}

func (pm *ProviderManager) getProvidersForKey(k []byte) ([]peer.ID, error) {
pset, err := pm.getProviderSetForKey(k)
func (pm *ProviderManager) getProvidersForKey(ctx context.Context, k []byte) ([]peer.ID, error) {
pset, err := pm.getProviderSetForKey(ctx, k)
if err != nil {
return nil, err
}
return pset.providers, nil
}

// returns the ProviderSet if it already exists on cache, otherwise loads it from datasatore
func (pm *ProviderManager) getProviderSetForKey(k []byte) (*providerSet, error) {
func (pm *ProviderManager) getProviderSetForKey(ctx context.Context, k []byte) (*providerSet, error) {
cached, ok := pm.cache.Get(string(k))
if ok {
return cached.(*providerSet), nil
}

pset, err := loadProviderSet(pm.dstore, k)
pset, err := loadProviderSet(ctx, pm.dstore, k)
if err != nil {
return nil, err
}
Expand All @@ -320,8 +324,8 @@ func (pm *ProviderManager) getProviderSetForKey(k []byte) (*providerSet, error)
}

// loads the ProviderSet out of the datastore
func loadProviderSet(dstore ds.Datastore, k []byte) (*providerSet, error) {
res, err := dstore.Query(dsq.Query{Prefix: mkProvKey(k)})
func loadProviderSet(ctx context.Context, dstore ds.Datastore, k []byte) (*providerSet, error) {
res, err := dstore.Query(ctx, dsq.Query{Prefix: mkProvKey(k)})
if err != nil {
return nil, err
}
Expand All @@ -348,7 +352,7 @@ func loadProviderSet(dstore ds.Datastore, k []byte) (*providerSet, error) {
fallthrough
case now.Sub(t) > ProvideValidity:
// or just expired
err = dstore.Delete(ds.RawKey(e.Key))
err = dstore.Delete(ctx, ds.RawKey(e.Key))
if err != nil && err != ds.ErrNotFound {
log.Error("failed to remove provider record from disk: ", err)
}
Expand All @@ -360,7 +364,7 @@ func loadProviderSet(dstore ds.Datastore, k []byte) (*providerSet, error) {
decstr, err := base32.RawStdEncoding.DecodeString(e.Key[lix+1:])
if err != nil {
log.Error("base32 decoding error: ", err)
err = dstore.Delete(ds.RawKey(e.Key))
err = dstore.Delete(ctx, ds.RawKey(e.Key))
if err != nil && err != ds.ErrNotFound {
log.Error("failed to remove provider record from disk: ", err)
}
Expand Down
8 changes: 4 additions & 4 deletions providers/providers_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,17 +102,17 @@ func TestProvidersSerialization(t *testing.T) {
pt1 := time.Now()
pt2 := pt1.Add(time.Hour)

err := writeProviderEntry(dstore, k, p1, pt1)
err := writeProviderEntry(context.Background(), dstore, k, p1, pt1)
if err != nil {
t.Fatal(err)
}

err = writeProviderEntry(dstore, k, p2, pt2)
err = writeProviderEntry(context.Background(), dstore, k, p2, pt2)
if err != nil {
t.Fatal(err)
}

pset, err := loadProviderSet(dstore, k)
pset, err := loadProviderSet(context.Background(), dstore, k)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -209,7 +209,7 @@ func TestProvidesExpire(t *testing.T) {
t.Fatal("providers map not cleaned up")
}

res, err := ds.Query(dsq.Query{Prefix: ProvidersKeyPrefix})
res, err := ds.Query(context.Background(), dsq.Query{Prefix: ProvidersKeyPrefix})
if err != nil {
t.Fatal(err)
}
Expand Down
Loading

0 comments on commit 60d8bde

Please sign in to comment.