fullrt rework batching #720

Merged · 19 commits · Jun 17, 2021
Changes from all commits
269 changes: 178 additions & 91 deletions fullrt/dht.go
@@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"
@@ -174,7 +175,7 @@ func NewFullRT(h host.Host, protocolPrefix protocol.ID, options ...Option) (*Ful

crawlerInterval: time.Minute * 60,

bulkSendParallelism: 10,
bulkSendParallelism: 20,
}

rt.wg.Add(1)
@@ -918,21 +919,11 @@ func (dht *FullRT) ProvideMany(ctx context.Context, keys []multihash.Multihash)
return fmt.Errorf("no known addresses for self, cannot put provider")
}

fn := func(ctx context.Context, k peer.ID) error {
peers, err := dht.GetClosestPeers(ctx, string(k))
if err != nil {
return err
}
successes := dht.execOnMany(ctx, func(ctx context.Context, p peer.ID) error {
pmes := dht_pb.NewMessage(dht_pb.Message_ADD_PROVIDER, multihash.Multihash(k), 0)
pmes.ProviderPeers = pbPeers

return dht.messageSender.SendMessage(ctx, p, pmes)
}, peers, true)
if successes == 0 {
return fmt.Errorf("no successful provides")
}
return nil
fn := func(ctx context.Context, p, k peer.ID) error {
pmes := dht_pb.NewMessage(dht_pb.Message_ADD_PROVIDER, multihash.Multihash(k), 0)
pmes.ProviderPeers = pbPeers

return dht.messageSender.SendMessage(ctx, p, pmes)
}
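// fn is now invoked once per (peer, key) pair: bulkMessageSend looks up the closest peers for
// each key and tracks successes, so this closure only has to send the ADD_PROVIDER message.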

keysAsPeerIDs := make([]peer.ID, 0, len(keys))
@@ -963,114 +954,210 @@ func (dht *FullRT) PutMany(ctx context.Context, keys []string, values [][]byte)
return fmt.Errorf("does not support duplicate keys")
}

fn := func(ctx context.Context, k peer.ID) error {
peers, err := dht.GetClosestPeers(ctx, string(k))
if err != nil {
return err
}
successes := dht.execOnMany(ctx, func(ctx context.Context, p peer.ID) error {
keyStr := string(k)
return dht.protoMessenger.PutValue(ctx, p, record.MakePutRecord(keyStr, keyRecMap[keyStr]))
}, peers, true)
if successes == 0 {
return fmt.Errorf("no successful puts")
}
return nil
fn := func(ctx context.Context, p, k peer.ID) error {
keyStr := string(k)
return dht.protoMessenger.PutValue(ctx, p, record.MakePutRecord(keyStr, keyRecMap[keyStr]))
}

return dht.bulkMessageSend(ctx, keysAsPeerIDs, fn, false)
}

func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func(ctx context.Context, k peer.ID) error, isProvRec bool) error {
func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func(ctx context.Context, target, k peer.ID) error, isProvRec bool) error {
if len(keys) == 0 {
return nil
}

sortedKeys := kb.SortClosestPeers(keys, kb.ID(make([]byte, 32)))
type report struct {
successes int
failures int
lastSuccess time.Time
mx sync.RWMutex
}

keySuccesses := make(map[peer.ID]*report, len(keys))
var numSkipped int64

for _, k := range keys {
keySuccesses[k] = &report{}
}

logger.Infof("bulk send: number of keys %d, unique %d", len(keys), len(keySuccesses))
numSuccessfulToWaitFor := int(float64(dht.bucketSize) * dht.waitFrac * 1.2)
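// Illustrative numbers (assumed, not taken from this PR): with bucketSize 20 and waitFrac 0.3,
// numSuccessfulToWaitFor = int(20 * 0.3 * 1.2) = 7.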

sortedKeys := make([]peer.ID, 0, len(keySuccesses))
for k := range keySuccesses {
sortedKeys = append(sortedKeys, k)
}

sortedKeys = kb.SortClosestPeers(sortedKeys, kb.ID(make([]byte, 32)))

dht.kMapLk.RLock()
numPeers := len(dht.keyToPeerMap)
dht.kMapLk.RUnlock()

chunkSize := (len(sortedKeys) * dht.bucketSize * 2) / numPeers
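// Illustrative numbers (assumed, not taken from this PR): 200,000 keys, bucketSize 20, and
// 8,000 peers in keyToPeerMap give chunkSize = (200000 * 20 * 2) / 8000 = 1000 keys per chunk.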
if chunkSize == 0 {
chunkSize = 1
}

connmgrTag := fmt.Sprintf("dht-bulk-provide-tag-%d", rand.Int())

var numSends uint64 = 0
var numSendsSuccessful uint64 = 0
type workMessage struct {
p peer.ID
keys []peer.ID
}

workCh := make(chan workMessage, 1)
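// Each worker takes a (peer, keys) item off workCh, dials the peer once with timeoutPerOp,
// protects the connection for the duration, and sends every key assigned to that peer.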
wg := sync.WaitGroup{}
onePctKeys := uint64(len(sortedKeys)) / 100
wg.Add(dht.bulkSendParallelism)
for i := 0; i < dht.bulkSendParallelism; i++ {
go func() {
defer wg.Done()
defer logger.Debugf("bulk send goroutine done")
for wmsg := range workCh {
p, workKeys := wmsg.p, wmsg.keys
dht.peerAddrsLk.RLock()
peerAddrs := dht.peerAddrs[p]
dht.peerAddrsLk.RUnlock()
dialCtx, dialCancel := context.WithTimeout(ctx, dht.timeoutPerOp)
if err := dht.h.Connect(dialCtx, peer.AddrInfo{ID: p, Addrs: peerAddrs}); err != nil {
dialCancel()
atomic.AddInt64(&numSkipped, 1)
continue
}
dialCancel()
dht.h.ConnManager().Protect(p, connmgrTag)
for _, k := range workKeys {
keyReport := keySuccesses[k]

queryTimeout := dht.timeoutPerOp
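// Once a key has numSuccessfulToWaitFor successes, give further sends for it at most 500ms;
// if its last success is already more than 500ms old, skip the key entirely.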
keyReport.mx.RLock()
if keyReport.successes >= numSuccessfulToWaitFor {
if time.Since(keyReport.lastSuccess) > time.Millisecond*500 {
keyReport.mx.RUnlock()
continue
}
queryTimeout = time.Millisecond * 500
}
keyReport.mx.RUnlock()

fnCtx, fnCancel := context.WithTimeout(ctx, queryTimeout)
if err := fn(fnCtx, p, k); err == nil {
keyReport.mx.Lock()
keyReport.successes++
if keyReport.successes >= numSuccessfulToWaitFor {
keyReport.lastSuccess = time.Now()
}
keyReport.mx.Unlock()
} else {
keyReport.mx.Lock()
keyReport.failures++
keyReport.mx.Unlock()
if ctx.Err() != nil {
fnCancel()
break
}
}
fnCancel()
}

bulkSendFn := func(chunk []peer.ID) {
defer wg.Done()
for _, key := range chunk {
if ctx.Err() != nil {
break
dht.h.ConnManager().Unprotect(p, connmgrTag)
}
}()
}

sendsSoFar := atomic.AddUint64(&numSends, 1)
if onePctKeys > 0 && sendsSoFar%onePctKeys == 0 {
logger.Infof("bulk sending goroutine: %.1f%% done - %d/%d done", 100*float64(sendsSoFar)/float64(len(sortedKeys)), sendsSoFar, len(sortedKeys))
}
if err := fn(ctx, key); err != nil {
var l interface{}
if isProvRec {
l = internal.LoggableProviderRecordBytes(key)
} else {
l = internal.LoggableRecordKeyString(key)
keyGroups := divideByChunkSize(sortedKeys, chunkSize)
sendsSoFar := 0
for _, g := range keyGroups {
if ctx.Err() != nil {
break
}

keysPerPeer := make(map[peer.ID][]peer.ID)
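// Group this chunk's keys by their closest peers so that a single dial to a peer can carry
// every key destined for that peer.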
for _, k := range g {
peers, err := dht.GetClosestPeers(ctx, string(k))
if err == nil {
for _, p := range peers {
keysPerPeer[p] = append(keysPerPeer[p], k)
}
logger.Infof("failed to complete bulk sending of key :%v. %v", l, err)
} else {
atomic.AddUint64(&numSendsSuccessful, 1)
}
}
}

// divide the keys into groups so that we can talk to more peers at a time, because the keys are sorted in
// XOR/Kademlia space, consecutive puts will go to the same, or nearly the same, set of peers. Working in parallel
// means less waiting on individual dials to complete and also continuing to make progress even if one segment of
// the network is being slow, or we are maxing out the connection, stream, etc. to those peers.
keyGroups := divideIntoGroups(sortedKeys, dht.bulkSendParallelism)
wg.Add(len(keyGroups))
for _, chunk := range keyGroups {
go bulkSendFn(chunk)
logger.Debugf("bulk send: %d peers for group size %d", len(keysPerPeer), len(g))

keyloop:
for p, workKeys := range keysPerPeer {
select {
case workCh <- workMessage{p: p, keys: workKeys}:
case <-ctx.Done():
break keyloop
}
}
sendsSoFar += len(g)
logger.Infof("bulk sending: %.1f%% done - %d/%d done", 100*float64(sendsSoFar)/float64(len(keySuccesses)), sendsSoFar, len(keySuccesses))
}

close(workCh)

logger.Debugf("bulk send complete, waiting on goroutines to close")

wg.Wait()

numSendsSuccessful := 0
numFails := 0
// generate a histogram of how many successful sends occurred per key
successHist := make(map[int]int)
// generate a histogram of how many failed sends occurred per key
// this does not include sends to peers that were skipped and had no messages sent to them at all
failHist := make(map[int]int)
for _, v := range keySuccesses {
if v.successes > 0 {
numSendsSuccessful++
}
successHist[v.successes]++
failHist[v.failures]++
numFails += v.failures
}

if numSendsSuccessful == 0 {
logger.Infof("bulk send failed")
return fmt.Errorf("failed to complete bulk sending")
}

logger.Infof("bulk send complete: %d of %d successful", numSendsSuccessful, len(keys))
logger.Infof("bulk send complete: %d keys, %d unique, %d successful, %d skipped peers, %d fails",
len(keys), len(keySuccesses), numSendsSuccessful, numSkipped, numFails)

logger.Infof("bulk send summary: successHist %v, failHist %v", successHist, failHist)

return nil
}

// divideIntoGroups divides the set of keys into (at most) the given number of groups
func divideIntoGroups(keys []peer.ID, groups int) [][]peer.ID {
var keyGroups [][]peer.ID
if len(keys) < groups {
for i := 0; i < len(keys); i++ {
keyGroups = append(keyGroups, keys[i:i+1])
}
return keyGroups
}

chunkSize := len(keys) / groups
remainder := len(keys) % groups

start := 0
end := chunkSize
for i := 0; i < groups; i++ {
var chunk []peer.ID
// distribute the remainder as one extra entry per parallel thread
if remainder > 0 {
chunk = keys[start : end+1]
remainder--
start = end + 1
end = end + 1 + chunkSize
} else {
chunk = keys[start:end]
start = end
end = end + chunkSize
// divideByChunkSize divides the set of keys into groups of (at most) chunkSize. Chunk size must be greater than 0.
func divideByChunkSize(keys []peer.ID, chunkSize int) [][]peer.ID {
if len(keys) == 0 {
return nil
}

if chunkSize < 1 {
panic(fmt.Sprintf("fullrt: divide into groups: invalid chunk size %d", chunkSize))
}

var keyChunks [][]peer.ID
var nextChunk []peer.ID
chunkProgress := 0
for _, k := range keys {
nextChunk = append(nextChunk, k)
chunkProgress++
if chunkProgress == chunkSize {
keyChunks = append(keyChunks, nextChunk)
chunkProgress = 0
nextChunk = make([]peer.ID, 0, len(nextChunk))
}
keyGroups = append(keyGroups, chunk)
}
return keyGroups
if chunkProgress != 0 {
keyChunks = append(keyChunks, nextChunk)
}
return keyChunks
}
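
As a quick reference, here is a minimal sketch of how the new divideByChunkSize helper behaves, written as a small test. The key values, chunk size, test name, and the go-libp2p-core peer import path are assumptions for illustration and are not part of this PR:

package fullrt

import (
	"testing"

	"github.com/libp2p/go-libp2p-core/peer"
)

// TestDivideByChunkSizeSketch checks that 7 keys split with chunkSize 3
// produce chunks of sizes 3, 3, and 1 (the final chunk holds the remainder).
func TestDivideByChunkSizeSketch(t *testing.T) {
	keys := []peer.ID{"a", "b", "c", "d", "e", "f", "g"}
	chunks := divideByChunkSize(keys, 3)
	if len(chunks) != 3 || len(chunks[0]) != 3 || len(chunks[2]) != 1 {
		t.Fatalf("unexpected chunking: %v", chunks)
	}
}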

// FindProviders searches until the context expires.