From 9ce6d44f53e7ba169cb1e0f779f419d3e585622a Mon Sep 17 00:00:00 2001 From: Edward Mack Date: Wed, 21 Jul 2021 16:14:42 -0400 Subject: [PATCH] chore(lib/common) implement byte pool to improve websocket subscription efficiency (#1693) * implement bufferpool * implement byte pool for subscription setup * lint * fix lint * address comments * refactor UnregisterFinalizedChannel to UnregisterFinalisedChannel * lint * change error handling * refactor NumPooled to Len --- dot/core/interface.go | 2 +- dot/digest/digest.go | 2 +- dot/digest/interface.go | 2 +- dot/rpc/modules/api.go | 2 +- dot/rpc/modules/mocks/block_api.go | 4 +- dot/state/block.go | 18 ++++++-- dot/state/block_notify.go | 47 ++++++++------------- dot/state/block_notify_test.go | 4 +- lib/common/bytepool.go | 66 ++++++++++++++++++++++++++++++ lib/common/bytepool_test.go | 62 ++++++++++++++++++++++++++++ lib/grandpa/grandpa.go | 2 +- lib/grandpa/state.go | 2 +- 12 files changed, 168 insertions(+), 45 deletions(-) create mode 100644 lib/common/bytepool.go create mode 100644 lib/common/bytepool_test.go diff --git a/dot/core/interface.go b/dot/core/interface.go index 867166fb9c..f76d91b486 100644 --- a/dot/core/interface.go +++ b/dot/core/interface.go @@ -44,7 +44,7 @@ type BlockState interface { RegisterImportedChannel(ch chan<- *types.Block) (byte, error) UnregisterImportedChannel(id byte) RegisterFinalizedChannel(ch chan<- *types.FinalisationInfo) (byte, error) - UnregisterFinalizedChannel(id byte) + UnregisterFinalisedChannel(id byte) HighestCommonAncestor(a, b common.Hash) (common.Hash, error) SubChain(start, end common.Hash) ([]common.Hash, error) GetBlockBody(hash common.Hash) (*types.Body, error) diff --git a/dot/digest/digest.go b/dot/digest/digest.go index a511e0160e..ebd5dec899 100644 --- a/dot/digest/digest.go +++ b/dot/digest/digest.go @@ -112,7 +112,7 @@ func (h *Handler) Start() error { func (h *Handler) Stop() error { h.cancel() h.blockState.UnregisterImportedChannel(h.importedID) - h.blockState.UnregisterFinalizedChannel(h.finalisedID) + h.blockState.UnregisterFinalisedChannel(h.finalisedID) close(h.imported) close(h.finalised) return nil diff --git a/dot/digest/interface.go b/dot/digest/interface.go index f3df3c7f60..bd42d9b883 100644 --- a/dot/digest/interface.go +++ b/dot/digest/interface.go @@ -29,7 +29,7 @@ type BlockState interface { RegisterImportedChannel(ch chan<- *types.Block) (byte, error) UnregisterImportedChannel(id byte) RegisterFinalizedChannel(ch chan<- *types.FinalisationInfo) (byte, error) - UnregisterFinalizedChannel(id byte) + UnregisterFinalisedChannel(id byte) } // EpochState is the interface for state.EpochState diff --git a/dot/rpc/modules/api.go b/dot/rpc/modules/api.go index 636707eca1..d0ad8eb896 100644 --- a/dot/rpc/modules/api.go +++ b/dot/rpc/modules/api.go @@ -36,7 +36,7 @@ type BlockAPI interface { RegisterImportedChannel(ch chan<- *types.Block) (byte, error) UnregisterImportedChannel(id byte) RegisterFinalizedChannel(ch chan<- *types.FinalisationInfo) (byte, error) - UnregisterFinalizedChannel(id byte) + UnregisterFinalisedChannel(id byte) SubChain(start, end common.Hash) ([]common.Hash, error) } diff --git a/dot/rpc/modules/mocks/block_api.go b/dot/rpc/modules/mocks/block_api.go index ef2fe86d4c..1bb565bfa0 100644 --- a/dot/rpc/modules/mocks/block_api.go +++ b/dot/rpc/modules/mocks/block_api.go @@ -233,8 +233,8 @@ func (_m *MockBlockAPI) SubChain(start common.Hash, end common.Hash) ([]common.H return r0, r1 } -// UnregisterFinalizedChannel provides a mock function with given fields: id -func (_m *MockBlockAPI) UnregisterFinalizedChannel(id byte) { +// UnregisterFinalisedChannel provides a mock function with given fields: id +func (_m *MockBlockAPI) UnregisterFinalisedChannel(id byte) { _m.Called(id) } diff --git a/dot/state/block.go b/dot/state/block.go index 32919ff380..77c2595aa6 100644 --- a/dot/state/block.go +++ b/dot/state/block.go @@ -46,10 +46,12 @@ type BlockState struct { lastFinalised common.Hash // block notifiers - imported map[byte]chan<- *types.Block - finalised map[byte]chan<- *types.FinalisationInfo - importedLock sync.RWMutex - finalisedLock sync.RWMutex + imported map[byte]chan<- *types.Block + finalised map[byte]chan<- *types.FinalisationInfo + importedLock sync.RWMutex + finalisedLock sync.RWMutex + importedBytePool *common.BytePool + finalisedBytePool *common.BytePool pruneKeyCh chan *types.Header } @@ -80,6 +82,10 @@ func NewBlockState(db chaindb.Database, bt *blocktree.BlockTree) (*BlockState, e return nil, fmt.Errorf("failed to get last finalised hash: %w", err) } + bs.importedBytePool = common.NewBytePool256() + + bs.finalisedBytePool = common.NewBytePool256() + return bs, nil } @@ -117,6 +123,10 @@ func NewBlockStateFromGenesis(db chaindb.Database, header *types.Header) (*Block return nil, err } + bs.importedBytePool = common.NewBytePool256() + + bs.finalisedBytePool = common.NewBytePool256() + return bs, nil } diff --git a/dot/state/block_notify.go b/dot/state/block_notify.go index 1fbcb214f6..8c8c7e7c77 100644 --- a/dot/state/block_notify.go +++ b/dot/state/block_notify.go @@ -17,9 +17,6 @@ package state import ( - "errors" - "math/rand" - "github.com/ChainSafe/gossamer/dot/types" "github.com/ChainSafe/gossamer/lib/common" ) @@ -29,16 +26,9 @@ import ( func (bs *BlockState) RegisterImportedChannel(ch chan<- *types.Block) (byte, error) { bs.importedLock.RLock() - if len(bs.imported) == 256 { - return 0, errors.New("channel limit reached") - } - - var id byte - for { - id = generateID() - if bs.imported[id] == nil { - break - } + id, err := bs.importedBytePool.Get() + if err != nil { + return 0, err } bs.importedLock.RUnlock() @@ -54,16 +44,9 @@ func (bs *BlockState) RegisterImportedChannel(ch chan<- *types.Block) (byte, err func (bs *BlockState) RegisterFinalizedChannel(ch chan<- *types.FinalisationInfo) (byte, error) { bs.finalisedLock.RLock() - if len(bs.finalised) == 256 { - return 0, errors.New("channel limit reached") - } - - var id byte - for { - id = generateID() - if bs.finalised[id] == nil { - break - } + id, err := bs.finalisedBytePool.Get() + if err != nil { + return 0, err } bs.finalisedLock.RUnlock() @@ -81,15 +64,23 @@ func (bs *BlockState) UnregisterImportedChannel(id byte) { defer bs.importedLock.Unlock() delete(bs.imported, id) + err := bs.importedBytePool.Put(id) + if err != nil { + logger.Error("failed to unregister imported channel", "error", err) + } } -// UnregisterFinalizedChannel removes the block finalisation notification channel with the given ID. +// UnregisterFinalisedChannel removes the block finalisation notification channel with the given ID. // A channel must be unregistered before closing it. -func (bs *BlockState) UnregisterFinalizedChannel(id byte) { +func (bs *BlockState) UnregisterFinalisedChannel(id byte) { bs.finalisedLock.Lock() defer bs.finalisedLock.Unlock() delete(bs.finalised, id) + err := bs.finalisedBytePool.Put(id) + if err != nil { + logger.Error("failed to unregister finalised channel", "error", err) + } } func (bs *BlockState) notifyImported(block *types.Block) { @@ -141,9 +132,3 @@ func (bs *BlockState) notifyFinalized(hash common.Hash, round, setID uint64) { }(ch) } } - -func generateID() byte { - // skipcq: GSC-G404 - id := rand.Intn(256) //nolint - return byte(id) -} diff --git a/dot/state/block_notify_test.go b/dot/state/block_notify_test.go index 4d3c70d263..60268d1adb 100644 --- a/dot/state/block_notify_test.go +++ b/dot/state/block_notify_test.go @@ -56,7 +56,7 @@ func TestFinalizedChannel(t *testing.T) { id, err := bs.RegisterFinalizedChannel(ch) require.NoError(t, err) - defer bs.UnregisterFinalizedChannel(id) + defer bs.UnregisterFinalisedChannel(id) chain, _ := AddBlocksToState(t, bs, 3) @@ -150,6 +150,6 @@ func TestFinalizedChannel_Multi(t *testing.T) { wg.Wait() for _, id := range ids { - bs.UnregisterFinalizedChannel(id) + bs.UnregisterFinalisedChannel(id) } } diff --git a/lib/common/bytepool.go b/lib/common/bytepool.go new file mode 100644 index 0000000000..f708a04f47 --- /dev/null +++ b/lib/common/bytepool.go @@ -0,0 +1,66 @@ +// Copyright 2019 ChainSafe Systems (ON) Corp. +// This file is part of gossamer. +// +// The gossamer library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The gossamer library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the gossamer library. If not, see . + +package common + +import "fmt" + +// BytePool struct to hold byte objects that will be contained in pool +type BytePool struct { + c chan byte +} + +// NewBytePool256 creates and initialises pool with 256 entries +func NewBytePool256() *BytePool { + bp := NewBytePool(256) + for i := 0; i < 256; i++ { + _ = bp.Put(byte(i)) + } + return bp +} + +// NewBytePool creates a new empty byte pool with capacity of size +func NewBytePool(size int) (bp *BytePool) { + return &BytePool{ + c: make(chan byte, size), + } +} + +// Get gets a Buffer from the BytePool, or creates a new one if none are +// available in the pool. +func (bp *BytePool) Get() (b byte, err error) { + select { + case b = <-bp.c: + default: + err = fmt.Errorf("all slots used") + } + return +} + +// Put returns the given Buffer to the BytePool. +func (bp *BytePool) Put(b byte) error { + select { + case bp.c <- b: + return nil + default: + return fmt.Errorf("pool is full") + } +} + +// Len returns the number of items currently pooled. +func (bp *BytePool) Len() int { + return len(bp.c) +} diff --git a/lib/common/bytepool_test.go b/lib/common/bytepool_test.go new file mode 100644 index 0000000000..997de16516 --- /dev/null +++ b/lib/common/bytepool_test.go @@ -0,0 +1,62 @@ +// Copyright 2019 ChainSafe Systems (ON) Corp. +// This file is part of gossamer. +// +// The gossamer library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The gossamer library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the gossamer library. If not, see . + +package common + +import ( + "math/rand" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestBytePool(t *testing.T) { + bp := NewBytePool(5) + require.Equal(t, 0, bp.Len()) + + for i := 0; i < 5; i++ { + err := bp.Put(generateID()) + require.NoError(t, err) + } + err := bp.Put(generateID()) + require.EqualError(t, err, "pool is full") + require.Equal(t, 5, bp.Len()) + + for i := 0; i < 5; i++ { + _, err := bp.Get() // nolint + require.NoError(t, err) + } + _, err = bp.Get() + require.EqualError(t, err, "all slots used") +} + +func TestBytePool256(t *testing.T) { + bp := NewBytePool256() + require.Equal(t, 256, bp.Len()) + + for i := 0; i < 256; i++ { + _, err := bp.Get() // nolint + require.NoError(t, err) + } + _, err := bp.Get() + require.EqualError(t, err, "all slots used") +} + +func generateID() byte { + // skipcq: GSC-G404 + id := rand.Intn(256) //nolint + return byte(id) +} diff --git a/lib/grandpa/grandpa.go b/lib/grandpa/grandpa.go index 6e954b4741..c826d5ab00 100644 --- a/lib/grandpa/grandpa.go +++ b/lib/grandpa/grandpa.go @@ -211,7 +211,7 @@ func (s *Service) Stop() error { s.cancel() - s.blockState.UnregisterFinalizedChannel(s.finalisedChID) + s.blockState.UnregisterFinalisedChannel(s.finalisedChID) close(s.finalisedCh) if !s.authority { diff --git a/lib/grandpa/state.go b/lib/grandpa/state.go index 283af0380b..52801cc814 100644 --- a/lib/grandpa/state.go +++ b/lib/grandpa/state.go @@ -45,7 +45,7 @@ type BlockState interface { RegisterImportedChannel(ch chan<- *types.Block) (byte, error) UnregisterImportedChannel(id byte) RegisterFinalizedChannel(ch chan<- *types.FinalisationInfo) (byte, error) - UnregisterFinalizedChannel(id byte) + UnregisterFinalisedChannel(id byte) SetJustification(hash common.Hash, data []byte) error HasJustification(hash common.Hash) (bool, error) GetJustification(hash common.Hash) ([]byte, error)