From f18d89eb6650ed8b047fcfcb698c76e98210a442 Mon Sep 17 00:00:00 2001 From: noot <36753753+noot@users.noreply.github.com> Date: Fri, 21 May 2021 17:47:30 -0400 Subject: [PATCH] chore(dot/network): use sync.Pool for network message buffers (#1600) --- dot/network/config.go | 2 ++ dot/network/notifications.go | 12 ++++---- dot/network/pool.go | 57 ++++++++++++++++++++++++++++++++++++ dot/network/service.go | 26 ++++++++++++---- dot/network/service_test.go | 2 ++ dot/network/sync.go | 2 +- 6 files changed, 89 insertions(+), 12 deletions(-) create mode 100644 dot/network/pool.go diff --git a/dot/network/config.go b/dot/network/config.go index af178811c7..aa6accfb39 100644 --- a/dot/network/config.go +++ b/dot/network/config.go @@ -97,6 +97,8 @@ type Config struct { // telemetryInterval how often to send telemetry metrics telemetryInterval time.Duration + + noPreAllocate bool // internal option } // build checks the configuration, sets up the private key for the network service, diff --git a/dot/network/notifications.go b/dot/network/notifications.go index a3585d4e66..4f77099891 100644 --- a/dot/network/notifications.go +++ b/dot/network/notifications.go @@ -28,7 +28,7 @@ import ( var errCannotValidateHandshake = errors.New("failed to validate handshake") -var maxHandshakeSize = unsafe.Sizeof(BlockAnnounceHandshake{}) //nolint +const maxHandshakeSize = unsafe.Sizeof(BlockAnnounceHandshake{}) //nolint // Handshake is the interface all handshakes for notifications protocols must implement type Handshake interface { @@ -226,7 +226,7 @@ func (s *Service) sendData(peer peer.ID, hs Handshake, info *notificationsProtoc return } - hs, err := readHandshake(stream, decodeBlockAnnounceHandshake) + hs, err := s.readHandshake(stream, decodeBlockAnnounceHandshake) if err != nil { logger.Trace("failed to read handshake", "protocol", info.protocolID, "peer", peer, "error", err) _ = stream.Close() @@ -294,9 +294,11 @@ func (s *Service) broadcastExcluding(info *notificationsProtocol, excluding peer } } -func readHandshake(stream libp2pnetwork.Stream, decoder HandshakeDecoder) (Handshake, error) { - msgBytes := make([]byte, maxHandshakeSize) - tot, err := readStream(stream, msgBytes) +func (s *Service) readHandshake(stream libp2pnetwork.Stream, decoder HandshakeDecoder) (Handshake, error) { + msgBytes := s.bufPool.get() + defer s.bufPool.put(&msgBytes) + + tot, err := readStream(stream, msgBytes[:]) if err != nil { return nil, err } diff --git a/dot/network/pool.go b/dot/network/pool.go new file mode 100644 index 0000000000..9921a8ae26 --- /dev/null +++ b/dot/network/pool.go @@ -0,0 +1,57 @@ +// Copyright 2019 ChainSafe Systems (ON) Corp. +// This file is part of gossamer. +// +// The gossamer library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The gossamer library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the gossamer library. If not, see . + +package network + +// sizedBufferPool is a pool of buffers used for reading from streams +type sizedBufferPool struct { + c chan *[maxMessageSize]byte +} + +func newSizedBufferPool(min, max int) (bp *sizedBufferPool) { + bufferCh := make(chan *[maxMessageSize]byte, max) + + for i := 0; i < min; i++ { + buf := [maxMessageSize]byte{} + bufferCh <- &buf + } + + return &sizedBufferPool{ + c: bufferCh, + } +} + +// get gets a buffer from the sizedBufferPool, or creates a new one if none are +// available in the pool. Buffers have a pre-allocated capacity. +func (bp *sizedBufferPool) get() [maxMessageSize]byte { + var buff *[maxMessageSize]byte + select { + case buff = <-bp.c: + // reuse existing buffer + default: + // create new buffer + buff = &[maxMessageSize]byte{} + } + return *buff +} + +// put returns the given buffer to the sizedBufferPool. +func (bp *sizedBufferPool) put(b *[maxMessageSize]byte) { + select { + case bp.c <- b: + default: // Discard the buffer if the pool is full. + } +} diff --git a/dot/network/service.go b/dot/network/service.go index c529f22112..95c35ebca4 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -44,6 +44,8 @@ const ( lightID = "/light/2" blockAnnounceID = "/block-announces/1" transactionsID = "/transactions/1" + + maxMessageSize = 1024 * 1024 // 1mb for now ) var ( @@ -70,6 +72,7 @@ type Service struct { mdns *mdns gossip *gossip syncQueue *syncQueue + bufPool *sizedBufferPool notificationsProtocols map[byte]*notificationsProtocol // map of sub-protocol msg ID to protocol info notificationsMu sync.RWMutex @@ -130,6 +133,18 @@ func NewService(cfg *Config) (*Service, error) { return nil, err } + // pre-allocate pool of buffers used to read from streams. + // initially allocate as many buffers as liekly necessary which is the number inbound streams we will have, + // which should equal average number of peers times the number of notifications protocols, which is currently 3. + var bufPool *sizedBufferPool + if cfg.noPreAllocate { + bufPool = &sizedBufferPool{ + c: make(chan *[maxMessageSize]byte, cfg.MaxPeers*3), + } + } else { + bufPool = newSizedBufferPool((cfg.MaxPeers-cfg.MinPeers)*3/2, (cfg.MaxPeers+1)*3) + } + network := &Service{ ctx: ctx, cancel: cancel, @@ -146,6 +161,7 @@ func NewService(cfg *Config) (*Service, error) { lightRequest: make(map[peer.ID]struct{}), telemetryInterval: cfg.telemetryInterval, closeCh: make(chan interface{}), + bufPool: bufPool, } network.syncQueue = newSyncQueue(network) @@ -509,14 +525,12 @@ func isInbound(stream libp2pnetwork.Stream) bool { } func (s *Service) readStream(stream libp2pnetwork.Stream, decoder messageDecoder, handler messageHandler) { - var ( - maxMessageSize uint64 = maxBlockResponseSize // TODO: determine actual max message size - msgBytes = make([]byte, maxMessageSize) - peer = stream.Conn().RemotePeer() - ) + peer := stream.Conn().RemotePeer() + msgBytes := s.bufPool.get() + defer s.bufPool.put(&msgBytes) for { - tot, err := readStream(stream, msgBytes) + tot, err := readStream(stream, msgBytes[:]) if err == io.EOF { continue } else if err != nil { diff --git a/dot/network/service_test.go b/dot/network/service_test.go index 24be8fd46f..fcd64bf0b8 100644 --- a/dot/network/service_test.go +++ b/dot/network/service_test.go @@ -100,6 +100,8 @@ func createTestService(t *testing.T, cfg *Config) (srvc *Service) { cfg.Syncer = newMockSyncer() } + cfg.noPreAllocate = true + srvc, err := NewService(cfg) require.NoError(t, err) diff --git a/dot/network/sync.go b/dot/network/sync.go index 546ee009ab..22e1993ab9 100644 --- a/dot/network/sync.go +++ b/dot/network/sync.go @@ -79,7 +79,7 @@ func (s *Service) handleSyncMessage(stream libp2pnetwork.Stream, msg Message) er return nil } -var ( +const ( blockRequestSize uint32 = 128 blockRequestBufferSize int = 6 blockResponseBufferSize int = 6