Skip to content

Commit

Permalink
Merge pull request #121 from libp2p/chore/update-ds
Browse files Browse the repository at this point in the history
chore(dep): update go-log
  • Loading branch information
Stebalien authored Feb 21, 2020
2 parents fc51534 + 67313c2 commit 637e7e4
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 28 deletions.
2 changes: 1 addition & 1 deletion p2p/host/peerstore/pstoreds/addr_book.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func (ab *dsAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.D
func (ab *dsAddrBook) Addrs(p peer.ID) []ma.Multiaddr {
pr, err := ab.loadRecord(p, true, true)
if err != nil {
log.Warning("failed to load peerstore entry for peer %v while querying addrs, err: %v", p, err)
log.Warn("failed to load peerstore entry for peer %v while querying addrs, err: %v", p, err)
return nil
}

Expand Down
50 changes: 25 additions & 25 deletions p2p/host/peerstore/pstoreds/addr_book_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,40 +142,40 @@ func (gc *dsAddrBookGc) purgeLookahead() {
record := &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}} // empty record to reuse and avoid allocs.
batch, err := newCyclicBatch(gc.ab.ds, defaultOpsPerCyclicBatch)
if err != nil {
log.Warningf("failed while creating batch to purge GC entries: %v", err)
log.Warnf("failed while creating batch to purge GC entries: %v", err)
}

// This function drops an unparseable GC entry; this is for safety. It is an escape hatch in case
// we modify the format of keys going forward. If a user runs a new version against an old DB,
// if we don't clean up unparseable entries we'll end up accumulating garbage.
dropInError := func(key ds.Key, err error, msg string) {
if err != nil {
log.Warningf("failed while %s record with GC key: %v, err: %v; deleting", msg, key, err)
log.Warnf("failed while %s record with GC key: %v, err: %v; deleting", msg, key, err)
}
if err = batch.Delete(key); err != nil {
log.Warningf("failed to delete corrupt GC lookahead entry: %v, err: %v", key, err)
log.Warnf("failed to delete corrupt GC lookahead entry: %v, err: %v", key, err)
}
}

// This function drops a GC key if the entry is cleaned correctly. It may reschedule another visit
// if the next earliest expiry falls within the current window again.
dropOrReschedule := func(key ds.Key, ar *addrsRecord) {
if err := batch.Delete(key); err != nil {
log.Warningf("failed to delete lookahead entry: %v, err: %v", key, err)
log.Warnf("failed to delete lookahead entry: %v, err: %v", key, err)
}

// re-add the record if it needs to be visited again in this window.
if len(ar.Addrs) != 0 && ar.Addrs[0].Expiry <= gc.currWindowEnd {
gcKey := gcLookaheadBase.ChildString(fmt.Sprintf("%d/%s", ar.Addrs[0].Expiry, key.Name()))
if err := batch.Put(gcKey, []byte{}); err != nil {
log.Warningf("failed to add new GC key: %v, err: %v", gcKey, err)
log.Warnf("failed to add new GC key: %v, err: %v", gcKey, err)
}
}
}

results, err := gc.ab.ds.Query(purgeLookaheadQuery)
if err != nil {
log.Warningf("failed while fetching entries to purge: %v", err)
log.Warnf("failed while fetching entries to purge: %v", err)
return
}
defer results.Close()
Expand All @@ -189,7 +189,7 @@ func (gc *dsAddrBookGc) purgeLookahead() {
ts, err := strconv.ParseInt(gcKey.Parent().Name(), 10, 64)
if err != nil {
dropInError(gcKey, err, "parsing timestamp")
log.Warningf("failed while parsing timestamp from key: %v, err: %v", result.Key, err)
log.Warnf("failed while parsing timestamp from key: %v, err: %v", result.Key, err)
continue
} else if ts > now {
// this is an ordered cursor; when we hit an entry with a timestamp beyond now, we can break.
Expand All @@ -199,14 +199,14 @@ func (gc *dsAddrBookGc) purgeLookahead() {
idb32, err := b32.RawStdEncoding.DecodeString(gcKey.Name())
if err != nil {
dropInError(gcKey, err, "parsing peer ID")
log.Warningf("failed while parsing b32 peer ID from key: %v, err: %v", result.Key, err)
log.Warnf("failed while parsing b32 peer ID from key: %v, err: %v", result.Key, err)
continue
}

id, err = peer.IDFromBytes(idb32)
if err != nil {
dropInError(gcKey, err, "decoding peer ID")
log.Warningf("failed while decoding peer ID from key: %v, err: %v", result.Key, err)
log.Warnf("failed while decoding peer ID from key: %v, err: %v", result.Key, err)
continue
}

Expand All @@ -216,7 +216,7 @@ func (gc *dsAddrBookGc) purgeLookahead() {
cached.Lock()
if cached.clean() {
if err = cached.flush(batch); err != nil {
log.Warningf("failed to flush entry modified by GC for peer: &v, err: %v", id.Pretty(), err)
log.Warnf("failed to flush entry modified by GC for peer: &v, err: %v", id.Pretty(), err)
}
}
dropOrReschedule(gcKey, cached)
Expand All @@ -242,14 +242,14 @@ func (gc *dsAddrBookGc) purgeLookahead() {
if record.clean() {
err = record.flush(batch)
if err != nil {
log.Warningf("failed to flush entry modified by GC for peer: &v, err: %v", id.Pretty(), err)
log.Warnf("failed to flush entry modified by GC for peer: &v, err: %v", id.Pretty(), err)
}
}
dropOrReschedule(gcKey, record)
}

if err = batch.Commit(); err != nil {
log.Warningf("failed to commit GC purge batch: %v", err)
log.Warnf("failed to commit GC purge batch: %v", err)
}
}

Expand All @@ -265,12 +265,12 @@ func (gc *dsAddrBookGc) purgeStore() {
record := &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}} // empty record to reuse and avoid allocs.
batch, err := newCyclicBatch(gc.ab.ds, defaultOpsPerCyclicBatch)
if err != nil {
log.Warningf("failed while creating batch to purge GC entries: %v", err)
log.Warnf("failed while creating batch to purge GC entries: %v", err)
}

results, err := gc.ab.ds.Query(purgeStoreQuery)
if err != nil {
log.Warningf("failed while opening iterator: %v", err)
log.Warnf("failed while opening iterator: %v", err)
return
}
defer results.Close()
Expand All @@ -289,13 +289,13 @@ func (gc *dsAddrBookGc) purgeStore() {
}

if err := record.flush(batch); err != nil {
log.Warningf("failed to flush entry modified by GC for peer: &v, err: %v", id, err)
log.Warnf("failed to flush entry modified by GC for peer: &v, err: %v", id, err)
}
gc.ab.cache.Remove(id)
}

if err = batch.Commit(); err != nil {
log.Warningf("failed to commit GC purge batch: %v", err)
log.Warnf("failed to commit GC purge batch: %v", err)
}
}

Expand Down Expand Up @@ -323,26 +323,26 @@ func (gc *dsAddrBookGc) populateLookahead() {
record := &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}}
results, err := gc.ab.ds.Query(populateLookaheadQuery)
if err != nil {
log.Warningf("failed while querying to populate lookahead GC window: %v", err)
log.Warnf("failed while querying to populate lookahead GC window: %v", err)
return
}
defer results.Close()

batch, err := newCyclicBatch(gc.ab.ds, defaultOpsPerCyclicBatch)
if err != nil {
log.Warningf("failed while creating batch to populate lookahead GC window: %v", err)
log.Warnf("failed while creating batch to populate lookahead GC window: %v", err)
return
}

for result := range results.Next() {
idb32 := ds.RawKey(result.Key).Name()
k, err := b32.RawStdEncoding.DecodeString(idb32)
if err != nil {
log.Warningf("failed while decoding peer ID from key: %v, err: %v", result.Key, err)
log.Warnf("failed while decoding peer ID from key: %v, err: %v", result.Key, err)
continue
}
if id, err = peer.IDFromBytes(k); err != nil {
log.Warningf("failed while decoding peer ID from key: %v, err: %v", result.Key, err)
log.Warnf("failed while decoding peer ID from key: %v, err: %v", result.Key, err)
}

// if the record is in cache, use the cached version.
Expand All @@ -355,7 +355,7 @@ func (gc *dsAddrBookGc) populateLookahead() {
}
gcKey := gcLookaheadBase.ChildString(fmt.Sprintf("%d/%s", cached.Addrs[0].Expiry, idb32))
if err = batch.Put(gcKey, []byte{}); err != nil {
log.Warningf("failed while inserting GC entry for peer: %v, err: %v", id.Pretty(), err)
log.Warnf("failed while inserting GC entry for peer: %v, err: %v", id.Pretty(), err)
}
cached.RUnlock()
continue
Expand All @@ -365,23 +365,23 @@ func (gc *dsAddrBookGc) populateLookahead() {

val, err := gc.ab.ds.Get(ds.RawKey(result.Key))
if err != nil {
log.Warningf("failed which getting record from store for peer: %v, err: %v", id.Pretty(), err)
log.Warnf("failed which getting record from store for peer: %v, err: %v", id.Pretty(), err)
continue
}
if err := record.Unmarshal(val); err != nil {
log.Warningf("failed while unmarshalling record from store for peer: %v, err: %v", id.Pretty(), err)
log.Warnf("failed while unmarshalling record from store for peer: %v, err: %v", id.Pretty(), err)
continue
}
if len(record.Addrs) > 0 && record.Addrs[0].Expiry <= until {
gcKey := gcLookaheadBase.ChildString(fmt.Sprintf("%d/%s", record.Addrs[0].Expiry, idb32))
if err = batch.Put(gcKey, []byte{}); err != nil {
log.Warningf("failed while inserting GC entry for peer: %v, err: %v", id.Pretty(), err)
log.Warnf("failed while inserting GC entry for peer: %v, err: %v", id.Pretty(), err)
}
}
}

if err = batch.Commit(); err != nil {
log.Warningf("failed to commit GC lookahead batch: %v", err)
log.Warnf("failed to commit GC lookahead batch: %v", err)
}

gc.currWindowEnd = until
Expand Down
4 changes: 2 additions & 2 deletions p2p/host/peerstore/pstoremem/addr_book.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (mab *memoryAddrBook) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du
exp := time.Now().Add(ttl)
for _, addr := range addrs {
if addr == nil {
log.Warningf("was passed nil multiaddr for %s", p)
log.Warnf("was passed nil multiaddr for %s", p)
continue
}
asBytes := addr.Bytes()
Expand Down Expand Up @@ -194,7 +194,7 @@ func (mab *memoryAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du
exp := time.Now().Add(ttl)
for _, addr := range addrs {
if addr == nil {
log.Warningf("was passed nil multiaddr for %s", p)
log.Warnf("was passed nil multiaddr for %s", p)
continue
}

Expand Down

0 comments on commit 637e7e4

Please sign in to comment.