From 45e059b466cea1e4babcd877755ca29fdef4d7b6 Mon Sep 17 00:00:00 2001 From: noot Date: Thu, 27 May 2021 11:46:38 -0400 Subject: [PATCH 01/40] update pool.get to return ptr --- dot/network/notifications.go | 2 +- dot/network/pool.go | 4 ++-- dot/network/service.go | 15 +++++---------- 3 files changed, 8 insertions(+), 13 deletions(-) diff --git a/dot/network/notifications.go b/dot/network/notifications.go index 4f77099891..7ebdb20252 100644 --- a/dot/network/notifications.go +++ b/dot/network/notifications.go @@ -296,7 +296,7 @@ func (s *Service) broadcastExcluding(info *notificationsProtocol, excluding peer func (s *Service) readHandshake(stream libp2pnetwork.Stream, decoder HandshakeDecoder) (Handshake, error) { msgBytes := s.bufPool.get() - defer s.bufPool.put(&msgBytes) + defer s.bufPool.put(msgBytes) tot, err := readStream(stream, msgBytes[:]) if err != nil { diff --git a/dot/network/pool.go b/dot/network/pool.go index 9921a8ae26..352d9a56dd 100644 --- a/dot/network/pool.go +++ b/dot/network/pool.go @@ -36,7 +36,7 @@ func newSizedBufferPool(min, max int) (bp *sizedBufferPool) { // get gets a buffer from the sizedBufferPool, or creates a new one if none are // available in the pool. Buffers have a pre-allocated capacity. -func (bp *sizedBufferPool) get() [maxMessageSize]byte { +func (bp *sizedBufferPool) get() *[maxMessageSize]byte { var buff *[maxMessageSize]byte select { case buff = <-bp.c: @@ -45,7 +45,7 @@ func (bp *sizedBufferPool) get() [maxMessageSize]byte { // create new buffer buff = &[maxMessageSize]byte{} } - return *buff + return buff } // put returns the given buffer to the sizedBufferPool. diff --git a/dot/network/service.go b/dot/network/service.go index 95c35ebca4..d047adff8f 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -465,20 +465,15 @@ func (s *Service) IsStopped() bool { // SendMessage implementation of interface to handle receiving messages func (s *Service) SendMessage(msg NotificationsMessage) { - if s.host == nil { - return - } - if s.IsStopped() { - return - } - if msg == nil { - logger.Debug("Received nil message from core service") + if s.host == nil || msg == nil || s.IsStopped() { return } + logger.Debug( - "Broadcasting message from core service", + "gossiping message", "host", s.host.id(), "type", msg.Type(), + "message", msg, ) // check if the message is part of a notifications protocol @@ -527,7 +522,7 @@ func isInbound(stream libp2pnetwork.Stream) bool { func (s *Service) readStream(stream libp2pnetwork.Stream, decoder messageDecoder, handler messageHandler) { peer := stream.Conn().RemotePeer() msgBytes := s.bufPool.get() - defer s.bufPool.put(&msgBytes) + defer s.bufPool.put(msgBytes) for { tot, err := readStream(stream, msgBytes[:]) From 5197340ea3f1ca8462f46d61f0890fa3c4f9bbe2 Mon Sep 17 00:00:00 2001 From: noot Date: Thu, 27 May 2021 12:04:16 -0400 Subject: [PATCH 02/40] add logs --- dot/network/pool.go | 5 ++++- dot/network/service.go | 3 ++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/dot/network/pool.go b/dot/network/pool.go index 352d9a56dd..3fbf9499ed 100644 --- a/dot/network/pool.go +++ b/dot/network/pool.go @@ -40,9 +40,11 @@ func (bp *sizedBufferPool) get() *[maxMessageSize]byte { var buff *[maxMessageSize]byte select { case buff = <-bp.c: - // reuse existing buffer + // reuse existing buffer + logger.Info("using existing buffer", "chsize", len(bp.c)) default: // create new buffer + logger.Info("creating new buffer") buff = &[maxMessageSize]byte{} } return buff @@ -52,6 +54,7 @@ func (bp *sizedBufferPool) get() *[maxMessageSize]byte { func (bp *sizedBufferPool) put(b *[maxMessageSize]byte) { select { case bp.c <- b: + logger.Info("returning buffer", "chsize", len(bp.c)) default: // Discard the buffer if the pool is full. } } diff --git a/dot/network/service.go b/dot/network/service.go index d047adff8f..adf319671e 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -142,7 +142,8 @@ func NewService(cfg *Config) (*Service, error) { c: make(chan *[maxMessageSize]byte, cfg.MaxPeers*3), } } else { - bufPool = newSizedBufferPool((cfg.MaxPeers-cfg.MinPeers)*3/2, (cfg.MaxPeers+1)*3) + logger.Info("pre-allocating buffer pool") + bufPool = newSizedBufferPool(cfg.MaxPeers*3, (cfg.MaxPeers*2)*3) } network := &Service{ From 8d96671e135af80e4c876456df90ffb29859eb9f Mon Sep 17 00:00:00 2001 From: noot Date: Thu, 27 May 2021 13:44:28 -0400 Subject: [PATCH 03/40] reduce max message size to 63kb, no more pointers in pool --- dot/network/pool.go | 14 +++++++------- dot/network/service.go | 4 ++-- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/dot/network/pool.go b/dot/network/pool.go index 3fbf9499ed..d4d47fb7a6 100644 --- a/dot/network/pool.go +++ b/dot/network/pool.go @@ -18,15 +18,15 @@ package network // sizedBufferPool is a pool of buffers used for reading from streams type sizedBufferPool struct { - c chan *[maxMessageSize]byte + c chan [maxMessageSize]byte } func newSizedBufferPool(min, max int) (bp *sizedBufferPool) { - bufferCh := make(chan *[maxMessageSize]byte, max) + bufferCh := make(chan [maxMessageSize]byte, max) for i := 0; i < min; i++ { buf := [maxMessageSize]byte{} - bufferCh <- &buf + bufferCh <- buf } return &sizedBufferPool{ @@ -36,8 +36,8 @@ func newSizedBufferPool(min, max int) (bp *sizedBufferPool) { // get gets a buffer from the sizedBufferPool, or creates a new one if none are // available in the pool. Buffers have a pre-allocated capacity. -func (bp *sizedBufferPool) get() *[maxMessageSize]byte { - var buff *[maxMessageSize]byte +func (bp *sizedBufferPool) get() [maxMessageSize]byte { + var buff [maxMessageSize]byte select { case buff = <-bp.c: // reuse existing buffer @@ -45,13 +45,13 @@ func (bp *sizedBufferPool) get() *[maxMessageSize]byte { default: // create new buffer logger.Info("creating new buffer") - buff = &[maxMessageSize]byte{} + buff = [maxMessageSize]byte{} } return buff } // put returns the given buffer to the sizedBufferPool. -func (bp *sizedBufferPool) put(b *[maxMessageSize]byte) { +func (bp *sizedBufferPool) put(b [maxMessageSize]byte) { select { case bp.c <- b: logger.Info("returning buffer", "chsize", len(bp.c)) diff --git a/dot/network/service.go b/dot/network/service.go index adf319671e..95d999112c 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -45,7 +45,7 @@ const ( blockAnnounceID = "/block-announces/1" transactionsID = "/transactions/1" - maxMessageSize = 1024 * 1024 // 1mb for now + maxMessageSize = 1024 * 63 // 63kb for now ) var ( @@ -139,7 +139,7 @@ func NewService(cfg *Config) (*Service, error) { var bufPool *sizedBufferPool if cfg.noPreAllocate { bufPool = &sizedBufferPool{ - c: make(chan *[maxMessageSize]byte, cfg.MaxPeers*3), + c: make(chan [maxMessageSize]byte, cfg.MinPeers*3), } } else { logger.Info("pre-allocating buffer pool") From 42e2f4464c2e2653e75307aa50d76ae2c6da6f5c Mon Sep 17 00:00:00 2001 From: noot Date: Thu, 27 May 2021 13:59:51 -0400 Subject: [PATCH 04/40] remove bufio --- dot/network/utils.go | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/dot/network/utils.go b/dot/network/utils.go index 74935e3e47..5918f0fb35 100644 --- a/dot/network/utils.go +++ b/dot/network/utils.go @@ -17,7 +17,6 @@ package network import ( - "bufio" crand "crypto/rand" "encoding/hex" "errors" @@ -29,6 +28,8 @@ import ( "path" "path/filepath" + "github.com/ChainSafe/gossamer/lib/common" + "github.com/libp2p/go-libp2p-core/crypto" libp2pnetwork "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" @@ -151,11 +152,11 @@ func uint64ToLEB128(in uint64) []byte { return out } -func readLEB128ToUint64(r *bufio.Reader) (uint64, error) { +func readLEB128ToUint64(r io.Reader) (uint64, error) { var out uint64 var shift uint for { - b, err := r.ReadByte() + b, err := common.ReadByte(r) if err != nil { return 0, err } @@ -175,13 +176,13 @@ func readStream(stream libp2pnetwork.Stream, buf []byte) (int, error) { return 0, errors.New("stream is nil") } - r := bufio.NewReader(stream) + //r := bufio.NewReader(stream) var ( tot int ) - length, err := readLEB128ToUint64(r) + length, err := readLEB128ToUint64(stream) if err == io.EOF { return 0, err } else if err != nil { @@ -194,19 +195,20 @@ func readStream(stream libp2pnetwork.Stream, buf []byte) (int, error) { // TODO: check if length > len(buf), if so probably log.Crit if length > maxBlockResponseSize { - logger.Warn("received message with size greater than maxBlockResponseSize, discarding", "length", length) - for { - _, err = r.Discard(int(maxBlockResponseSize)) - if err != nil { - break - } - } + logger.Warn("received message with size greater than maxBlockResponseSize, closing stream", "length", length) + _ = stream.Close() + // for { + // _, err = stream.Read(int(maxBlockResponseSize)) + // if err != nil { + // break + // } + // } return 0, fmt.Errorf("message size greater than maximum: got %d", length) } tot = 0 for i := 0; i < maxReads; i++ { - n, err := r.Read(buf[tot:]) + n, err := stream.Read(buf[tot:]) // TODO: check if length > len(buf) if err != nil { return n + tot, err } From 161406e9de1feafc83068717441aa3ae91ecafa5 Mon Sep 17 00:00:00 2001 From: noot Date: Thu, 27 May 2021 17:41:44 -0400 Subject: [PATCH 05/40] no more allocations --- dot/network/utils.go | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/dot/network/utils.go b/dot/network/utils.go index 5918f0fb35..f1103bf4e1 100644 --- a/dot/network/utils.go +++ b/dot/network/utils.go @@ -28,8 +28,6 @@ import ( "path" "path/filepath" - "github.com/ChainSafe/gossamer/lib/common" - "github.com/libp2p/go-libp2p-core/crypto" libp2pnetwork "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" @@ -152,15 +150,20 @@ func uint64ToLEB128(in uint64) []byte { return out } -func readLEB128ToUint64(r io.Reader) (uint64, error) { +func readLEB128ToUint64(r io.Reader, buf []byte) (uint64, error) { + if len(buf) == 0 { + return 0, errors.New("buffer has length 0") + } + var out uint64 var shift uint for { - b, err := common.ReadByte(r) + _, err := r.Read(buf) if err != nil { return 0, err } + b := buf[0] out |= uint64(0x7F&b) << shift if b&0x80 == 0 { break @@ -182,7 +185,7 @@ func readStream(stream libp2pnetwork.Stream, buf []byte) (int, error) { tot int ) - length, err := readLEB128ToUint64(stream) + length, err := readLEB128ToUint64(stream, buf) if err == io.EOF { return 0, err } else if err != nil { @@ -193,22 +196,21 @@ func readStream(stream libp2pnetwork.Stream, buf []byte) (int, error) { return 0, nil // msg length of 0 is allowed, for example transactions handshake } - // TODO: check if length > len(buf), if so probably log.Crit + if length > uint64(len(buf)) { + logger.Warn("received message with size greater than allocated message buffer", "length", length, "buffer size", len(buf)) + _ = stream.Close() + return 0, fmt.Errorf("message size greater than allocated message buffer: got %d", length) + } + if length > maxBlockResponseSize { logger.Warn("received message with size greater than maxBlockResponseSize, closing stream", "length", length) _ = stream.Close() - // for { - // _, err = stream.Read(int(maxBlockResponseSize)) - // if err != nil { - // break - // } - // } return 0, fmt.Errorf("message size greater than maximum: got %d", length) } tot = 0 for i := 0; i < maxReads; i++ { - n, err := stream.Read(buf[tot:]) // TODO: check if length > len(buf) + n, err := stream.Read(buf[tot:]) if err != nil { return n + tot, err } From aa538dade18321e90c32618ca4468251ee5fd2f8 Mon Sep 17 00:00:00 2001 From: noot Date: Thu, 27 May 2021 18:31:07 -0400 Subject: [PATCH 06/40] try to fix sync?? --- dot/network/sync.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/dot/network/sync.go b/dot/network/sync.go index 22e1993ab9..483e7b73ca 100644 --- a/dot/network/sync.go +++ b/dot/network/sync.go @@ -559,16 +559,16 @@ func (q *syncQueue) processBlockRequests() { continue } - reqData, ok := q.isRequestDataCached(req.req.StartingBlock) + // reqData, ok := q.isRequestDataCached(req.req.StartingBlock) - if !ok { - q.trySync(req) - continue - } + // if !ok { + // q.trySync(req) + // continue + // } - if reqData.sent && reqData.received { - continue - } + // if reqData.sent && reqData.received { + // continue + // } q.trySync(req) case <-q.ctx.Done(): From 735c60f8077e18027ee7599c735bf25eddeb7ec9 Mon Sep 17 00:00:00 2001 From: noot Date: Thu, 27 May 2021 18:42:15 -0400 Subject: [PATCH 07/40] update logs --- dot/network/sync.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dot/network/sync.go b/dot/network/sync.go index 483e7b73ca..b7b9baa03c 100644 --- a/dot/network/sync.go +++ b/dot/network/sync.go @@ -404,7 +404,7 @@ func (q *syncQueue) pushRequest(start uint64, numRequests int, to peer.ID) { start := best.Int64() + 1 req := createBlockRequest(start, 0) - logger.Debug("pushing request to queue", "start", start) + logger.Debug("pushing request to queue", "start", start, "req", req) q.requestData.Store(start, requestData{ received: false, }) @@ -582,7 +582,7 @@ func (q *syncQueue) trySync(req *syncRequest) { return } - logger.Trace("beginning to send out request", "start", req.req.StartingBlock.Value()) + logger.Debug("beginning to send out request", "start", req.req.StartingBlock.Value()) if len(req.to) != 0 { resp, err := q.syncWithPeer(req.to, req.req) if err == nil { From bcc5e6f8529c19737e8bd8a47fec4465b2498110 Mon Sep 17 00:00:00 2001 From: noot Date: Thu, 27 May 2021 18:54:50 -0400 Subject: [PATCH 08/40] pass buf[0] to readLEB128ToUint64 --- dot/network/utils.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dot/network/utils.go b/dot/network/utils.go index f1103bf4e1..cd161b6e8b 100644 --- a/dot/network/utils.go +++ b/dot/network/utils.go @@ -185,7 +185,7 @@ func readStream(stream libp2pnetwork.Stream, buf []byte) (int, error) { tot int ) - length, err := readLEB128ToUint64(stream, buf) + length, err := readLEB128ToUint64(stream, buf[0]) if err == io.EOF { return 0, err } else if err != nil { From db5ebf26f4a9346788fd78dbaa6b00b0c477fa02 Mon Sep 17 00:00:00 2001 From: noot Date: Thu, 27 May 2021 19:03:44 -0400 Subject: [PATCH 09/40] log --- dot/network/sync.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/dot/network/sync.go b/dot/network/sync.go index b7b9baa03c..a20ad0553a 100644 --- a/dot/network/sync.go +++ b/dot/network/sync.go @@ -451,6 +451,7 @@ func (q *syncQueue) pushRequest(start uint64, numRequests int, to peer.ID) { } func (q *syncQueue) pushResponse(resp *BlockResponseMessage, pid peer.ID) error { + logger.Debug("got response!!", "resp", resp) if len(resp.BlockData) == 0 { return errEmptyResponseData } @@ -592,11 +593,11 @@ func (q *syncQueue) trySync(req *syncRequest) { } } - logger.Trace("failed to sync with peer", "peer", req.to, "error", err) + logger.Debug("failed to sync with peer", "peer", req.to, "error", err) q.updatePeerScore(req.to, -1) } - logger.Trace("trying peers in prioritised order...") + logger.Debug("trying peers in prioritised order...") syncPeers := q.getSortedPeers() for _, peer := range syncPeers { @@ -607,7 +608,7 @@ func (q *syncQueue) trySync(req *syncRequest) { resp, err := q.syncWithPeer(peer.pid, req.req) if err != nil { - logger.Trace("failed to sync with peer", "peer", peer.pid, "error", err) + logger.Debug("failed to sync with peer", "peer", peer.pid, "error", err) q.updatePeerScore(peer.pid, -1) continue } @@ -620,7 +621,7 @@ func (q *syncQueue) trySync(req *syncRequest) { } } - logger.Trace("failed to sync with any peer :(") + logger.Debug("failed to sync with any peer :(") if req.req.StartingBlock.IsUint64() && (req.req.RequestedData&RequestedDataHeader) == 1 { q.requestData.Store(req.req.StartingBlock.Uint64(), requestData{ sent: true, From 35200ba4137b9f7096ffbebee96085829773d246 Mon Sep 17 00:00:00 2001 From: noot Date: Thu, 27 May 2021 19:04:31 -0400 Subject: [PATCH 10/40] fix --- dot/network/utils.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dot/network/utils.go b/dot/network/utils.go index cd161b6e8b..fa97ad1bb7 100644 --- a/dot/network/utils.go +++ b/dot/network/utils.go @@ -185,7 +185,7 @@ func readStream(stream libp2pnetwork.Stream, buf []byte) (int, error) { tot int ) - length, err := readLEB128ToUint64(stream, buf[0]) + length, err := readLEB128ToUint64(stream, buf[0:]) if err == io.EOF { return 0, err } else if err != nil { From 78895dd5b6a544c24713171f65f440797c7e9f8a Mon Sep 17 00:00:00 2001 From: noot Date: Thu, 27 May 2021 19:10:47 -0400 Subject: [PATCH 11/40] fix --- dot/network/utils.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dot/network/utils.go b/dot/network/utils.go index fa97ad1bb7..2fdf355b4c 100644 --- a/dot/network/utils.go +++ b/dot/network/utils.go @@ -185,7 +185,7 @@ func readStream(stream libp2pnetwork.Stream, buf []byte) (int, error) { tot int ) - length, err := readLEB128ToUint64(stream, buf[0:]) + length, err := readLEB128ToUint64(stream, buf[:1]) if err == io.EOF { return 0, err } else if err != nil { From ef61ee8385b4816855bb3b18f7348267ddcd4de3 Mon Sep 17 00:00:00 2001 From: noot Date: Thu, 27 May 2021 19:30:43 -0400 Subject: [PATCH 12/40] remove connectToPeers case in discoverAndAdvertise --- dot/network/discovery.go | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/dot/network/discovery.go b/dot/network/discovery.go index 2226cfd2b7..bd2d3fe33f 100644 --- a/dot/network/discovery.go +++ b/dot/network/discovery.go @@ -165,19 +165,19 @@ func (d *discovery) discoverAndAdvertise() error { select { case <-d.ctx.Done(): return - case <-time.After(connectToPeersTimeout): - if len(d.h.Network().Peers()) > d.minPeers { - continue - } - - // reconnect to peers if peer count is low - for p := range peersToTry { - err = d.h.Connect(d.ctx, *p) - if err != nil { - logger.Trace("failed to connect to discovered peer", "peer", p.ID, "err", err) - delete(peersToTry, p) - } - } + // case <-time.After(connectToPeersTimeout): + // if len(d.h.Network().Peers()) > d.minPeers { + // continue + // } + + // // reconnect to peers if peer count is low + // for p := range peersToTry { + // err = d.h.Connect(d.ctx, *p) + // if err != nil { + // logger.Trace("failed to connect to discovered peer", "peer", p.ID, "err", err) + // delete(peersToTry, p) + // } + // } case peer := <-peerCh: if peer.ID == d.h.ID() || peer.ID == "" { continue From 60afa84d4302796f51c6c7b30d5a2ec2b1940728 Mon Sep 17 00:00:00 2001 From: noot Date: Thu, 27 May 2021 19:44:55 -0400 Subject: [PATCH 13/40] restore pool to use ptrs --- dot/network/notifications.go | 2 +- dot/network/pool.go | 17 +++++++---------- dot/network/service.go | 7 +++---- dot/network/sync.go | 17 ++++++++--------- 4 files changed, 19 insertions(+), 24 deletions(-) diff --git a/dot/network/notifications.go b/dot/network/notifications.go index 7ebdb20252..4f77099891 100644 --- a/dot/network/notifications.go +++ b/dot/network/notifications.go @@ -296,7 +296,7 @@ func (s *Service) broadcastExcluding(info *notificationsProtocol, excluding peer func (s *Service) readHandshake(stream libp2pnetwork.Stream, decoder HandshakeDecoder) (Handshake, error) { msgBytes := s.bufPool.get() - defer s.bufPool.put(msgBytes) + defer s.bufPool.put(&msgBytes) tot, err := readStream(stream, msgBytes[:]) if err != nil { diff --git a/dot/network/pool.go b/dot/network/pool.go index d4d47fb7a6..5547a474c0 100644 --- a/dot/network/pool.go +++ b/dot/network/pool.go @@ -18,15 +18,15 @@ package network // sizedBufferPool is a pool of buffers used for reading from streams type sizedBufferPool struct { - c chan [maxMessageSize]byte + c chan *[maxMessageSize]byte } func newSizedBufferPool(min, max int) (bp *sizedBufferPool) { - bufferCh := make(chan [maxMessageSize]byte, max) + bufferCh := make(chan *[maxMessageSize]byte, max) for i := 0; i < min; i++ { buf := [maxMessageSize]byte{} - bufferCh <- buf + bufferCh <- &buf } return &sizedBufferPool{ @@ -37,24 +37,21 @@ func newSizedBufferPool(min, max int) (bp *sizedBufferPool) { // get gets a buffer from the sizedBufferPool, or creates a new one if none are // available in the pool. Buffers have a pre-allocated capacity. func (bp *sizedBufferPool) get() [maxMessageSize]byte { - var buff [maxMessageSize]byte + var buff *[maxMessageSize]byte select { case buff = <-bp.c: // reuse existing buffer - logger.Info("using existing buffer", "chsize", len(bp.c)) default: // create new buffer - logger.Info("creating new buffer") - buff = [maxMessageSize]byte{} + buff = &[maxMessageSize]byte{} } - return buff + return *buff } // put returns the given buffer to the sizedBufferPool. -func (bp *sizedBufferPool) put(b [maxMessageSize]byte) { +func (bp *sizedBufferPool) put(b *[maxMessageSize]byte) { select { case bp.c <- b: - logger.Info("returning buffer", "chsize", len(bp.c)) default: // Discard the buffer if the pool is full. } } diff --git a/dot/network/service.go b/dot/network/service.go index 95d999112c..9c87bfd5a1 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -139,11 +139,10 @@ func NewService(cfg *Config) (*Service, error) { var bufPool *sizedBufferPool if cfg.noPreAllocate { bufPool = &sizedBufferPool{ - c: make(chan [maxMessageSize]byte, cfg.MinPeers*3), + c: make(chan *[maxMessageSize]byte, cfg.MinPeers*3), } } else { - logger.Info("pre-allocating buffer pool") - bufPool = newSizedBufferPool(cfg.MaxPeers*3, (cfg.MaxPeers*2)*3) + bufPool = newSizedBufferPool(cfg.MinPeers*3, cfg.MaxPeers*3) } network := &Service{ @@ -523,7 +522,7 @@ func isInbound(stream libp2pnetwork.Stream) bool { func (s *Service) readStream(stream libp2pnetwork.Stream, decoder messageDecoder, handler messageHandler) { peer := stream.Conn().RemotePeer() msgBytes := s.bufPool.get() - defer s.bufPool.put(msgBytes) + defer s.bufPool.put(&msgBytes) for { tot, err := readStream(stream, msgBytes[:]) diff --git a/dot/network/sync.go b/dot/network/sync.go index a20ad0553a..a5ba33fcbe 100644 --- a/dot/network/sync.go +++ b/dot/network/sync.go @@ -451,7 +451,6 @@ func (q *syncQueue) pushRequest(start uint64, numRequests int, to peer.ID) { } func (q *syncQueue) pushResponse(resp *BlockResponseMessage, pid peer.ID) error { - logger.Debug("got response!!", "resp", resp) if len(resp.BlockData) == 0 { return errEmptyResponseData } @@ -560,16 +559,16 @@ func (q *syncQueue) processBlockRequests() { continue } - // reqData, ok := q.isRequestDataCached(req.req.StartingBlock) + reqData, ok := q.isRequestDataCached(req.req.StartingBlock) - // if !ok { - // q.trySync(req) - // continue - // } + if !ok { + q.trySync(req) + continue + } - // if reqData.sent && reqData.received { - // continue - // } + if reqData.sent && reqData.received { + continue + } q.trySync(req) case <-q.ctx.Done(): From 8e70013abcb7ac8b963299f3154bb9dbc9d76994 Mon Sep 17 00:00:00 2001 From: noot Date: Thu, 27 May 2021 19:47:17 -0400 Subject: [PATCH 14/40] remove storing peersToTry --- dot/network/discovery.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dot/network/discovery.go b/dot/network/discovery.go index bd2d3fe33f..36771233f6 100644 --- a/dot/network/discovery.go +++ b/dot/network/discovery.go @@ -192,8 +192,8 @@ func (d *discovery) discoverAndAdvertise() error { logger.Trace("failed to connect to discovered peer", "peer", peer.ID, "err", err) } } else { - d.h.Peerstore().AddAddrs(peer.ID, peer.Addrs, peerstore.PermanentAddrTTL) - peersToTry[&peer] = struct{}{} + // d.h.Peerstore().AddAddrs(peer.ID, peer.Addrs, peerstore.PermanentAddrTTL) + // peersToTry[&peer] = struct{}{} } } } From 4af4f955230fc77ea742fafeb8cd36eb180e635b Mon Sep 17 00:00:00 2001 From: noot Date: Thu, 27 May 2021 19:49:39 -0400 Subject: [PATCH 15/40] fix --- dot/network/discovery.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dot/network/discovery.go b/dot/network/discovery.go index 36771233f6..543e2445f8 100644 --- a/dot/network/discovery.go +++ b/dot/network/discovery.go @@ -24,7 +24,7 @@ import ( badger "github.com/ipfs/go-ds-badger2" libp2phost "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" - "github.com/libp2p/go-libp2p-core/peerstore" + //"github.com/libp2p/go-libp2p-core/peerstore" "github.com/libp2p/go-libp2p-core/protocol" libp2pdiscovery "github.com/libp2p/go-libp2p-discovery" kaddht "github.com/libp2p/go-libp2p-kad-dht" @@ -159,7 +159,7 @@ func (d *discovery) discoverAndAdvertise() error { return } - peersToTry := make(map[*peer.AddrInfo]struct{}) + //peersToTry := make(map[*peer.AddrInfo]struct{}) for { select { From fb467b1ac45a7cd45db48db39afc62da38d6c369 Mon Sep 17 00:00:00 2001 From: noot Date: Thu, 27 May 2021 20:04:57 -0400 Subject: [PATCH 16/40] attempt to readd connecting to min peers --- dot/network/discovery.go | 40 +++++++++++++++++++++++++--------------- dot/network/pool.go | 2 +- dot/network/sync.go | 12 ++++++------ 3 files changed, 32 insertions(+), 22 deletions(-) diff --git a/dot/network/discovery.go b/dot/network/discovery.go index 543e2445f8..5adf9a6646 100644 --- a/dot/network/discovery.go +++ b/dot/network/discovery.go @@ -150,6 +150,8 @@ func (d *discovery) discoverAndAdvertise() error { } } }() + + peersToTry := make(map[*peer.AddrInfo]struct{}) go func() { logger.Debug("attempting to find DHT peers...") @@ -159,25 +161,11 @@ func (d *discovery) discoverAndAdvertise() error { return } - //peersToTry := make(map[*peer.AddrInfo]struct{}) for { select { case <-d.ctx.Done(): return - // case <-time.After(connectToPeersTimeout): - // if len(d.h.Network().Peers()) > d.minPeers { - // continue - // } - - // // reconnect to peers if peer count is low - // for p := range peersToTry { - // err = d.h.Connect(d.ctx, *p) - // if err != nil { - // logger.Trace("failed to connect to discovered peer", "peer", p.ID, "err", err) - // delete(peersToTry, p) - // } - // } case peer := <-peerCh: if peer.ID == d.h.ID() || peer.ID == "" { continue @@ -193,12 +181,34 @@ func (d *discovery) discoverAndAdvertise() error { } } else { // d.h.Peerstore().AddAddrs(peer.ID, peer.Addrs, peerstore.PermanentAddrTTL) - // peersToTry[&peer] = struct{}{} + peersToTry[&peer] = struct{}{} } } } }() + go func() { + for { + select { + case <-d.ctx.Done(): + return + case <-time.After(connectToPeersTimeout): + if len(d.h.Network().Peers()) > d.minPeers { + continue + } + + // reconnect to peers if peer count is low + for p := range peersToTry { + err = d.h.Connect(d.ctx, *p) + if err != nil { + logger.Trace("failed to connect to discovered peer", "peer", p.ID, "err", err) + delete(peersToTry, p) + } + } + } + } + }() + logger.Debug("DHT discovery started!") return nil } diff --git a/dot/network/pool.go b/dot/network/pool.go index 5547a474c0..9921a8ae26 100644 --- a/dot/network/pool.go +++ b/dot/network/pool.go @@ -40,7 +40,7 @@ func (bp *sizedBufferPool) get() [maxMessageSize]byte { var buff *[maxMessageSize]byte select { case buff = <-bp.c: - // reuse existing buffer + // reuse existing buffer default: // create new buffer buff = &[maxMessageSize]byte{} diff --git a/dot/network/sync.go b/dot/network/sync.go index a5ba33fcbe..22e1993ab9 100644 --- a/dot/network/sync.go +++ b/dot/network/sync.go @@ -404,7 +404,7 @@ func (q *syncQueue) pushRequest(start uint64, numRequests int, to peer.ID) { start := best.Int64() + 1 req := createBlockRequest(start, 0) - logger.Debug("pushing request to queue", "start", start, "req", req) + logger.Debug("pushing request to queue", "start", start) q.requestData.Store(start, requestData{ received: false, }) @@ -582,7 +582,7 @@ func (q *syncQueue) trySync(req *syncRequest) { return } - logger.Debug("beginning to send out request", "start", req.req.StartingBlock.Value()) + logger.Trace("beginning to send out request", "start", req.req.StartingBlock.Value()) if len(req.to) != 0 { resp, err := q.syncWithPeer(req.to, req.req) if err == nil { @@ -592,11 +592,11 @@ func (q *syncQueue) trySync(req *syncRequest) { } } - logger.Debug("failed to sync with peer", "peer", req.to, "error", err) + logger.Trace("failed to sync with peer", "peer", req.to, "error", err) q.updatePeerScore(req.to, -1) } - logger.Debug("trying peers in prioritised order...") + logger.Trace("trying peers in prioritised order...") syncPeers := q.getSortedPeers() for _, peer := range syncPeers { @@ -607,7 +607,7 @@ func (q *syncQueue) trySync(req *syncRequest) { resp, err := q.syncWithPeer(peer.pid, req.req) if err != nil { - logger.Debug("failed to sync with peer", "peer", peer.pid, "error", err) + logger.Trace("failed to sync with peer", "peer", peer.pid, "error", err) q.updatePeerScore(peer.pid, -1) continue } @@ -620,7 +620,7 @@ func (q *syncQueue) trySync(req *syncRequest) { } } - logger.Debug("failed to sync with any peer :(") + logger.Trace("failed to sync with any peer :(") if req.req.StartingBlock.IsUint64() && (req.req.RequestedData&RequestedDataHeader) == 1 { q.requestData.Store(req.req.StartingBlock.Uint64(), requestData{ sent: true, From 56c1aaaf69a6d72f83913d5ca29136b7f7d769f2 Mon Sep 17 00:00:00 2001 From: noot Date: Thu, 27 May 2021 21:08:34 -0400 Subject: [PATCH 17/40] increase connectToPeersTimeout --- dot/network/discovery.go | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/dot/network/discovery.go b/dot/network/discovery.go index 5adf9a6646..733d01f9af 100644 --- a/dot/network/discovery.go +++ b/dot/network/discovery.go @@ -24,7 +24,7 @@ import ( badger "github.com/ipfs/go-ds-badger2" libp2phost "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" - //"github.com/libp2p/go-libp2p-core/peerstore" + "github.com/libp2p/go-libp2p-core/peerstore" "github.com/libp2p/go-libp2p-core/protocol" libp2pdiscovery "github.com/libp2p/go-libp2p-discovery" kaddht "github.com/libp2p/go-libp2p-kad-dht" @@ -35,7 +35,7 @@ var ( startDHTTimeout = time.Second * 10 initialAdvertisementTimeout = time.Millisecond tryAdvertiseTimeout = time.Second * 30 - connectToPeersTimeout = time.Minute + connectToPeersTimeout = time.Minute * 5 ) // discovery handles discovery of new peers via the kademlia DHT @@ -134,7 +134,7 @@ func (d *discovery) discoverAndAdvertise() error { select { case <-time.After(ttl): logger.Debug("advertising ourselves in the DHT...") - err := d.dht.Bootstrap(d.ctx) + err := d.dht.Bootstrap(d.ctx) //nolint if err != nil { logger.Warn("failed to bootstrap DHT", "error", err) continue @@ -150,18 +150,17 @@ func (d *discovery) discoverAndAdvertise() error { } } }() - + peersToTry := make(map[*peer.AddrInfo]struct{}) go func() { logger.Debug("attempting to find DHT peers...") - peerCh, err := rd.FindPeers(d.ctx, string(d.pid)) + peerCh, err := rd.FindPeers(d.ctx, string(d.pid)) //nolint if err != nil { logger.Warn("failed to begin finding peers via DHT", "err", err) return } - for { select { case <-d.ctx.Done(): @@ -180,7 +179,7 @@ func (d *discovery) discoverAndAdvertise() error { logger.Trace("failed to connect to discovered peer", "peer", peer.ID, "err", err) } } else { - // d.h.Peerstore().AddAddrs(peer.ID, peer.Addrs, peerstore.PermanentAddrTTL) + d.h.Peerstore().AddAddrs(peer.ID, peer.Addrs, peerstore.PermanentAddrTTL) peersToTry[&peer] = struct{}{} } } @@ -205,7 +204,7 @@ func (d *discovery) discoverAndAdvertise() error { delete(peersToTry, p) } } - } + } } }() From aa58de6339ed4b2f20b8e76b371faf87d7de611b Mon Sep 17 00:00:00 2001 From: noot Date: Thu, 27 May 2021 21:18:01 -0400 Subject: [PATCH 18/40] split up discoverAndAdvertise funcs --- dot/network/discovery.go | 153 ++++++++++++++++++++------------------- 1 file changed, 78 insertions(+), 75 deletions(-) diff --git a/dot/network/discovery.go b/dot/network/discovery.go index 733d01f9af..f73bc225eb 100644 --- a/dot/network/discovery.go +++ b/dot/network/discovery.go @@ -42,22 +42,25 @@ var ( type discovery struct { ctx context.Context dht *dual.DHT + rd *libp2pdiscovery.RoutingDiscovery h libp2phost.Host bootnodes []peer.AddrInfo ds *badger.Datastore pid protocol.ID minPeers, maxPeers int + cachedPeers map[*peer.AddrInfo]struct{} } func newDiscovery(ctx context.Context, h libp2phost.Host, bootnodes []peer.AddrInfo, ds *badger.Datastore, pid protocol.ID, min, max int) *discovery { return &discovery{ - ctx: ctx, - h: h, - bootnodes: bootnodes, - ds: ds, - pid: pid, - minPeers: min, - maxPeers: max, + ctx: ctx, + h: h, + bootnodes: bootnodes, + ds: ds, + pid: pid, + minPeers: min, + maxPeers: max, + cachedPeers: make(map[*peer.AddrInfo]struct{}), } } @@ -117,7 +120,7 @@ func (d *discovery) stop() error { } func (d *discovery) discoverAndAdvertise() error { - rd := libp2pdiscovery.NewRoutingDiscovery(d.dht) + d.rd = libp2pdiscovery.NewRoutingDiscovery(d.dht) err := d.dht.Bootstrap(d.ctx) if err != nil { @@ -126,88 +129,88 @@ func (d *discovery) discoverAndAdvertise() error { // wait to connect to bootstrap peers time.Sleep(time.Second) + go d.advertise() + go d.findPeers() + go d.checkPeerCount() + logger.Debug("DHT discovery started!") + return nil +} - go func() { - ttl := initialAdvertisementTimeout - - for { - select { - case <-time.After(ttl): - logger.Debug("advertising ourselves in the DHT...") - err := d.dht.Bootstrap(d.ctx) //nolint - if err != nil { - logger.Warn("failed to bootstrap DHT", "error", err) - continue - } +func (d *discovery) advertise() { + ttl := initialAdvertisementTimeout + + for { + select { + case <-time.After(ttl): + logger.Debug("advertising ourselves in the DHT...") + err := d.dht.Bootstrap(d.ctx) //nolint + if err != nil { + logger.Warn("failed to bootstrap DHT", "error", err) + continue + } - ttl, err = rd.Advertise(d.ctx, string(d.pid)) - if err != nil { - logger.Debug("failed to advertise in the DHT", "error", err) - ttl = tryAdvertiseTimeout - } - case <-d.ctx.Done(): - return + ttl, err = d.rd.Advertise(d.ctx, string(d.pid)) + if err != nil { + logger.Debug("failed to advertise in the DHT", "error", err) + ttl = tryAdvertiseTimeout } + case <-d.ctx.Done(): + return } - }() + } +} - peersToTry := make(map[*peer.AddrInfo]struct{}) +func (d *discovery) findPeers() { + logger.Debug("attempting to find DHT peers...") + peerCh, err := d.rd.FindPeers(d.ctx, string(d.pid)) //nolint + if err != nil { + logger.Warn("failed to begin finding peers via DHT", "err", err) + return + } - go func() { - logger.Debug("attempting to find DHT peers...") - peerCh, err := rd.FindPeers(d.ctx, string(d.pid)) //nolint - if err != nil { - logger.Warn("failed to begin finding peers via DHT", "err", err) + for { + select { + case <-d.ctx.Done(): return - } + case peer := <-peerCh: + if peer.ID == d.h.ID() || peer.ID == "" { + continue + } - for { - select { - case <-d.ctx.Done(): - return - case peer := <-peerCh: - if peer.ID == d.h.ID() || peer.ID == "" { - continue - } + logger.Trace("found new peer via DHT", "peer", peer.ID) - logger.Trace("found new peer via DHT", "peer", peer.ID) - - // found a peer, try to connect if we need more peers - if len(d.h.Network().Peers()) < d.maxPeers { - err = d.h.Connect(d.ctx, peer) - if err != nil { - logger.Trace("failed to connect to discovered peer", "peer", peer.ID, "err", err) - } - } else { - d.h.Peerstore().AddAddrs(peer.ID, peer.Addrs, peerstore.PermanentAddrTTL) - peersToTry[&peer] = struct{}{} + // found a peer, try to connect if we need more peers + if len(d.h.Network().Peers()) < d.maxPeers { + err = d.h.Connect(d.ctx, peer) + if err != nil { + logger.Trace("failed to connect to discovered peer", "peer", peer.ID, "err", err) } + } else { + d.h.Peerstore().AddAddrs(peer.ID, peer.Addrs, peerstore.PermanentAddrTTL) + d.cachedPeers[&peer] = struct{}{} } } - }() + } +} - go func() { - for { - select { - case <-d.ctx.Done(): - return - case <-time.After(connectToPeersTimeout): - if len(d.h.Network().Peers()) > d.minPeers { - continue - } +func (d *discovery) checkPeerCount() { + for { + select { + case <-d.ctx.Done(): + return + case <-time.After(connectToPeersTimeout): + if len(d.h.Network().Peers()) > d.minPeers { + continue + } - // reconnect to peers if peer count is low - for p := range peersToTry { - err = d.h.Connect(d.ctx, *p) - if err != nil { - logger.Trace("failed to connect to discovered peer", "peer", p.ID, "err", err) - delete(peersToTry, p) - } + // reconnect to peers if peer count is low + for p := range d.cachedPeers { + err := d.h.Connect(d.ctx, *p) + if err != nil { + logger.Trace("failed to connect to discovered peer", "peer", p.ID, "err", err) + delete(d.cachedPeers, p) } } } - }() - - logger.Debug("DHT discovery started!") - return nil + } } From e1563f354b70b227ee4f65e9b0b16db49d98d47d Mon Sep 17 00:00:00 2001 From: noot Date: Thu, 27 May 2021 21:49:49 -0400 Subject: [PATCH 19/40] don't find peers --- dot/network/discovery.go | 2 +- dot/network/utils.go | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/dot/network/discovery.go b/dot/network/discovery.go index f73bc225eb..5d547b5d96 100644 --- a/dot/network/discovery.go +++ b/dot/network/discovery.go @@ -130,7 +130,7 @@ func (d *discovery) discoverAndAdvertise() error { // wait to connect to bootstrap peers time.Sleep(time.Second) go d.advertise() - go d.findPeers() + //go d.findPeers() go d.checkPeerCount() logger.Debug("DHT discovery started!") return nil diff --git a/dot/network/utils.go b/dot/network/utils.go index 2fdf355b4c..771cabda08 100644 --- a/dot/network/utils.go +++ b/dot/network/utils.go @@ -179,8 +179,6 @@ func readStream(stream libp2pnetwork.Stream, buf []byte) (int, error) { return 0, errors.New("stream is nil") } - //r := bufio.NewReader(stream) - var ( tot int ) From c8b55443f383aec48a436da2e28e6693b90ff9b9 Mon Sep 17 00:00:00 2001 From: noot Date: Fri, 28 May 2021 11:39:16 -0400 Subject: [PATCH 20/40] only find peers if below min --- dot/network/discovery.go | 66 ++++++++++++++++++---------------------- 1 file changed, 30 insertions(+), 36 deletions(-) diff --git a/dot/network/discovery.go b/dot/network/discovery.go index 5d547b5d96..32bf6cc530 100644 --- a/dot/network/discovery.go +++ b/dot/network/discovery.go @@ -36,6 +36,7 @@ var ( initialAdvertisementTimeout = time.Millisecond tryAdvertiseTimeout = time.Second * 30 connectToPeersTimeout = time.Minute * 5 + findPeersTimeout = time.Minute ) // discovery handles discovery of new peers via the kademlia DHT @@ -48,19 +49,17 @@ type discovery struct { ds *badger.Datastore pid protocol.ID minPeers, maxPeers int - cachedPeers map[*peer.AddrInfo]struct{} } func newDiscovery(ctx context.Context, h libp2phost.Host, bootnodes []peer.AddrInfo, ds *badger.Datastore, pid protocol.ID, min, max int) *discovery { return &discovery{ - ctx: ctx, - h: h, - bootnodes: bootnodes, - ds: ds, - pid: pid, - minPeers: min, - maxPeers: max, - cachedPeers: make(map[*peer.AddrInfo]struct{}), + ctx: ctx, + h: h, + bootnodes: bootnodes, + ds: ds, + pid: pid, + minPeers: min, + maxPeers: max, } } @@ -143,7 +142,7 @@ func (d *discovery) advertise() { select { case <-time.After(ttl): logger.Debug("advertising ourselves in the DHT...") - err := d.dht.Bootstrap(d.ctx) //nolint + err := d.dht.Bootstrap(d.ctx) if err != nil { logger.Warn("failed to bootstrap DHT", "error", err) continue @@ -160,9 +159,26 @@ func (d *discovery) advertise() { } } -func (d *discovery) findPeers() { +func (d *discovery) checkPeerCount() { + for { + select { + case <-d.ctx.Done(): + return + case <-time.After(connectToPeersTimeout): + if len(d.h.Network().Peers()) > d.minPeers { + continue + } + + ctx, cancel := context.WithTimeout(d.ctx, findPeersTimeout) + defer cancel() + d.findPeers(ctx) + } + } +} + +func (d *discovery) findPeers(ctx context.Context) { logger.Debug("attempting to find DHT peers...") - peerCh, err := d.rd.FindPeers(d.ctx, string(d.pid)) //nolint + peerCh, err := d.rd.FindPeers(d.ctx, string(d.pid)) if err != nil { logger.Warn("failed to begin finding peers via DHT", "err", err) return @@ -170,7 +186,7 @@ func (d *discovery) findPeers() { for { select { - case <-d.ctx.Done(): + case <-ctx.Done(): return case peer := <-peerCh: if peer.ID == d.h.ID() || peer.ID == "" { @@ -187,29 +203,7 @@ func (d *discovery) findPeers() { } } else { d.h.Peerstore().AddAddrs(peer.ID, peer.Addrs, peerstore.PermanentAddrTTL) - d.cachedPeers[&peer] = struct{}{} - } - } - } -} - -func (d *discovery) checkPeerCount() { - for { - select { - case <-d.ctx.Done(): - return - case <-time.After(connectToPeersTimeout): - if len(d.h.Network().Peers()) > d.minPeers { - continue - } - - // reconnect to peers if peer count is low - for p := range d.cachedPeers { - err := d.h.Connect(d.ctx, *p) - if err != nil { - logger.Trace("failed to connect to discovered peer", "peer", p.ID, "err", err) - delete(d.cachedPeers, p) - } + return } } } From 0ef575d2379a6b5d71a30d19d0e967a790a36037 Mon Sep 17 00:00:00 2001 From: noot Date: Fri, 28 May 2021 12:11:01 -0400 Subject: [PATCH 21/40] lint --- dot/network/discovery.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dot/network/discovery.go b/dot/network/discovery.go index 32bf6cc530..d4773c9c5e 100644 --- a/dot/network/discovery.go +++ b/dot/network/discovery.go @@ -129,8 +129,8 @@ func (d *discovery) discoverAndAdvertise() error { // wait to connect to bootstrap peers time.Sleep(time.Second) go d.advertise() - //go d.findPeers() go d.checkPeerCount() + logger.Debug("DHT discovery started!") return nil } From 5565d903f96755a2036c01355c41b584a9f93941 Mon Sep 17 00:00:00 2001 From: noot Date: Sat, 29 May 2021 11:32:43 -0400 Subject: [PATCH 22/40] add go.mod --- go.mod | 1 - 1 file changed, 1 deletion(-) diff --git a/go.mod b/go.mod index 30a8b0fb26..84bfa0a2dd 100644 --- a/go.mod +++ b/go.mod @@ -50,7 +50,6 @@ require ( github.com/naoina/toml v0.1.2-0.20170918210437-9fafd6967416 github.com/perlin-network/life v0.0.0-20191203030451-05c0e0f7eaea github.com/rs/cors v1.7.0 // indirect - github.com/sirupsen/logrus v1.6.0 github.com/stretchr/testify v1.7.0 github.com/syndtr/goleveldb v1.0.1-0.20200815110645-5c35d600f0ca // indirect github.com/urfave/cli v1.20.0 From a1babfa1f63e645a249d3b2e964350436c2e8ce4 Mon Sep 17 00:00:00 2001 From: noot Date: Sun, 30 May 2021 18:49:58 -0400 Subject: [PATCH 23/40] add timeout to readStream --- dot/network/utils.go | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/dot/network/utils.go b/dot/network/utils.go index 771cabda08..13748f2140 100644 --- a/dot/network/utils.go +++ b/dot/network/utils.go @@ -34,6 +34,8 @@ import ( "github.com/multiformats/go-multiaddr" ) +var receiveStreamDataTimeout = time.Second * 15 + // stringToAddrInfos converts a single string peer id to AddrInfo func stringToAddrInfo(s string) (peer.AddrInfo, error) { maddr, err := multiaddr.NewMultiaddr(s) @@ -183,11 +185,19 @@ func readStream(stream libp2pnetwork.Stream, buf []byte) (int, error) { tot int ) - length, err := readLEB128ToUint64(stream, buf[:1]) - if err == io.EOF { - return 0, err - } else if err != nil { - return 0, err // TODO: return bytes read from readLEB128ToUint64 + receivedDataCh := make(chan struct{}) + go func() { + length, err := readLEB128ToUint64(stream, buf[:1]) + if err != nil { + return 0, err // TODO: return bytes read from readLEB128ToUint64 + } + close(receivedDataCh) + }() + + select { + case <-time.After(receiveStreamDataTimeout): + return 0, errors.New("timeout") + case <-receivedDataCh: } if length == 0 { From a9fab019d244522ff9ace633de11c1ff029aeee2 Mon Sep 17 00:00:00 2001 From: noot Date: Sun, 30 May 2021 18:50:38 -0400 Subject: [PATCH 24/40] cleanup --- dot/network/utils.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/dot/network/utils.go b/dot/network/utils.go index 13748f2140..b49cca9b8f 100644 --- a/dot/network/utils.go +++ b/dot/network/utils.go @@ -206,13 +206,11 @@ func readStream(stream libp2pnetwork.Stream, buf []byte) (int, error) { if length > uint64(len(buf)) { logger.Warn("received message with size greater than allocated message buffer", "length", length, "buffer size", len(buf)) - _ = stream.Close() return 0, fmt.Errorf("message size greater than allocated message buffer: got %d", length) } if length > maxBlockResponseSize { logger.Warn("received message with size greater than maxBlockResponseSize, closing stream", "length", length) - _ = stream.Close() return 0, fmt.Errorf("message size greater than maximum: got %d", length) } From 80569f59b4a4a138f76486265d7fc5d2e592eeae Mon Sep 17 00:00:00 2001 From: noot Date: Sun, 30 May 2021 18:52:44 -0400 Subject: [PATCH 25/40] fix --- dot/network/utils.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/dot/network/utils.go b/dot/network/utils.go index b49cca9b8f..24ff1323f5 100644 --- a/dot/network/utils.go +++ b/dot/network/utils.go @@ -27,6 +27,7 @@ import ( "os" "path" "path/filepath" + "time" "github.com/libp2p/go-libp2p-core/crypto" libp2pnetwork "github.com/libp2p/go-libp2p-core/network" @@ -182,15 +183,14 @@ func readStream(stream libp2pnetwork.Stream, buf []byte) (int, error) { } var ( - tot int + tot int + length uint64 + err error ) receivedDataCh := make(chan struct{}) go func() { - length, err := readLEB128ToUint64(stream, buf[:1]) - if err != nil { - return 0, err // TODO: return bytes read from readLEB128ToUint64 - } + length, err = readLEB128ToUint64(stream, buf[:1]) // TODO: return bytes read from readLEB128ToUint64 if error close(receivedDataCh) }() From f3926954f05bffbf26cf36de0ea48565af17b293 Mon Sep 17 00:00:00 2001 From: noot Date: Sun, 30 May 2021 18:54:00 -0400 Subject: [PATCH 26/40] fix --- dot/network/utils.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/dot/network/utils.go b/dot/network/utils.go index 24ff1323f5..c8606e2543 100644 --- a/dot/network/utils.go +++ b/dot/network/utils.go @@ -200,6 +200,10 @@ func readStream(stream libp2pnetwork.Stream, buf []byte) (int, error) { case <-receivedDataCh: } + if err != nil { + return err + } + if length == 0 { return 0, nil // msg length of 0 is allowed, for example transactions handshake } From a6ef2aca5ac024e013afdf8befb2b402e53ff432 Mon Sep 17 00:00:00 2001 From: noot Date: Sun, 30 May 2021 18:54:34 -0400 Subject: [PATCH 27/40] fix again --- dot/network/utils.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dot/network/utils.go b/dot/network/utils.go index c8606e2543..b880ec0730 100644 --- a/dot/network/utils.go +++ b/dot/network/utils.go @@ -201,7 +201,7 @@ func readStream(stream libp2pnetwork.Stream, buf []byte) (int, error) { } if err != nil { - return err + return 0, err } if length == 0 { From da6d0b475e1fb8fc9ede8a185785c9dd6b6a88d3 Mon Sep 17 00:00:00 2001 From: noot Date: Sun, 30 May 2021 19:33:57 -0400 Subject: [PATCH 28/40] always close sync stream after handling, change timeout --- dot/network/sync.go | 8 ++++---- dot/network/utils.go | 8 +++++++- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/dot/network/sync.go b/dot/network/sync.go index 22e1993ab9..0451f9b859 100644 --- a/dot/network/sync.go +++ b/dot/network/sync.go @@ -58,12 +58,12 @@ func (s *Service) handleSyncMessage(stream libp2pnetwork.Stream, msg Message) er return nil } + defer func() { + _ = stream.Close() + }() + // if it's a BlockRequest, call core for processing if req, ok := msg.(*BlockRequestMessage); ok { - defer func() { - _ = stream.Close() - }() - resp, err := s.syncer.CreateBlockResponse(req) if err != nil { logger.Debug("cannot create response for request", "error", err) diff --git a/dot/network/utils.go b/dot/network/utils.go index b880ec0730..c904f1a95f 100644 --- a/dot/network/utils.go +++ b/dot/network/utils.go @@ -189,13 +189,19 @@ func readStream(stream libp2pnetwork.Stream, buf []byte) (int, error) { ) receivedDataCh := make(chan struct{}) + timeoutCh := make(chan struct{}) go func() { length, err = readLEB128ToUint64(stream, buf[:1]) // TODO: return bytes read from readLEB128ToUint64 if error close(receivedDataCh) }() + go func() { + time.Sleep(receiveStreamDataTimeout) + close(timeoutCh) + }() + select { - case <-time.After(receiveStreamDataTimeout): + case <-timeoutCh: return 0, errors.New("timeout") case <-receivedDataCh: } From 2162ebcb85f88a036e9bb0871345b00d1c029a8e Mon Sep 17 00:00:00 2001 From: noot Date: Sun, 30 May 2021 19:57:17 -0400 Subject: [PATCH 29/40] remove timeouts, add streamManager --- dot/network/service.go | 18 ++++++---- dot/network/stream_manager.go | 62 +++++++++++++++++++++++++++++++++++ dot/network/sync.go | 8 ++--- dot/network/utils.go | 28 ++-------------- 4 files changed, 81 insertions(+), 35 deletions(-) create mode 100644 dot/network/stream_manager.go diff --git a/dot/network/service.go b/dot/network/service.go index effb2b7506..96e6a72d96 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -67,12 +67,13 @@ type Service struct { ctx context.Context cancel context.CancelFunc - cfg *Config - host *host - mdns *mdns - gossip *gossip - syncQueue *syncQueue - bufPool *sizedBufferPool + cfg *Config + host *host + mdns *mdns + gossip *gossip + syncQueue *syncQueue + bufPool *sizedBufferPool + streamManager *streamManager notificationsProtocols map[byte]*notificationsProtocol // map of sub-protocol msg ID to protocol info notificationsMu sync.RWMutex @@ -162,6 +163,7 @@ func NewService(cfg *Config) (*Service, error) { telemetryInterval: cfg.telemetryInterval, closeCh: make(chan interface{}), bufPool: bufPool, + streamManager: newStreamManager(ctx), } network.syncQueue = newSyncQueue(network) @@ -529,6 +531,8 @@ func isInbound(stream libp2pnetwork.Stream) bool { } func (s *Service) readStream(stream libp2pnetwork.Stream, decoder messageDecoder, handler messageHandler) { + s.streamManager.logNewStream(stream) + peer := stream.Conn().RemotePeer() msgBytes := s.bufPool.get() defer s.bufPool.put(&msgBytes) @@ -543,6 +547,8 @@ func (s *Service) readStream(stream libp2pnetwork.Stream, decoder messageDecoder return } + s.streamManager.logMessageReceived(stream.ID()) + // decode message based on message type msg, err := decoder(msgBytes[:tot], peer, isInbound(stream)) if err != nil { diff --git a/dot/network/stream_manager.go b/dot/network/stream_manager.go new file mode 100644 index 0000000000..6c442a23af --- /dev/null +++ b/dot/network/stream_manager.go @@ -0,0 +1,62 @@ +package network + +import ( + "context" + "time" + + "github.com/libp2p/go-libp2p-core/network" +) + +var cleanupStreamInterval = time.Minute + +type streamManager struct { + ctx context.Context + lastReceivedMessage map[string]time.Time + streams map[string]network.Stream +} + +func newStreamManager(ctx context.Context) *streamManager { + return &streamManager{ + ctx: ctx, + lastReceivedMessage: make(map[string]time.Time), + streams: make(map[string]network.Stream), + } +} + +func (sm *streamManager) start() { + go func() { + for { + select { + case <-sm.ctx.Done(): + return + case <-time.After(cleanupStreamInterval): + sm.cleanupStreams() + } + } + }() +} + +func (sm *streamManager) cleanupStreams() { + for id, stream := range sm.streams { + lastReceived, has := sm.lastReceivedMessage[id] + if !has { + _ = stream.Close() + delete(sm.streams, id) + } + + if time.Since(lastReceived) > cleanupStreamInterval { + _ = stream.Close() + delete(sm.streams, id) + delete(sm.lastReceivedMessage, id) + } + } +} + +func (sm *streamManager) logNewStream(stream network.Stream) { + sm.lastReceivedMessage[stream.ID()] = time.Now() // prevents closing just opened streams, in case the cleanup goroutine runs at the same time stream is opened + sm.streams[stream.ID()] = stream +} + +func (sm *streamManager) logMessageReceived(streamID string) { + sm.lastReceivedMessage[streamID] = time.Now() +} diff --git a/dot/network/sync.go b/dot/network/sync.go index 0451f9b859..22e1993ab9 100644 --- a/dot/network/sync.go +++ b/dot/network/sync.go @@ -58,12 +58,12 @@ func (s *Service) handleSyncMessage(stream libp2pnetwork.Stream, msg Message) er return nil } - defer func() { - _ = stream.Close() - }() - // if it's a BlockRequest, call core for processing if req, ok := msg.(*BlockRequestMessage); ok { + defer func() { + _ = stream.Close() + }() + resp, err := s.syncer.CreateBlockResponse(req) if err != nil { logger.Debug("cannot create response for request", "error", err) diff --git a/dot/network/utils.go b/dot/network/utils.go index c904f1a95f..211f247a8f 100644 --- a/dot/network/utils.go +++ b/dot/network/utils.go @@ -27,7 +27,6 @@ import ( "os" "path" "path/filepath" - "time" "github.com/libp2p/go-libp2p-core/crypto" libp2pnetwork "github.com/libp2p/go-libp2p-core/network" @@ -35,8 +34,6 @@ import ( "github.com/multiformats/go-multiaddr" ) -var receiveStreamDataTimeout = time.Second * 15 - // stringToAddrInfos converts a single string peer id to AddrInfo func stringToAddrInfo(s string) (peer.AddrInfo, error) { maddr, err := multiaddr.NewMultiaddr(s) @@ -183,31 +180,12 @@ func readStream(stream libp2pnetwork.Stream, buf []byte) (int, error) { } var ( - tot int - length uint64 - err error + tot int ) - receivedDataCh := make(chan struct{}) - timeoutCh := make(chan struct{}) - go func() { - length, err = readLEB128ToUint64(stream, buf[:1]) // TODO: return bytes read from readLEB128ToUint64 if error - close(receivedDataCh) - }() - - go func() { - time.Sleep(receiveStreamDataTimeout) - close(timeoutCh) - }() - - select { - case <-timeoutCh: - return 0, errors.New("timeout") - case <-receivedDataCh: - } - + length, err := readLEB128ToUint64(stream, buf[:1]) if err != nil { - return 0, err + return 0, err // TODO: return bytes read from readLEB128ToUint64 } if length == 0 { From d4085c13882a43a7c5d2c72db9fb8ba94a725041 Mon Sep 17 00:00:00 2001 From: noot Date: Sun, 30 May 2021 19:58:05 -0400 Subject: [PATCH 30/40] start streamManager --- dot/network/service.go | 1 + 1 file changed, 1 insertion(+) diff --git a/dot/network/service.go b/dot/network/service.go index 96e6a72d96..32d23cc852 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -269,6 +269,7 @@ func (s *Service) Start() error { go s.logPeerCount() go s.publishNetworkTelemetry(s.closeCh) go s.sentBlockIntervalTelemetry() + s.streamManager.start() return nil } From 85e2e5d03fd80ea1595a74600528f4c94f140202 Mon Sep 17 00:00:00 2001 From: noot Date: Sun, 30 May 2021 20:05:36 -0400 Subject: [PATCH 31/40] change streamManager maps to sync.Map --- dot/network/stream_manager.go | 35 +++++++++++++++++++---------------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/dot/network/stream_manager.go b/dot/network/stream_manager.go index 6c442a23af..9f1ddbdc30 100644 --- a/dot/network/stream_manager.go +++ b/dot/network/stream_manager.go @@ -2,6 +2,7 @@ package network import ( "context" + "sync" "time" "github.com/libp2p/go-libp2p-core/network" @@ -11,15 +12,15 @@ var cleanupStreamInterval = time.Minute type streamManager struct { ctx context.Context - lastReceivedMessage map[string]time.Time - streams map[string]network.Stream + lastReceivedMessage *sync.Map //map[string]time.Time + streams *sync.Map //map[string]network.Stream } func newStreamManager(ctx context.Context) *streamManager { return &streamManager{ ctx: ctx, - lastReceivedMessage: make(map[string]time.Time), - streams: make(map[string]network.Stream), + lastReceivedMessage: new(sync.Map), + streams: new(sync.Map), } } @@ -37,26 +38,28 @@ func (sm *streamManager) start() { } func (sm *streamManager) cleanupStreams() { - for id, stream := range sm.streams { - lastReceived, has := sm.lastReceivedMessage[id] + sm.streams.Range(func(id, stream interface{}) bool { + lastReceived, has := sm.lastReceivedMessage.Load(id) if !has { - _ = stream.Close() - delete(sm.streams, id) + _ = stream.(network.Stream).Close() + sm.streams.Delete(id) } - if time.Since(lastReceived) > cleanupStreamInterval { - _ = stream.Close() - delete(sm.streams, id) - delete(sm.lastReceivedMessage, id) + if time.Since(lastReceived.(time.Time)) > cleanupStreamInterval { + _ = stream.(network.Stream).Close() + sm.streams.Delete(id) + sm.lastReceivedMessage.Delete(id) } - } + + return true + }) } func (sm *streamManager) logNewStream(stream network.Stream) { - sm.lastReceivedMessage[stream.ID()] = time.Now() // prevents closing just opened streams, in case the cleanup goroutine runs at the same time stream is opened - sm.streams[stream.ID()] = stream + sm.lastReceivedMessage.Store(stream.ID(), time.Now()) // prevents closing just opened streams, in case the cleanup goroutine runs at the same time stream is opened + sm.streams.Store(stream.ID(), stream) } func (sm *streamManager) logMessageReceived(streamID string) { - sm.lastReceivedMessage[streamID] = time.Now() + sm.lastReceivedMessage.Store(streamID, time.Now()) } From 03480e331bc1430fce5646398d2ccffcdab4b3ed Mon Sep 17 00:00:00 2001 From: noot Date: Mon, 31 May 2021 12:48:19 -0400 Subject: [PATCH 32/40] add test case --- dot/network/notifications.go | 3 ++ dot/network/stream_manager.go | 3 ++ dot/network/stream_manager_test.go | 71 ++++++++++++++++++++++++++++++ 3 files changed, 77 insertions(+) create mode 100644 dot/network/stream_manager_test.go diff --git a/dot/network/notifications.go b/dot/network/notifications.go index 4f77099891..389e951443 100644 --- a/dot/network/notifications.go +++ b/dot/network/notifications.go @@ -295,6 +295,7 @@ func (s *Service) broadcastExcluding(info *notificationsProtocol, excluding peer } func (s *Service) readHandshake(stream libp2pnetwork.Stream, decoder HandshakeDecoder) (Handshake, error) { + s.streamManager.logNewStream(stream) msgBytes := s.bufPool.get() defer s.bufPool.put(&msgBytes) @@ -303,6 +304,8 @@ func (s *Service) readHandshake(stream libp2pnetwork.Stream, decoder HandshakeDe return nil, err } + s.streamManager.logMessageReceived(stream.ID()) + hs, err := decoder(msgBytes[:tot]) if err != nil { return nil, err diff --git a/dot/network/stream_manager.go b/dot/network/stream_manager.go index 9f1ddbdc30..4cbe4d2638 100644 --- a/dot/network/stream_manager.go +++ b/dot/network/stream_manager.go @@ -10,6 +10,9 @@ import ( var cleanupStreamInterval = time.Minute +// streamManager tracks inbound streams and runs a cleanup goroutine every `cleanupStreamInterval` to close streams that +// we haven't received any data on for the last time period. this prevents keeping stale streams open and continously trying to +// read from it, which takes up lots of CPU over time. type streamManager struct { ctx context.Context lastReceivedMessage *sync.Map //map[string]time.Time diff --git a/dot/network/stream_manager_test.go b/dot/network/stream_manager_test.go new file mode 100644 index 0000000000..17f39fb5c4 --- /dev/null +++ b/dot/network/stream_manager_test.go @@ -0,0 +1,71 @@ +package network + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + ma "github.com/multiformats/go-multiaddr" + + "github.com/stretchr/testify/require" +) + +func TestStreamManager(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + + cleanupStreamInterval = time.Millisecond * 500 + defer func() { + cleanupStreamInterval = time.Minute + cancel() + }() + + smA := newStreamManager(ctx) + smB := newStreamManager(ctx) + + portA := 7001 + portB := 7002 + addrA, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", portA)) + require.NoError(t, err) + addrB, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", portB)) + require.NoError(t, err) + + ha, err := libp2p.New( + ctx, libp2p.ListenAddrs(addrA), + ) + require.NoError(t, err) + + hb, err := libp2p.New( + ctx, libp2p.ListenAddrs(addrB), + ) + require.NoError(t, err) + + err = ha.Connect(ctx, peer.AddrInfo{ + ID: hb.ID(), + Addrs: hb.Addrs(), + }) + require.NoError(t, err) + + hb.SetStreamHandler("", func(stream network.Stream) { + smB.logNewStream(stream) + smB.start() + }) + + stream, err := ha.NewStream(ctx, hb.ID(), "") + require.NoError(t, err) + + smA.logNewStream(stream) + smA.start() + + time.Sleep(cleanupStreamInterval * 2) + connsAToB := ha.Network().ConnsToPeer(hb.ID()) + require.Equal(t, 1, len(connsAToB)) + require.Equal(t, 0, len(connsAToB[0].GetStreams())) + + connsBToA := hb.Network().ConnsToPeer(ha.ID()) + require.Equal(t, 1, len(connsBToA)) + require.Equal(t, 0, len(connsBToA[0].GetStreams())) +} From d26bce15aa34e5bf5faae4e5e2733cdacceba4e1 Mon Sep 17 00:00:00 2001 From: noot Date: Mon, 31 May 2021 13:38:20 -0400 Subject: [PATCH 33/40] lint --- dot/network/stream_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dot/network/stream_manager.go b/dot/network/stream_manager.go index 4cbe4d2638..9307ed9809 100644 --- a/dot/network/stream_manager.go +++ b/dot/network/stream_manager.go @@ -11,7 +11,7 @@ import ( var cleanupStreamInterval = time.Minute // streamManager tracks inbound streams and runs a cleanup goroutine every `cleanupStreamInterval` to close streams that -// we haven't received any data on for the last time period. this prevents keeping stale streams open and continously trying to +// we haven't received any data on for the last time period. this prevents keeping stale streams open and continuously trying to // read from it, which takes up lots of CPU over time. type streamManager struct { ctx context.Context From 945f95bd3137cde0fb68dcbcfd0c0749f837c1a5 Mon Sep 17 00:00:00 2001 From: noot Date: Mon, 31 May 2021 13:41:11 -0400 Subject: [PATCH 34/40] cleanup --- dot/network/notifications.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/dot/network/notifications.go b/dot/network/notifications.go index 389e951443..4f77099891 100644 --- a/dot/network/notifications.go +++ b/dot/network/notifications.go @@ -295,7 +295,6 @@ func (s *Service) broadcastExcluding(info *notificationsProtocol, excluding peer } func (s *Service) readHandshake(stream libp2pnetwork.Stream, decoder HandshakeDecoder) (Handshake, error) { - s.streamManager.logNewStream(stream) msgBytes := s.bufPool.get() defer s.bufPool.put(&msgBytes) @@ -304,8 +303,6 @@ func (s *Service) readHandshake(stream libp2pnetwork.Stream, decoder HandshakeDe return nil, err } - s.streamManager.logMessageReceived(stream.ID()) - hs, err := decoder(msgBytes[:tot]) if err != nil { return nil, err From ea5248cad284a14b064e4eed0e2ad656abcb6626 Mon Sep 17 00:00:00 2001 From: noot Date: Mon, 31 May 2021 15:20:41 -0400 Subject: [PATCH 35/40] test cleanup --- dot/network/stream_manager_test.go | 39 +++++++++++++++++++++++++++--- 1 file changed, 35 insertions(+), 4 deletions(-) diff --git a/dot/network/stream_manager_test.go b/dot/network/stream_manager_test.go index 17f39fb5c4..c831f01144 100644 --- a/dot/network/stream_manager_test.go +++ b/dot/network/stream_manager_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/libp2p/go-libp2p" + libp2phost "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" ma "github.com/multiformats/go-multiaddr" @@ -14,14 +15,14 @@ import ( "github.com/stretchr/testify/require" ) -func TestStreamManager(t *testing.T) { +func setupStreamManagerTest(t *testing.T) (context.Context, []libp2phost.Host, []*streamManager) { ctx, cancel := context.WithCancel(context.Background()) cleanupStreamInterval = time.Millisecond * 500 - defer func() { + t.Cleanup(func() { cleanupStreamInterval = time.Minute cancel() - }() + }) smA := newStreamManager(ctx) smB := newStreamManager(ctx) @@ -51,14 +52,22 @@ func TestStreamManager(t *testing.T) { hb.SetStreamHandler("", func(stream network.Stream) { smB.logNewStream(stream) - smB.start() }) + return ctx, []libp2phost.Host{ha, hb}, []*streamManager{smA, smB} +} + +func TestStreamManager(t *testing.T) { + ctx, hosts, sms := setupStreamManagerTest(t) + ha, hb := hosts[0], hosts[1] + smA, smB := sms[0], sms[1] + stream, err := ha.NewStream(ctx, hb.ID(), "") require.NoError(t, err) smA.logNewStream(stream) smA.start() + smB.start() time.Sleep(cleanupStreamInterval * 2) connsAToB := ha.Network().ConnsToPeer(hb.ID()) @@ -69,3 +78,25 @@ func TestStreamManager(t *testing.T) { require.Equal(t, 1, len(connsBToA)) require.Equal(t, 0, len(connsBToA[0].GetStreams())) } + +func TestStreamManager_KeepStream(t *testing.T) { + ctx, hosts, sms := setupStreamManagerTest(t) + ha, hb := hosts[0], hosts[1] + smA, smB := sms[0], sms[1] + + stream, err := ha.NewStream(ctx, hb.ID(), "") + require.NoError(t, err) + + smA.logNewStream(stream) + smA.start() + smB.start() + + time.Sleep(cleanupStreamInterval / 2) + connsAToB := ha.Network().ConnsToPeer(hb.ID()) + require.Equal(t, 1, len(connsAToB)) + require.Equal(t, 1, len(connsAToB[0].GetStreams())) + + connsBToA := hb.Network().ConnsToPeer(ha.ID()) + require.Equal(t, 1, len(connsBToA)) + require.Equal(t, 1, len(connsBToA[0].GetStreams())) +} From bb94aec7989a590c6c9abc96cdacb184b2db31b5 Mon Sep 17 00:00:00 2001 From: noot Date: Tue, 1 Jun 2021 11:49:13 -0400 Subject: [PATCH 36/40] use single map in streamManager --- dot/network/stream_manager.go | 48 +++++++++++++++++++++-------------- 1 file changed, 29 insertions(+), 19 deletions(-) diff --git a/dot/network/stream_manager.go b/dot/network/stream_manager.go index 9307ed9809..0797404a68 100644 --- a/dot/network/stream_manager.go +++ b/dot/network/stream_manager.go @@ -10,20 +10,23 @@ import ( var cleanupStreamInterval = time.Minute +type streamData struct { + lastReceivedMessage time.Time + stream network.Stream +} + // streamManager tracks inbound streams and runs a cleanup goroutine every `cleanupStreamInterval` to close streams that // we haven't received any data on for the last time period. this prevents keeping stale streams open and continuously trying to // read from it, which takes up lots of CPU over time. type streamManager struct { - ctx context.Context - lastReceivedMessage *sync.Map //map[string]time.Time - streams *sync.Map //map[string]network.Stream + ctx context.Context + streamData *sync.Map //map[string]streamData } func newStreamManager(ctx context.Context) *streamManager { return &streamManager{ - ctx: ctx, - lastReceivedMessage: new(sync.Map), - streams: new(sync.Map), + ctx: ctx, + streamData: new(sync.Map), } } @@ -41,17 +44,14 @@ func (sm *streamManager) start() { } func (sm *streamManager) cleanupStreams() { - sm.streams.Range(func(id, stream interface{}) bool { - lastReceived, has := sm.lastReceivedMessage.Load(id) - if !has { - _ = stream.(network.Stream).Close() - sm.streams.Delete(id) - } + sm.streamData.Range(func(id, data interface{}) bool { + streamData := data.(streamData) + lastReceived := streamData.lastReceivedMessage + stream := streamData.stream - if time.Since(lastReceived.(time.Time)) > cleanupStreamInterval { - _ = stream.(network.Stream).Close() - sm.streams.Delete(id) - sm.lastReceivedMessage.Delete(id) + if time.Since(lastReceived) > cleanupStreamInterval { + _ = stream.Close() + sm.streamData.Delete(id) } return true @@ -59,10 +59,20 @@ func (sm *streamManager) cleanupStreams() { } func (sm *streamManager) logNewStream(stream network.Stream) { - sm.lastReceivedMessage.Store(stream.ID(), time.Now()) // prevents closing just opened streams, in case the cleanup goroutine runs at the same time stream is opened - sm.streams.Store(stream.ID(), stream) + data := streamData{ + lastReceivedMessage: time.Now(), // prevents closing just opened streams, in case the cleanup goroutine runs at the same time stream is opened + stream: stream, + } + sm.streamData.Store(stream.ID(), data) } func (sm *streamManager) logMessageReceived(streamID string) { - sm.lastReceivedMessage.Store(streamID, time.Now()) + data, has := sm.streamData.Load(streamID) + if !has { + return + } + + streamData := data.(streamData) + streamData.lastReceivedMessage = time.Now() + sm.streamData.Store(streamID, streamData) } From 09d9aab50d384db68dac269b4eb5172654b50d16 Mon Sep 17 00:00:00 2001 From: noot Date: Tue, 1 Jun 2021 11:50:04 -0400 Subject: [PATCH 37/40] lint --- dot/network/stream_manager.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/dot/network/stream_manager.go b/dot/network/stream_manager.go index 0797404a68..415baf77ac 100644 --- a/dot/network/stream_manager.go +++ b/dot/network/stream_manager.go @@ -45,9 +45,9 @@ func (sm *streamManager) start() { func (sm *streamManager) cleanupStreams() { sm.streamData.Range(func(id, data interface{}) bool { - streamData := data.(streamData) - lastReceived := streamData.lastReceivedMessage - stream := streamData.stream + sdata := data.(streamData) + lastReceived := sdata.lastReceivedMessage + stream := sdata.stream if time.Since(lastReceived) > cleanupStreamInterval { _ = stream.Close() @@ -72,7 +72,7 @@ func (sm *streamManager) logMessageReceived(streamID string) { return } - streamData := data.(streamData) - streamData.lastReceivedMessage = time.Now() - sm.streamData.Store(streamID, streamData) + sdata := data.(streamData) + sdata.lastReceivedMessage = time.Now() + sm.streamData.Store(streamID, sdata) } From bbdee5773f09db593a4d01fd06830d1579cf3311 Mon Sep 17 00:00:00 2001 From: noot Date: Tue, 1 Jun 2021 14:46:40 -0400 Subject: [PATCH 38/40] address comments --- dot/network/stream_manager.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/dot/network/stream_manager.go b/dot/network/stream_manager.go index 415baf77ac..ef24c13258 100644 --- a/dot/network/stream_manager.go +++ b/dot/network/stream_manager.go @@ -19,14 +19,14 @@ type streamData struct { // we haven't received any data on for the last time period. this prevents keeping stale streams open and continuously trying to // read from it, which takes up lots of CPU over time. type streamManager struct { - ctx context.Context - streamData *sync.Map //map[string]streamData + ctx context.Context + streamDataMap *sync.Map //map[string]*streamData } func newStreamManager(ctx context.Context) *streamManager { return &streamManager{ - ctx: ctx, - streamData: new(sync.Map), + ctx: ctx, + streamDataMap: new(sync.Map), } } @@ -44,14 +44,14 @@ func (sm *streamManager) start() { } func (sm *streamManager) cleanupStreams() { - sm.streamData.Range(func(id, data interface{}) bool { - sdata := data.(streamData) + sm.streamDataMap.Range(func(id, data interface{}) bool { + sdata := data.(*streamData) lastReceived := sdata.lastReceivedMessage stream := sdata.stream if time.Since(lastReceived) > cleanupStreamInterval { _ = stream.Close() - sm.streamData.Delete(id) + sm.streamDataMap.Delete(id) } return true @@ -59,20 +59,20 @@ func (sm *streamManager) cleanupStreams() { } func (sm *streamManager) logNewStream(stream network.Stream) { - data := streamData{ + data := &streamData{ lastReceivedMessage: time.Now(), // prevents closing just opened streams, in case the cleanup goroutine runs at the same time stream is opened stream: stream, } - sm.streamData.Store(stream.ID(), data) + sm.streamDataMap.Store(stream.ID(), data) } func (sm *streamManager) logMessageReceived(streamID string) { - data, has := sm.streamData.Load(streamID) + data, has := sm.streamDataMap.Load(streamID) if !has { return } - sdata := data.(streamData) + sdata := data.(*streamData) sdata.lastReceivedMessage = time.Now() - sm.streamData.Store(streamID, sdata) + sm.streamDataMap.Store(streamID, sdata) } From f061756c180be8ffc84d47064584ff21d0cca34e Mon Sep 17 00:00:00 2001 From: noot Date: Tue, 1 Jun 2021 17:18:33 -0400 Subject: [PATCH 39/40] use ticker.C --- dot/network/stream_manager.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dot/network/stream_manager.go b/dot/network/stream_manager.go index ef24c13258..de7f8a53c2 100644 --- a/dot/network/stream_manager.go +++ b/dot/network/stream_manager.go @@ -32,11 +32,12 @@ func newStreamManager(ctx context.Context) *streamManager { func (sm *streamManager) start() { go func() { + ticker := time.NewTicker(cleanupStreamInterval) for { select { case <-sm.ctx.Done(): return - case <-time.After(cleanupStreamInterval): + case <-ticker.C: sm.cleanupStreams() } } From c3b1a8389d5785635f4ddfd27dede7645fca990a Mon Sep 17 00:00:00 2001 From: noot Date: Tue, 1 Jun 2021 19:41:13 -0400 Subject: [PATCH 40/40] add ticker.Stop --- dot/network/stream_manager.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dot/network/stream_manager.go b/dot/network/stream_manager.go index de7f8a53c2..6755f5c3da 100644 --- a/dot/network/stream_manager.go +++ b/dot/network/stream_manager.go @@ -32,7 +32,9 @@ func newStreamManager(ctx context.Context) *streamManager { func (sm *streamManager) start() { go func() { - ticker := time.NewTicker(cleanupStreamInterval) + ticker := time.NewTicker(cleanupStreamInterval) + defer ticker.Stop() + for { select { case <-sm.ctx.Done():