From 561e506ca26fbfda1bb1467830ae8d57c06ccf0c Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Fri, 4 Jun 2021 03:38:59 -0400 Subject: [PATCH 01/19] fullrt: do bulk sending explictly on peers --- fullrt/dht.go | 125 +++++++++++++++++++++++++------------------------- 1 file changed, 62 insertions(+), 63 deletions(-) diff --git a/fullrt/dht.go b/fullrt/dht.go index 955892949..91a9b878d 100644 --- a/fullrt/dht.go +++ b/fullrt/dht.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "math/rand" "sync" "sync/atomic" "time" @@ -918,21 +919,11 @@ func (dht *FullRT) ProvideMany(ctx context.Context, keys []multihash.Multihash) return fmt.Errorf("no known addresses for self, cannot put provider") } - fn := func(ctx context.Context, k peer.ID) error { - peers, err := dht.GetClosestPeers(ctx, string(k)) - if err != nil { - return err - } - successes := dht.execOnMany(ctx, func(ctx context.Context, p peer.ID) error { - pmes := dht_pb.NewMessage(dht_pb.Message_ADD_PROVIDER, multihash.Multihash(k), 0) - pmes.ProviderPeers = pbPeers - - return dht.messageSender.SendMessage(ctx, p, pmes) - }, peers, true) - if successes == 0 { - return fmt.Errorf("no successful provides") - } - return nil + fn := func(ctx context.Context, p, k peer.ID) error { + pmes := dht_pb.NewMessage(dht_pb.Message_ADD_PROVIDER, multihash.Multihash(k), 0) + pmes.ProviderPeers = pbPeers + + return dht.messageSender.SendMessage(ctx, p, pmes) } keysAsPeerIDs := make([]peer.ID, 0, len(keys)) @@ -963,74 +954,82 @@ func (dht *FullRT) PutMany(ctx context.Context, keys []string, values [][]byte) return fmt.Errorf("does not support duplicate keys") } - fn := func(ctx context.Context, k peer.ID) error { - peers, err := dht.GetClosestPeers(ctx, string(k)) - if err != nil { - return err - } - successes := dht.execOnMany(ctx, func(ctx context.Context, p peer.ID) error { - keyStr := string(k) - return dht.protoMessenger.PutValue(ctx, p, record.MakePutRecord(keyStr, keyRecMap[keyStr])) - }, peers, true) - if successes == 0 { - return fmt.Errorf("no successful puts") - } - return nil + fn := func(ctx context.Context, p, k peer.ID) error { + keyStr := string(k) + return dht.protoMessenger.PutValue(ctx, p, record.MakePutRecord(keyStr, keyRecMap[keyStr])) } return dht.bulkMessageSend(ctx, keysAsPeerIDs, fn, false) } -func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func(ctx context.Context, k peer.ID) error, isProvRec bool) error { +func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func(ctx context.Context, target, k peer.ID) error, isProvRec bool) error { if len(keys) == 0 { return nil } - sortedKeys := kb.SortClosestPeers(keys, kb.ID(make([]byte, 32))) - - var numSends uint64 = 0 - var numSendsSuccessful uint64 = 0 + keysPerPeer := make(map[peer.ID][]peer.ID) + for _, k := range keys { + peers, err := dht.GetClosestPeers(ctx, string(k)) + if err != nil { + return err + } + for _, p := range peers { + keysPerPeer[p] = append(keysPerPeer[p], k) + } + } - wg := sync.WaitGroup{} - onePctKeys := uint64(len(sortedKeys)) / 100 + keySuccesses := make(map[peer.ID]int) - bulkSendFn := func(chunk []peer.ID) { - defer wg.Done() - for _, key := range chunk { - if ctx.Err() != nil { - break - } + connmgrTag := fmt.Sprintf("dht-bulk-provide-tag-%d", rand.Int()) - sendsSoFar := atomic.AddUint64(&numSends, 1) - if onePctKeys > 0 && sendsSoFar%onePctKeys == 0 { - logger.Infof("bulk sending goroutine: %.1f%% done - %d/%d done", 100*float64(sendsSoFar)/float64(len(sortedKeys)), sendsSoFar, len(sortedKeys)) - } - if 
err := fn(ctx, key); err != nil { - var l interface{} - if isProvRec { - l = internal.LoggableProviderRecordBytes(key) - } else { - l = internal.LoggableRecordKeyString(key) + workCh := make(chan peer.ID) + wg := sync.WaitGroup{} + wg.Add(dht.bulkSendParallelism) + for i := 0; i < dht.bulkSendParallelism; i++ { + go func() { + defer wg.Done() + select { + case p := <-workCh: + keys := keysPerPeer[p] + for _, k := range keys { + dht.h.ConnManager().Protect(p, connmgrTag) + _ = dht.h.Connect(ctx, peer.AddrInfo{ID: p}) + if err := fn(ctx, p, k); err == nil { + keySuccesses[k]++ + } + dht.h.ConnManager().Unprotect(p, connmgrTag) } - logger.Infof("failed to complete bulk sending of key :%v. %v", l, err) - } else { - atomic.AddUint64(&numSendsSuccessful, 1) + case <-ctx.Done(): + return } - } + }() } - // divide the keys into groups so that we can talk to more peers at a time, because the keys are sorted in - // XOR/Kadmelia space consecutive puts will be too the same, or nearly the same, set of peers. Working in parallel - // means less waiting on individual dials to complete and also continuing to make progress even if one segment of - // the network is being slow, or we are maxing out the connection, stream, etc. to those peers. - keyGroups := divideIntoGroups(sortedKeys, dht.bulkSendParallelism) - wg.Add(len(keyGroups)) - for _, chunk := range keyGroups { - go bulkSendFn(chunk) + onePctPeers := len(keysPerPeer) / 100 + + peersSoFar := 0 + for p := range keysPerPeer { + peersSoFar++ + if onePctPeers > 0 && peersSoFar%onePctPeers == 0 { + logger.Infof("bulk sending goroutine: %.1f%% done - %d/%d done", 100*float64(peersSoFar)/float64(len(keysPerPeer)), peersSoFar, len(keysPerPeer)) + } + + select { + case workCh <- p: + case <-ctx.Done(): + break + } } wg.Wait() + numSendsSuccessful := 0 + for _, v := range keySuccesses { + if v > 0 { + numSendsSuccessful++ + } + } + if numSendsSuccessful == 0 { return fmt.Errorf("failed to complete bulk sending") } From f7cd545cb3522e60d2033fb1163a086581aaf971 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Fri, 4 Jun 2021 03:59:48 -0400 Subject: [PATCH 02/19] fixes --- fullrt/dht.go | 36 ++++++++++++++++++++++++++---------- 1 file changed, 26 insertions(+), 10 deletions(-) diff --git a/fullrt/dht.go b/fullrt/dht.go index 91a9b878d..f1bd0e1df 100644 --- a/fullrt/dht.go +++ b/fullrt/dht.go @@ -978,29 +978,44 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func( } } + successMapLk := sync.Mutex{} keySuccesses := make(map[peer.ID]int) connmgrTag := fmt.Sprintf("dht-bulk-provide-tag-%d", rand.Int()) - workCh := make(chan peer.ID) + workCh := make(chan peer.ID, 1) wg := sync.WaitGroup{} wg.Add(dht.bulkSendParallelism) for i := 0; i < dht.bulkSendParallelism; i++ { go func() { defer wg.Done() - select { - case p := <-workCh: - keys := keysPerPeer[p] - for _, k := range keys { + for { + select { + case p, ok := <-workCh: + if !ok { + return + } + dht.peerAddrsLk.RLock() + peerAddrs := dht.peerAddrs[p] + dht.peerAddrsLk.RUnlock() + if err := dht.h.Connect(ctx, peer.AddrInfo{ID: p, Addrs: peerAddrs}); err != nil { + continue + } dht.h.ConnManager().Protect(p, connmgrTag) - _ = dht.h.Connect(ctx, peer.AddrInfo{ID: p}) - if err := fn(ctx, p, k); err == nil { - keySuccesses[k]++ + keys := keysPerPeer[p] + for _, k := range keys { + if err := fn(ctx, p, k); err == nil { + successMapLk.Lock() + keySuccesses[k]++ + successMapLk.Unlock() + } else if ctx.Err() != nil { + break + } } dht.h.ConnManager().Unprotect(p, connmgrTag) + case 
<-ctx.Done(): + return } - case <-ctx.Done(): - return } }() } @@ -1020,6 +1035,7 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func( break } } + close(workCh) wg.Wait() From 15aff86d4021984584692357d105b633d3eeeecb Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Fri, 4 Jun 2021 12:46:07 -0400 Subject: [PATCH 03/19] new idea --- fullrt/dht.go | 84 +++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 61 insertions(+), 23 deletions(-) diff --git a/fullrt/dht.go b/fullrt/dht.go index f1bd0e1df..7ef21acb5 100644 --- a/fullrt/dht.go +++ b/fullrt/dht.go @@ -967,15 +967,15 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func( return nil } - keysPerPeer := make(map[peer.ID][]peer.ID) - for _, k := range keys { - peers, err := dht.GetClosestPeers(ctx, string(k)) - if err != nil { - return err - } - for _, p := range peers { - keysPerPeer[p] = append(keysPerPeer[p], k) - } + sortedKeys := kb.SortClosestPeers(keys, kb.ID(make([]byte, 32))) + + dht.kMapLk.RLock() + numPeers := len(dht.keyToPeerMap) + dht.kMapLk.RUnlock() + + chunkSize := (len(sortedKeys) * dht.bucketSize) / numPeers + if chunkSize == 0 { + chunkSize = 1 } successMapLk := sync.Mutex{} @@ -983,7 +983,12 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func( connmgrTag := fmt.Sprintf("dht-bulk-provide-tag-%d", rand.Int()) - workCh := make(chan peer.ID, 1) + type workMessage struct { + p peer.ID + keys []peer.ID + } + + workCh := make(chan workMessage, 1) wg := sync.WaitGroup{} wg.Add(dht.bulkSendParallelism) for i := 0; i < dht.bulkSendParallelism; i++ { @@ -991,10 +996,11 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func( defer wg.Done() for { select { - case p, ok := <-workCh: + case wmsg, ok := <-workCh: if !ok { return } + p, workKeys := wmsg.p, wmsg.keys dht.peerAddrsLk.RLock() peerAddrs := dht.peerAddrs[p] dht.peerAddrsLk.RUnlock() @@ -1002,8 +1008,7 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func( continue } dht.h.ConnManager().Protect(p, connmgrTag) - keys := keysPerPeer[p] - for _, k := range keys { + for _, k := range workKeys { if err := fn(ctx, p, k); err == nil { successMapLk.Lock() keySuccesses[k]++ @@ -1020,23 +1025,39 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func( }() } - onePctPeers := len(keysPerPeer) / 100 + keyGroups := divideIntoChunks(sortedKeys, chunkSize) + sendsSoFar := 0 + for _, g := range keyGroups { + if ctx.Err() != nil { + break + } - peersSoFar := 0 - for p := range keysPerPeer { - peersSoFar++ - if onePctPeers > 0 && peersSoFar%onePctPeers == 0 { - logger.Infof("bulk sending goroutine: %.1f%% done - %d/%d done", 100*float64(peersSoFar)/float64(len(keysPerPeer)), peersSoFar, len(keysPerPeer)) + keysPerPeer := make(map[peer.ID][]peer.ID) + for _, k := range g { + peers, err := dht.GetClosestPeers(ctx, string(k)) + if err == nil { + for _, p := range peers { + keysPerPeer[p] = append(keysPerPeer[p], k) + } + } } - select { - case workCh <- p: - case <-ctx.Done(): - break + keyloop: + for p, workKeys := range keysPerPeer { + select { + case workCh <- workMessage{p: p, keys: workKeys}: + case <-ctx.Done(): + break keyloop + } } + sendsSoFar += len(g) + logger.Infof("bulk sending: %.1f%% done - %d/%d done", 100*float64(sendsSoFar)/float64(len(sortedKeys)), sendsSoFar, len(sortedKeys)) } + close(workCh) + logger.Infof("bulk send complete, waiting on goroutines to close") + wg.Wait() 
numSendsSuccessful := 0 @@ -1047,6 +1068,7 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func( } if numSendsSuccessful == 0 { + logger.Infof("bulk send failed") return fmt.Errorf("failed to complete bulk sending") } @@ -1055,6 +1077,22 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func( return nil } +// divideIntoChunks divides the set of keys into groups of (at most) chunkSize +func divideIntoChunks(keys []peer.ID, chunkSize int) [][]peer.ID { + var keyChunks [][]peer.ID + var nextChunk []peer.ID + chunkProgress := 0 + for _, k := range keys { + nextChunk = append(nextChunk, k) + chunkProgress++ + if chunkProgress == chunkSize { + keyChunks = append(keyChunks, nextChunk) + chunkProgress = 0 + } + } + return keyChunks +} + // divideIntoGroups divides the set of keys into (at most) the number of groups func divideIntoGroups(keys []peer.ID, groups int) [][]peer.ID { var keyGroups [][]peer.ID From ea954d6cfc3bd11caffa2cce973d19b5354331b8 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Fri, 4 Jun 2021 12:52:11 -0400 Subject: [PATCH 04/19] another log --- fullrt/dht.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/fullrt/dht.go b/fullrt/dht.go index 7ef21acb5..dc3d1dac1 100644 --- a/fullrt/dht.go +++ b/fullrt/dht.go @@ -1042,6 +1042,8 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func( } } + logger.Infof("bulk send: %d peers for group size %d", len(keysPerPeer), len(g)) + keyloop: for p, workKeys := range keysPerPeer { select { From aff7b6fb44a36a7a6f9a61ceaf8a20e7c1c97409 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Fri, 4 Jun 2021 13:02:14 -0400 Subject: [PATCH 05/19] oops fix --- fullrt/dht.go | 1 + 1 file changed, 1 insertion(+) diff --git a/fullrt/dht.go b/fullrt/dht.go index dc3d1dac1..53d4bdb93 100644 --- a/fullrt/dht.go +++ b/fullrt/dht.go @@ -1090,6 +1090,7 @@ func divideIntoChunks(keys []peer.ID, chunkSize int) [][]peer.ID { if chunkProgress == chunkSize { keyChunks = append(keyChunks, nextChunk) chunkProgress = 0 + nextChunk = make([]peer.ID, 0, len(nextChunk)) } } return keyChunks From 125a6963b877002c01e149ebd8dd52fa47645c31 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Fri, 4 Jun 2021 19:39:29 -0400 Subject: [PATCH 06/19] bump parallelism and chunk size --- fullrt/dht.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fullrt/dht.go b/fullrt/dht.go index 53d4bdb93..ac04b3a52 100644 --- a/fullrt/dht.go +++ b/fullrt/dht.go @@ -175,7 +175,7 @@ func NewFullRT(h host.Host, protocolPrefix protocol.ID, options ...Option) (*Ful crawlerInterval: time.Minute * 60, - bulkSendParallelism: 10, + bulkSendParallelism: 40, } rt.wg.Add(1) @@ -973,7 +973,7 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func( numPeers := len(dht.keyToPeerMap) dht.kMapLk.RUnlock() - chunkSize := (len(sortedKeys) * dht.bucketSize) / numPeers + chunkSize := (len(sortedKeys) * dht.bucketSize * 2) / numPeers if chunkSize == 0 { chunkSize = 1 } From dbd62400e61d0e841cdad5f25b63094d8d0faa79 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Fri, 4 Jun 2021 19:43:51 -0400 Subject: [PATCH 07/19] readd timeouts in bulk operations --- fullrt/dht.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/fullrt/dht.go b/fullrt/dht.go index ac04b3a52..d061ee178 100644 --- a/fullrt/dht.go +++ b/fullrt/dht.go @@ -1004,19 +1004,26 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func( dht.peerAddrsLk.RLock() 
peerAddrs := dht.peerAddrs[p] dht.peerAddrsLk.RUnlock() - if err := dht.h.Connect(ctx, peer.AddrInfo{ID: p, Addrs: peerAddrs}); err != nil { + dialCtx, dialCancel := context.WithTimeout(ctx, dht.timeoutPerOp) + if err := dht.h.Connect(dialCtx, peer.AddrInfo{ID: p, Addrs: peerAddrs}); err != nil { + dialCancel() continue } + dialCancel() dht.h.ConnManager().Protect(p, connmgrTag) for _, k := range workKeys { - if err := fn(ctx, p, k); err == nil { + fnCtx, fnCancel := context.WithTimeout(ctx, dht.timeoutPerOp) + if err := fn(fnCtx, p, k); err == nil { successMapLk.Lock() keySuccesses[k]++ successMapLk.Unlock() } else if ctx.Err() != nil { + fnCancel() break } + fnCancel() } + dht.h.ConnManager().Unprotect(p, connmgrTag) case <-ctx.Done(): return From eece7658022ade2eee87c741fdcadec0e54d1733 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Sat, 5 Jun 2021 22:11:57 -0400 Subject: [PATCH 08/19] more logging --- fullrt/dht.go | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/fullrt/dht.go b/fullrt/dht.go index d061ee178..026164989 100644 --- a/fullrt/dht.go +++ b/fullrt/dht.go @@ -980,6 +980,8 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func( successMapLk := sync.Mutex{} keySuccesses := make(map[peer.ID]int) + keyFailures := make(map[peer.ID]int) + numSkipped := 0 connmgrTag := fmt.Sprintf("dht-bulk-provide-tag-%d", rand.Int()) @@ -1007,6 +1009,9 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func( dialCtx, dialCancel := context.WithTimeout(ctx, dht.timeoutPerOp) if err := dht.h.Connect(dialCtx, peer.AddrInfo{ID: p, Addrs: peerAddrs}); err != nil { dialCancel() + successMapLk.Lock() + numSkipped++ + successMapLk.Unlock() continue } dialCancel() @@ -1017,9 +1022,14 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func( successMapLk.Lock() keySuccesses[k]++ successMapLk.Unlock() - } else if ctx.Err() != nil { - fnCancel() - break + } else { + successMapLk.Lock() + keyFailures[k]++ + successMapLk.Unlock() + if ctx.Err() != nil { + fnCancel() + break + } } fnCancel() } @@ -1076,12 +1086,20 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func( } } + numFails := 0 + for _, v := range keyFailures { + if v > 0 { + numFails++ + } + } + if numSendsSuccessful == 0 { logger.Infof("bulk send failed") return fmt.Errorf("failed to complete bulk sending") } - logger.Infof("bulk send complete: %d of %d successful", numSendsSuccessful, len(keys)) + logger.Infof("bulk send complete: %d of %d successful. 
%d skipped peers, %d successmap, %d fails, %d failsmap", + numSendsSuccessful, len(keys), numSkipped, len(keySuccesses), numFails, len(keyFailures)) return nil } From 7346042741dab002f75e577ce5f8e3b004652169 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Sun, 6 Jun 2021 01:28:38 -0400 Subject: [PATCH 09/19] bug fix + logging --- fullrt/dht.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/fullrt/dht.go b/fullrt/dht.go index 026164989..11e4b1b44 100644 --- a/fullrt/dht.go +++ b/fullrt/dht.go @@ -969,6 +969,13 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func( sortedKeys := kb.SortClosestPeers(keys, kb.ID(make([]byte, 32))) + allkeys := make(map[peer.ID]struct{}) + for _, k := range keys { + allkeys[k] = struct{}{} + } + + logger.Infof("bulk send: number of key %d, unique %d", len(keys), len(allkeys)) + dht.kMapLk.RLock() numPeers := len(dht.keyToPeerMap) dht.kMapLk.RUnlock() @@ -996,6 +1003,7 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func( for i := 0; i < dht.bulkSendParallelism; i++ { go func() { defer wg.Done() + defer logger.Infof("bulk send goroutine done") for { select { case wmsg, ok := <-workCh: @@ -1118,6 +1126,7 @@ func divideIntoChunks(keys []peer.ID, chunkSize int) [][]peer.ID { nextChunk = make([]peer.ID, 0, len(nextChunk)) } } + keyChunks = append(keyChunks, nextChunk) return keyChunks } From 2e039d0f4050651386083aba338e8b1b21f9f7ef Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Sun, 6 Jun 2021 14:31:48 -0400 Subject: [PATCH 10/19] try again --- fullrt/dht.go | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/fullrt/dht.go b/fullrt/dht.go index 11e4b1b44..408f53639 100644 --- a/fullrt/dht.go +++ b/fullrt/dht.go @@ -969,13 +969,6 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func( sortedKeys := kb.SortClosestPeers(keys, kb.ID(make([]byte, 32))) - allkeys := make(map[peer.ID]struct{}) - for _, k := range keys { - allkeys[k] = struct{}{} - } - - logger.Infof("bulk send: number of key %d, unique %d", len(keys), len(allkeys)) - dht.kMapLk.RLock() numPeers := len(dht.keyToPeerMap) dht.kMapLk.RUnlock() @@ -985,11 +978,17 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func( chunkSize = 1 } - successMapLk := sync.Mutex{} + successMapLk := sync.RWMutex{} keySuccesses := make(map[peer.ID]int) keyFailures := make(map[peer.ID]int) numSkipped := 0 + for _, k := range keys { + keySuccesses[k] = 0 + } + logger.Infof("bulk send: number of keys %d, unique %d", len(keys), len(keySuccesses)) + numSuccessfulToWaitFor := int(float64(dht.bucketSize) * dht.waitFrac) + connmgrTag := fmt.Sprintf("dht-bulk-provide-tag-%d", rand.Int()) type workMessage struct { @@ -1025,7 +1024,16 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func( dialCancel() dht.h.ConnManager().Protect(p, connmgrTag) for _, k := range workKeys { - fnCtx, fnCancel := context.WithTimeout(ctx, dht.timeoutPerOp) + successMapLk.RLock() + thresholdMet := keySuccesses[k] >= numSuccessfulToWaitFor + successMapLk.RUnlock() + + queryTimeout := dht.timeoutPerOp + if thresholdMet { + queryTimeout = time.Millisecond * 500 + } + + fnCtx, fnCancel := context.WithTimeout(ctx, queryTimeout) if err := fn(fnCtx, p, k); err == nil { successMapLk.Lock() keySuccesses[k]++ @@ -1078,7 +1086,7 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func( } } sendsSoFar += len(g) - logger.Infof("bulk sending: 
%.1f%% done - %d/%d done", 100*float64(sendsSoFar)/float64(len(sortedKeys)), sendsSoFar, len(sortedKeys)) + logger.Infof("bulk sending: %.1f%% done - %d/%d done", 100*float64(sendsSoFar)/float64(len(keySuccesses)), sendsSoFar, len(keySuccesses)) } close(workCh) From e9c67f447fdecf830292b559890a845fd316c660 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Sun, 6 Jun 2021 17:17:42 -0400 Subject: [PATCH 11/19] abort earlier --- fullrt/dht.go | 31 +++++++++++++++++++------------ 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/fullrt/dht.go b/fullrt/dht.go index 408f53639..2349c7804 100644 --- a/fullrt/dht.go +++ b/fullrt/dht.go @@ -967,17 +967,6 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func( return nil } - sortedKeys := kb.SortClosestPeers(keys, kb.ID(make([]byte, 32))) - - dht.kMapLk.RLock() - numPeers := len(dht.keyToPeerMap) - dht.kMapLk.RUnlock() - - chunkSize := (len(sortedKeys) * dht.bucketSize * 2) / numPeers - if chunkSize == 0 { - chunkSize = 1 - } - successMapLk := sync.RWMutex{} keySuccesses := make(map[peer.ID]int) keyFailures := make(map[peer.ID]int) @@ -986,8 +975,25 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func( for _, k := range keys { keySuccesses[k] = 0 } + logger.Infof("bulk send: number of keys %d, unique %d", len(keys), len(keySuccesses)) - numSuccessfulToWaitFor := int(float64(dht.bucketSize) * dht.waitFrac) + numSuccessfulToWaitFor := int(float64(dht.bucketSize) * dht.waitFrac * 1.2) + + sortedKeys := make([]peer.ID, 0, len(keySuccesses)) + for k := range keySuccesses { + sortedKeys = append(sortedKeys, k) + } + + sortedKeys = kb.SortClosestPeers(sortedKeys, kb.ID(make([]byte, 32))) + + dht.kMapLk.RLock() + numPeers := len(dht.keyToPeerMap) + dht.kMapLk.RUnlock() + + chunkSize := (len(sortedKeys) * dht.bucketSize * 2) / numPeers + if chunkSize == 0 { + chunkSize = 1 + } connmgrTag := fmt.Sprintf("dht-bulk-provide-tag-%d", rand.Int()) @@ -1030,6 +1036,7 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func( queryTimeout := dht.timeoutPerOp if thresholdMet { + continue queryTimeout = time.Millisecond * 500 } From f7ec21e8f02247a88cc98563f38bb5dbab45a151 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Sun, 6 Jun 2021 18:17:42 -0400 Subject: [PATCH 12/19] buffer before abort --- fullrt/dht.go | 40 ++++++++++++++++++++++------------------ 1 file changed, 22 insertions(+), 18 deletions(-) diff --git a/fullrt/dht.go b/fullrt/dht.go index 2349c7804..f3c91ae34 100644 --- a/fullrt/dht.go +++ b/fullrt/dht.go @@ -967,13 +967,18 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func( return nil } + type report struct { + successes int + failures int + lastSuccess time.Time + } + successMapLk := sync.RWMutex{} - keySuccesses := make(map[peer.ID]int) - keyFailures := make(map[peer.ID]int) + keySuccesses := make(map[peer.ID]*report) numSkipped := 0 for _, k := range keys { - keySuccesses[k] = 0 + keySuccesses[k] = &report{} } logger.Infof("bulk send: number of keys %d, unique %d", len(keys), len(keySuccesses)) @@ -1031,23 +1036,27 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func( dht.h.ConnManager().Protect(p, connmgrTag) for _, k := range workKeys { successMapLk.RLock() - thresholdMet := keySuccesses[k] >= numSuccessfulToWaitFor + keyReport := keySuccesses[k] successMapLk.RUnlock() queryTimeout := dht.timeoutPerOp - if thresholdMet { - continue + if keyReport.successes >= numSuccessfulToWaitFor { + 
if time.Since(keyReport.lastSuccess) > time.Millisecond*500 { + continue + } queryTimeout = time.Millisecond * 500 } fnCtx, fnCancel := context.WithTimeout(ctx, queryTimeout) if err := fn(fnCtx, p, k); err == nil { successMapLk.Lock() - keySuccesses[k]++ + s := keySuccesses[k] + s.successes++ + s.lastSuccess = time.Now() successMapLk.Unlock() } else { successMapLk.Lock() - keyFailures[k]++ + keySuccesses[k].failures++ successMapLk.Unlock() if ctx.Err() != nil { fnCancel() @@ -1103,17 +1112,12 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func( wg.Wait() numSendsSuccessful := 0 + numFails := 0 for _, v := range keySuccesses { - if v > 0 { + if v.successes > 0 { numSendsSuccessful++ } - } - - numFails := 0 - for _, v := range keyFailures { - if v > 0 { - numFails++ - } + numFails += v.failures } if numSendsSuccessful == 0 { @@ -1121,8 +1125,8 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func( return fmt.Errorf("failed to complete bulk sending") } - logger.Infof("bulk send complete: %d of %d successful. %d skipped peers, %d successmap, %d fails, %d failsmap", - numSendsSuccessful, len(keys), numSkipped, len(keySuccesses), numFails, len(keyFailures)) + logger.Infof("bulk send complete: %d keys, %d unique, %d successful, %d skipped peers, %d fails", + len(keys), len(keySuccesses), numSendsSuccessful, numSkipped, numFails) return nil } From 64d17df6109006a220ef39af4c2a294cc010924b Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Sun, 6 Jun 2021 18:37:20 -0400 Subject: [PATCH 13/19] fix --- fullrt/dht.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/fullrt/dht.go b/fullrt/dht.go index f3c91ae34..c14eda9d5 100644 --- a/fullrt/dht.go +++ b/fullrt/dht.go @@ -1036,7 +1036,7 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func( dht.h.ConnManager().Protect(p, connmgrTag) for _, k := range workKeys { successMapLk.RLock() - keyReport := keySuccesses[k] + keyReport := *keySuccesses[k] successMapLk.RUnlock() queryTimeout := dht.timeoutPerOp @@ -1052,7 +1052,9 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func( successMapLk.Lock() s := keySuccesses[k] s.successes++ - s.lastSuccess = time.Now() + if s.successes >= numSuccessfulToWaitFor { + s.lastSuccess = time.Now() + } successMapLk.Unlock() } else { successMapLk.Lock() From ccf9493f2a7deeb6b65cf5f99e1b1c64d49c3b89 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Sun, 6 Jun 2021 21:30:06 -0400 Subject: [PATCH 14/19] histogram logging --- fullrt/dht.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/fullrt/dht.go b/fullrt/dht.go index c14eda9d5..793963459 100644 --- a/fullrt/dht.go +++ b/fullrt/dht.go @@ -1115,10 +1115,14 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func( numSendsSuccessful := 0 numFails := 0 + successHist := make(map[int]int) + failHist := make(map[int]int) for _, v := range keySuccesses { if v.successes > 0 { numSendsSuccessful++ } + successHist[v.successes]++ + failHist[v.failures]++ numFails += v.failures } @@ -1130,6 +1134,8 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func( logger.Infof("bulk send complete: %d keys, %d unique, %d successful, %d skipped peers, %d fails", len(keys), len(keySuccesses), numSendsSuccessful, numSkipped, numFails) + logger.Infof("bulk send summary: sucessHist %v, failHist %v", successHist, failHist) + return nil } From 63bad97d3af15baa03c8f25bae7c19fa52b87694 Mon Sep 17 00:00:00 
2001 From: Adin Schmahmann Date: Tue, 8 Jun 2021 10:30:18 -0400 Subject: [PATCH 15/19] change log levels and reduce parallelism --- fullrt/dht.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/fullrt/dht.go b/fullrt/dht.go index 793963459..3e5373ead 100644 --- a/fullrt/dht.go +++ b/fullrt/dht.go @@ -175,7 +175,7 @@ func NewFullRT(h host.Host, protocolPrefix protocol.ID, options ...Option) (*Ful crawlerInterval: time.Minute * 60, - bulkSendParallelism: 40, + bulkSendParallelism: 20, } rt.wg.Add(1) @@ -1013,7 +1013,7 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func( for i := 0; i < dht.bulkSendParallelism; i++ { go func() { defer wg.Done() - defer logger.Infof("bulk send goroutine done") + defer logger.Debugf("bulk send goroutine done") for { select { case wmsg, ok := <-workCh: @@ -1093,7 +1093,7 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func( } } - logger.Infof("bulk send: %d peers for group size %d", len(keysPerPeer), len(g)) + logger.Debugf("bulk send: %d peers for group size %d", len(keysPerPeer), len(g)) keyloop: for p, workKeys := range keysPerPeer { @@ -1109,7 +1109,7 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func( close(workCh) - logger.Infof("bulk send complete, waiting on goroutines to close") + logger.Debugf("bulk send complete, waiting on goroutines to close") wg.Wait() From 6974f15e10763edf708b27c10ec58849f7a71425 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Thu, 10 Jun 2021 14:59:03 -0400 Subject: [PATCH 16/19] move locking into key reports instead of locking the whole map --- fullrt/dht.go | 34 ++++++++++++++++------------------ 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/fullrt/dht.go b/fullrt/dht.go index 3e5373ead..f44ab8336 100644 --- a/fullrt/dht.go +++ b/fullrt/dht.go @@ -971,11 +971,11 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func( successes int failures int lastSuccess time.Time + mx sync.RWMutex } - successMapLk := sync.RWMutex{} - keySuccesses := make(map[peer.ID]*report) - numSkipped := 0 + keySuccesses := make(map[peer.ID]*report, len(keys)) + var numSkipped int64 for _, k := range keys { keySuccesses[k] = &report{} @@ -1027,39 +1027,37 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func( dialCtx, dialCancel := context.WithTimeout(ctx, dht.timeoutPerOp) if err := dht.h.Connect(dialCtx, peer.AddrInfo{ID: p, Addrs: peerAddrs}); err != nil { dialCancel() - successMapLk.Lock() - numSkipped++ - successMapLk.Unlock() + atomic.AddInt64(&numSkipped, 1) continue } dialCancel() dht.h.ConnManager().Protect(p, connmgrTag) for _, k := range workKeys { - successMapLk.RLock() - keyReport := *keySuccesses[k] - successMapLk.RUnlock() + keyReport := keySuccesses[k] queryTimeout := dht.timeoutPerOp + keyReport.mx.RLock() if keyReport.successes >= numSuccessfulToWaitFor { if time.Since(keyReport.lastSuccess) > time.Millisecond*500 { + keyReport.mx.RUnlock() continue } queryTimeout = time.Millisecond * 500 } + keyReport.mx.RUnlock() fnCtx, fnCancel := context.WithTimeout(ctx, queryTimeout) if err := fn(fnCtx, p, k); err == nil { - successMapLk.Lock() - s := keySuccesses[k] - s.successes++ - if s.successes >= numSuccessfulToWaitFor { - s.lastSuccess = time.Now() + keyReport.mx.Lock() + keyReport.successes++ + if keyReport.successes >= numSuccessfulToWaitFor { + keyReport.lastSuccess = time.Now() } - successMapLk.Unlock() + keyReport.mx.Unlock() } else { - 
successMapLk.Lock() - keySuccesses[k].failures++ - successMapLk.Unlock() + keyReport.mx.Lock() + keyReport.failures++ + keyReport.mx.Unlock() if ctx.Err() != nil { fnCancel() break From 32f50fe3f380d665cdd10c4d9cd9ac74cbc33d61 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Thu, 10 Jun 2021 15:05:49 -0400 Subject: [PATCH 17/19] fixed log typo and added some comments around the success and fail historgrams --- fullrt/dht.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/fullrt/dht.go b/fullrt/dht.go index f44ab8336..0a190f945 100644 --- a/fullrt/dht.go +++ b/fullrt/dht.go @@ -1113,7 +1113,10 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func( numSendsSuccessful := 0 numFails := 0 + // generate a histogram of how many successful sends occurred per key successHist := make(map[int]int) + // generate a histogram of how many failed sends occurred per key + // this does not include sends to peers that were skipped and had no messages sent to them at all failHist := make(map[int]int) for _, v := range keySuccesses { if v.successes > 0 { @@ -1132,7 +1135,7 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func( logger.Infof("bulk send complete: %d keys, %d unique, %d successful, %d skipped peers, %d fails", len(keys), len(keySuccesses), numSendsSuccessful, numSkipped, numFails) - logger.Infof("bulk send summary: sucessHist %v, failHist %v", successHist, failHist) + logger.Infof("bulk send summary: successHist %v, failHist %v", successHist, failHist) return nil } From 3a57b56380298995103e2ce9ae424eb6e2d7f017 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Thu, 10 Jun 2021 15:10:26 -0400 Subject: [PATCH 18/19] switch to range loop on channel --- fullrt/dht.go | 90 +++++++++++++++++++++++---------------------------- 1 file changed, 41 insertions(+), 49 deletions(-) diff --git a/fullrt/dht.go b/fullrt/dht.go index 0a190f945..c368bf22d 100644 --- a/fullrt/dht.go +++ b/fullrt/dht.go @@ -1014,62 +1014,54 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func( go func() { defer wg.Done() defer logger.Debugf("bulk send goroutine done") - for { - select { - case wmsg, ok := <-workCh: - if !ok { - return - } - p, workKeys := wmsg.p, wmsg.keys - dht.peerAddrsLk.RLock() - peerAddrs := dht.peerAddrs[p] - dht.peerAddrsLk.RUnlock() - dialCtx, dialCancel := context.WithTimeout(ctx, dht.timeoutPerOp) - if err := dht.h.Connect(dialCtx, peer.AddrInfo{ID: p, Addrs: peerAddrs}); err != nil { - dialCancel() - atomic.AddInt64(&numSkipped, 1) - continue - } + for wmsg := range workCh { + p, workKeys := wmsg.p, wmsg.keys + dht.peerAddrsLk.RLock() + peerAddrs := dht.peerAddrs[p] + dht.peerAddrsLk.RUnlock() + dialCtx, dialCancel := context.WithTimeout(ctx, dht.timeoutPerOp) + if err := dht.h.Connect(dialCtx, peer.AddrInfo{ID: p, Addrs: peerAddrs}); err != nil { dialCancel() - dht.h.ConnManager().Protect(p, connmgrTag) - for _, k := range workKeys { - keyReport := keySuccesses[k] + atomic.AddInt64(&numSkipped, 1) + continue + } + dialCancel() + dht.h.ConnManager().Protect(p, connmgrTag) + for _, k := range workKeys { + keyReport := keySuccesses[k] + + queryTimeout := dht.timeoutPerOp + keyReport.mx.RLock() + if keyReport.successes >= numSuccessfulToWaitFor { + if time.Since(keyReport.lastSuccess) > time.Millisecond*500 { + keyReport.mx.RUnlock() + continue + } + queryTimeout = time.Millisecond * 500 + } + keyReport.mx.RUnlock() - queryTimeout := dht.timeoutPerOp - keyReport.mx.RLock() + fnCtx, fnCancel := 
context.WithTimeout(ctx, queryTimeout) + if err := fn(fnCtx, p, k); err == nil { + keyReport.mx.Lock() + keyReport.successes++ if keyReport.successes >= numSuccessfulToWaitFor { - if time.Since(keyReport.lastSuccess) > time.Millisecond*500 { - keyReport.mx.RUnlock() - continue - } - queryTimeout = time.Millisecond * 500 + keyReport.lastSuccess = time.Now() } - keyReport.mx.RUnlock() - - fnCtx, fnCancel := context.WithTimeout(ctx, queryTimeout) - if err := fn(fnCtx, p, k); err == nil { - keyReport.mx.Lock() - keyReport.successes++ - if keyReport.successes >= numSuccessfulToWaitFor { - keyReport.lastSuccess = time.Now() - } - keyReport.mx.Unlock() - } else { - keyReport.mx.Lock() - keyReport.failures++ - keyReport.mx.Unlock() - if ctx.Err() != nil { - fnCancel() - break - } + keyReport.mx.Unlock() + } else { + keyReport.mx.Lock() + keyReport.failures++ + keyReport.mx.Unlock() + if ctx.Err() != nil { + fnCancel() + break } - fnCancel() } - - dht.h.ConnManager().Unprotect(p, connmgrTag) - case <-ctx.Done(): - return + fnCancel() } + + dht.h.ConnManager().Unprotect(p, connmgrTag) } }() } From dae2082897fbd11fe94112c904f9ba4b5855f5bf Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Thu, 10 Jun 2021 15:49:24 -0400 Subject: [PATCH 19/19] Refactor divide keys into groups Removed divideIntoGroups Renamed divideIntoChunks into divideByChunkSize Reworked tests to target divideByChunkSize instead of divideIntoGroups Fixed divideByChunkSize to pass tests Made divideByChunkSize handle the cases of empty key lists and panic on invalid chunk sizes --- fullrt/dht.go | 51 +++++++++++++--------------------------------- fullrt/dht_test.go | 31 ++++++++++++++-------------- 2 files changed, 30 insertions(+), 52 deletions(-) diff --git a/fullrt/dht.go b/fullrt/dht.go index c368bf22d..331f324ff 100644 --- a/fullrt/dht.go +++ b/fullrt/dht.go @@ -1066,7 +1066,7 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func( }() } - keyGroups := divideIntoChunks(sortedKeys, chunkSize) + keyGroups := divideByChunkSize(sortedKeys, chunkSize) sendsSoFar := 0 for _, g := range keyGroups { if ctx.Err() != nil { @@ -1132,8 +1132,16 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func( return nil } -// divideIntoChunks divides the set of keys into groups of (at most) chunkSize -func divideIntoChunks(keys []peer.ID, chunkSize int) [][]peer.ID { +// divideByChunkSize divides the set of keys into groups of (at most) chunkSize. Chunk size must be greater than 0. 
+func divideByChunkSize(keys []peer.ID, chunkSize int) [][]peer.ID { + if len(keys) == 0 { + return nil + } + + if chunkSize < 1 { + panic(fmt.Sprintf("fullrt: divide into groups: invalid chunk size %d", chunkSize)) + } + var keyChunks [][]peer.ID var nextChunk []peer.ID chunkProgress := 0 @@ -1146,41 +1154,10 @@ func divideIntoChunks(keys []peer.ID, chunkSize int) [][]peer.ID { nextChunk = make([]peer.ID, 0, len(nextChunk)) } } - keyChunks = append(keyChunks, nextChunk) - return keyChunks -} - -// divideIntoGroups divides the set of keys into (at most) the number of groups -func divideIntoGroups(keys []peer.ID, groups int) [][]peer.ID { - var keyGroups [][]peer.ID - if len(keys) < groups { - for i := 0; i < len(keys); i++ { - keyGroups = append(keyGroups, keys[i:i+1]) - } - return keyGroups - } - - chunkSize := len(keys) / groups - remainder := len(keys) % groups - - start := 0 - end := chunkSize - for i := 0; i < groups; i++ { - var chunk []peer.ID - // distribute the remainder as one extra entry per parallel thread - if remainder > 0 { - chunk = keys[start : end+1] - remainder-- - start = end + 1 - end = end + 1 + chunkSize - } else { - chunk = keys[start:end] - start = end - end = end + chunkSize - } - keyGroups = append(keyGroups, chunk) + if chunkProgress != 0 { + keyChunks = append(keyChunks, nextChunk) } - return keyGroups + return keyChunks } // FindProviders searches until the context expires. diff --git a/fullrt/dht_test.go b/fullrt/dht_test.go index 945548f43..94ee3e37a 100644 --- a/fullrt/dht_test.go +++ b/fullrt/dht_test.go @@ -7,7 +7,7 @@ import ( "github.com/libp2p/go-libp2p-core/peer" ) -func TestDivideIntoGroups(t *testing.T) { +func TestDivideByChunkSize(t *testing.T) { var keys []peer.ID for i := 0; i < 10; i++ { keys = append(keys, peer.ID(strconv.Itoa(i))) @@ -34,7 +34,7 @@ func TestDivideIntoGroups(t *testing.T) { } t.Run("Divides", func(t *testing.T) { - gr := divideIntoGroups(keys, 2) + gr := divideByChunkSize(keys, 5) if len(gr) != 2 { t.Fatal("incorrect number of groups") } @@ -46,22 +46,25 @@ func TestDivideIntoGroups(t *testing.T) { } }) t.Run("Remainder", func(t *testing.T) { - gr := divideIntoGroups(keys, 3) - if len(gr) != 3 { + gr := divideByChunkSize(keys, 3) + if len(gr) != 4 { t.Fatal("incorrect number of groups") } - if g, expected := convertToStrings(gr[0]), []string{"0", "1", "2", "3"}; !pidsEquals(g, expected) { + if g, expected := convertToStrings(gr[0]), []string{"0", "1", "2"}; !pidsEquals(g, expected) { t.Fatalf("expected %v, got %v", expected, g) } - if g, expected := convertToStrings(gr[1]), []string{"4", "5", "6"}; !pidsEquals(g, expected) { + if g, expected := convertToStrings(gr[1]), []string{"3", "4", "5"}; !pidsEquals(g, expected) { t.Fatalf("expected %v, got %v", expected, g) } - if g, expected := convertToStrings(gr[2]), []string{"7", "8", "9"}; !pidsEquals(g, expected) { + if g, expected := convertToStrings(gr[2]), []string{"6", "7", "8"}; !pidsEquals(g, expected) { + t.Fatalf("expected %v, got %v", expected, g) + } + if g, expected := convertToStrings(gr[3]), []string{"9"}; !pidsEquals(g, expected) { t.Fatalf("expected %v, got %v", expected, g) } }) t.Run("OneEach", func(t *testing.T) { - gr := divideIntoGroups(keys, 10) + gr := divideByChunkSize(keys, 1) if len(gr) != 10 { t.Fatal("incorrect number of groups") } @@ -71,15 +74,13 @@ func TestDivideIntoGroups(t *testing.T) { } } }) - t.Run("TooManyGroups", func(t *testing.T) { - gr := divideIntoGroups(keys, 11) - if len(gr) != 10 { + t.Run("ChunkSizeLargerThanKeys", func(t *testing.T) { 
+ gr := divideByChunkSize(keys, 11) + if len(gr) != 1 { t.Fatal("incorrect number of groups") } - for i := 0; i < 10; i++ { - if g, expected := convertToStrings(gr[i]), []string{strconv.Itoa(i)}; !pidsEquals(g, expected) { - t.Fatalf("expected %v, got %v", expected, g) - } + if g, expected := convertToStrings(gr[0]), convertToStrings(keys); !pidsEquals(g, expected) { + t.Fatalf("expected %v, got %v", expected, g) } }) }
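
For readers who want to exercise the chunking behavior that patch 19 settles on without pulling in libp2p, the standalone sketch below re-implements the same algorithm as divideByChunkSize over plain strings (peer.ID is swapped for string so the snippet has no external dependencies) and feeds the resulting chunks to a fixed-size worker pool that ranges over a closed channel, mirroring the shape bulkMessageSend takes after the "switch to range loop on channel" commit. The worker body is a stub print, not the DHT's dial-and-send logic.

package main

import (
	"fmt"
	"sync"
)

// divideByChunkSize re-implements the helper from patch 19 over strings: it
// splits keys into groups of at most chunkSize, returns nil for an empty
// input, and panics on a non-positive chunk size.
func divideByChunkSize(keys []string, chunkSize int) [][]string {
	if len(keys) == 0 {
		return nil
	}
	if chunkSize < 1 {
		panic(fmt.Sprintf("divide into groups: invalid chunk size %d", chunkSize))
	}

	var keyChunks [][]string
	var nextChunk []string
	chunkProgress := 0
	for _, k := range keys {
		nextChunk = append(nextChunk, k)
		chunkProgress++
		if chunkProgress == chunkSize {
			keyChunks = append(keyChunks, nextChunk)
			chunkProgress = 0
			nextChunk = make([]string, 0, len(nextChunk))
		}
	}
	if chunkProgress != 0 {
		keyChunks = append(keyChunks, nextChunk)
	}
	return keyChunks
}

func main() {
	keys := []string{"k0", "k1", "k2", "k3", "k4", "k5", "k6", "k7", "k8", "k9"}

	// Fixed worker pool draining a work channel; closing the channel lets the
	// range loops exit, the same shutdown pattern bulkMessageSend ends up with.
	workCh := make(chan []string, 1)
	var wg sync.WaitGroup
	const parallelism = 3
	wg.Add(parallelism)
	for i := 0; i < parallelism; i++ {
		go func(worker int) {
			defer wg.Done()
			for chunk := range workCh {
				// Stand-in for dialing a peer and sending one message per key.
				fmt.Printf("worker %d handling chunk %v\n", worker, chunk)
			}
		}(i)
	}

	// Ten keys with chunk size 3 yield four chunks, the last holding one key,
	// matching the expectations in TestDivideByChunkSize.
	for _, chunk := range divideByChunkSize(keys, 3) {
		workCh <- chunk
	}
	close(workCh)
	wg.Wait()
}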
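
The commits from "abort earlier" through "move locking into key reports" converge on a per-key throttling rule that is easy to lose across the diffs: each key carries its own success/failure counters behind its own RWMutex, the threshold is int(float64(bucketSize) * waitFrac * 1.2), and once a key crosses it, further sends for that key run on a 500 ms budget and are skipped entirely once the last success is more than 500 ms old. The helper below is a standalone restatement of that decision under those assumptions, not library code; the name sendBudget is invented for the sketch, while the report struct mirrors the one added in the series.

package fullrtsketch

import (
	"sync"
	"time"
)

// report mirrors the per-key bookkeeping bulkMessageSend keeps after the
// "move locking into key reports" commit: per-key counters and lock instead
// of a single map-wide mutex.
type report struct {
	successes   int
	failures    int
	lastSuccess time.Time
	mx          sync.RWMutex
}

// sendBudget distils the check made before each send in the later patches:
// below the success threshold a key gets the full per-operation timeout; at or
// above it, sends continue on a 500 ms budget only while the last success is
// recent, and are skipped (ok == false) once it goes stale.
func sendBudget(r *report, threshold int, perOpTimeout time.Duration) (budget time.Duration, ok bool) {
	r.mx.RLock()
	defer r.mx.RUnlock()
	if r.successes >= threshold {
		if time.Since(r.lastSuccess) > 500*time.Millisecond {
			return 0, false
		}
		return 500 * time.Millisecond, true
	}
	return perOpTimeout, true
}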