From 55e6cdbddc96632ee790cf485a10db1bac59b73f Mon Sep 17 00:00:00 2001 From: y0sher Date: Tue, 4 Feb 2025 13:56:58 +0200 Subject: [PATCH 1/5] fix memory leak by removing older slots after cache reached 34 slots. --- message/validation/const.go | 2 +- .../ssv/validator/non_committee_validator.go | 79 +++++++++++++------ 2 files changed, 55 insertions(+), 26 deletions(-) diff --git a/message/validation/const.go b/message/validation/const.go index 339556fc4e..177436b0a3 100644 --- a/message/validation/const.go +++ b/message/validation/const.go @@ -13,7 +13,7 @@ const ( clockErrorTolerance = time.Millisecond * 50 allowedRoundsInFuture = 1 allowedRoundsInPast = 2 - lateSlotAllowance = 2 + LateSlotAllowance = 2 syncCommitteeSize = 512 rsaSignatureSize = 256 operatorIDSize = 8 // uint64 diff --git a/protocol/v2/ssv/validator/non_committee_validator.go b/protocol/v2/ssv/validator/non_committee_validator.go index 611320effe..fa32f699d8 100644 --- a/protocol/v2/ssv/validator/non_committee_validator.go +++ b/protocol/v2/ssv/validator/non_committee_validator.go @@ -3,6 +3,7 @@ package validator import ( "encoding/hex" "fmt" + "github.com/ssvlabs/ssv/message/validation" "slices" "strconv" "strings" @@ -28,17 +29,18 @@ import ( ) type CommitteeObserver struct { - msgID spectypes.MessageID - logger *zap.Logger - Storage *storage.ParticipantStores - beaconNetwork beacon.BeaconNetwork - networkConfig networkconfig.NetworkConfig - ValidatorStore registrystorage.ValidatorStore - newDecidedHandler qbftcontroller.NewDecidedHandler - attesterRoots *ttlcache.Cache[phase0.Root, struct{}] - syncCommRoots *ttlcache.Cache[phase0.Root, struct{}] - domainCache *DomainCache - postConsensusContainer map[phase0.ValidatorIndex]*ssv.PartialSigContainer + msgID spectypes.MessageID + logger *zap.Logger + Storage *storage.ParticipantStores + beaconNetwork beacon.BeaconNetwork + networkConfig networkconfig.NetworkConfig + ValidatorStore registrystorage.ValidatorStore + newDecidedHandler qbftcontroller.NewDecidedHandler + attesterRoots *ttlcache.Cache[phase0.Root, struct{}] + syncCommRoots *ttlcache.Cache[phase0.Root, struct{}] + domainCache *DomainCache + // TODO: consider using round-robin container as []map[phase0.ValidatorIndex]*ssv.PartialSigContainer similar to what is used in OperatorState + postConsensusContainer map[phase0.Slot]map[phase0.ValidatorIndex]*ssv.PartialSigContainer } type CommitteeObserverOptions struct { @@ -59,19 +61,22 @@ type CommitteeObserverOptions struct { func NewCommitteeObserver(msgID spectypes.MessageID, opts CommitteeObserverOptions) *CommitteeObserver { // TODO: does the specific operator matters? - return &CommitteeObserver{ - msgID: msgID, - logger: opts.Logger, - Storage: opts.Storage, - beaconNetwork: opts.NetworkConfig.Beacon, - networkConfig: opts.NetworkConfig, - ValidatorStore: opts.ValidatorStore, - newDecidedHandler: opts.NewDecidedHandler, - attesterRoots: opts.AttesterRoots, - syncCommRoots: opts.SyncCommRoots, - domainCache: opts.DomainCache, - postConsensusContainer: make(map[phase0.ValidatorIndex]*ssv.PartialSigContainer), + co := &CommitteeObserver{ + msgID: msgID, + logger: opts.Logger, + Storage: opts.Storage, + beaconNetwork: opts.NetworkConfig.Beacon, + networkConfig: opts.NetworkConfig, + ValidatorStore: opts.ValidatorStore, + newDecidedHandler: opts.NewDecidedHandler, + attesterRoots: opts.AttesterRoots, + syncCommRoots: opts.SyncCommRoots, + domainCache: opts.DomainCache, } + + co.postConsensusContainer = make(map[phase0.Slot]map[phase0.ValidatorIndex]*ssv.PartialSigContainer, co.postConsensusContainerCapacity()) + + return co } func (ncv *CommitteeObserver) ProcessMessage(msg *queue.SSVMessage) error { @@ -214,15 +219,22 @@ func (ncv *CommitteeObserver) processMessage( ) (map[validatorIndexAndRoot][]spectypes.OperatorID, error) { quorums := make(map[validatorIndexAndRoot][]spectypes.OperatorID) + currentSlot := signedMsg.Slot + slotValidators, exist := ncv.postConsensusContainer[currentSlot] + if !exist { + slotValidators = make(map[phase0.ValidatorIndex]*ssv.PartialSigContainer) + ncv.postConsensusContainer[signedMsg.Slot] = slotValidators + } + for _, msg := range signedMsg.Messages { validator, exists := ncv.ValidatorStore.ValidatorByIndex(msg.ValidatorIndex) if !exists { return nil, fmt.Errorf("could not find share for validator with index %d", msg.ValidatorIndex) } - container, ok := ncv.postConsensusContainer[msg.ValidatorIndex] + container, ok := slotValidators[msg.ValidatorIndex] if !ok { container = ssv.NewPartialSigContainer(validator.Quorum()) - ncv.postConsensusContainer[msg.ValidatorIndex] = container + slotValidators[msg.ValidatorIndex] = container } if container.HasSignature(msg.ValidatorIndex, msg.Signer, msg.SigningRoot) { ncv.resolveDuplicateSignature(container, msg, validator) @@ -244,6 +256,18 @@ func (ncv *CommitteeObserver) processMessage( } } } + + // Remove older slots container + if len(ncv.postConsensusContainer) >= ncv.postConsensusContainerCapacity() { + // #nosec G115 -- capacity must be low epoch not to cause overflow + thresholdSlot := currentSlot - phase0.Slot(ncv.postConsensusContainerCapacity()) + for slot := range ncv.postConsensusContainer { + if slot < thresholdSlot { + delete(ncv.postConsensusContainer, slot) + } + } + } + return quorums, nil } @@ -354,6 +378,11 @@ func (ncv *CommitteeObserver) saveSyncCommRoots(epoch phase0.Epoch, beaconVote * return nil } +func (ncv *CommitteeObserver) postConsensusContainerCapacity() int { + // #nosec G115 -- slots per epoch must be low epoch not to cause overflow + return int(ncv.networkConfig.SlotsPerEpoch()) + validation.LateSlotAllowance +} + func constructAttestationData(vote *spectypes.BeaconVote, slot phase0.Slot, committeeIndex phase0.CommitteeIndex) *phase0.AttestationData { return &phase0.AttestationData{ Slot: slot, From 0079c325c4e468f464e010d0ba7930d61317f421 Mon Sep 17 00:00:00 2001 From: y0sher Date: Tue, 4 Feb 2025 14:00:47 +0200 Subject: [PATCH 2/5] rename variable in validation pkg --- message/validation/common_checks.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/message/validation/common_checks.go b/message/validation/common_checks.go index 35c0ae59fe..11e36ddb9f 100644 --- a/message/validation/common_checks.go +++ b/message/validation/common_checks.go @@ -38,9 +38,9 @@ func (mv *messageValidator) messageLateness(slot phase0.Slot, role spectypes.Run var ttl phase0.Slot switch role { case spectypes.RoleProposer, spectypes.RoleSyncCommitteeContribution: - ttl = 1 + lateSlotAllowance + ttl = 1 + LateSlotAllowance case spectypes.RoleCommittee, spectypes.RoleAggregator: - ttl = phase0.Slot(mv.netCfg.Beacon.SlotsPerEpoch()) + lateSlotAllowance + ttl = phase0.Slot(mv.netCfg.Beacon.SlotsPerEpoch()) + LateSlotAllowance case spectypes.RoleValidatorRegistration, spectypes.RoleVoluntaryExit: return 0 } From 55ca29eb12ebad953d72c7bf1a6c6c9b4501c900 Mon Sep 17 00:00:00 2001 From: y0sher Date: Tue, 4 Feb 2025 15:23:15 +0200 Subject: [PATCH 3/5] goimports --- protocol/v2/ssv/validator/non_committee_validator.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/protocol/v2/ssv/validator/non_committee_validator.go b/protocol/v2/ssv/validator/non_committee_validator.go index fa32f699d8..fb9ebcc32d 100644 --- a/protocol/v2/ssv/validator/non_committee_validator.go +++ b/protocol/v2/ssv/validator/non_committee_validator.go @@ -3,11 +3,12 @@ package validator import ( "encoding/hex" "fmt" - "github.com/ssvlabs/ssv/message/validation" "slices" "strconv" "strings" + "github.com/ssvlabs/ssv/message/validation" + "github.com/attestantio/go-eth2-client/spec/phase0" "github.com/herumi/bls-eth-go-binary/bls" "github.com/jellydator/ttlcache/v3" From 4aa01c31d5b359940336ce2502ed84d22f89bd5e Mon Sep 17 00:00:00 2001 From: y0sher Date: Tue, 4 Feb 2025 15:23:34 +0200 Subject: [PATCH 4/5] goimports --- protocol/v2/ssv/validator/non_committee_validator.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/protocol/v2/ssv/validator/non_committee_validator.go b/protocol/v2/ssv/validator/non_committee_validator.go index fb9ebcc32d..4fc9d2a72a 100644 --- a/protocol/v2/ssv/validator/non_committee_validator.go +++ b/protocol/v2/ssv/validator/non_committee_validator.go @@ -7,13 +7,12 @@ import ( "strconv" "strings" - "github.com/ssvlabs/ssv/message/validation" - "github.com/attestantio/go-eth2-client/spec/phase0" "github.com/herumi/bls-eth-go-binary/bls" "github.com/jellydator/ttlcache/v3" specqbft "github.com/ssvlabs/ssv-spec/qbft" spectypes "github.com/ssvlabs/ssv-spec/types" + "github.com/ssvlabs/ssv/message/validation" "go.uber.org/zap" "github.com/ssvlabs/ssv/ibft/storage" From dcdd94a0d1c1ab31b63c9e292cd2a41d517c448f Mon Sep 17 00:00:00 2001 From: y0sher Date: Tue, 4 Feb 2025 15:23:45 +0200 Subject: [PATCH 5/5] goimports --- protocol/v2/ssv/validator/non_committee_validator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol/v2/ssv/validator/non_committee_validator.go b/protocol/v2/ssv/validator/non_committee_validator.go index 4fc9d2a72a..6902e87fd5 100644 --- a/protocol/v2/ssv/validator/non_committee_validator.go +++ b/protocol/v2/ssv/validator/non_committee_validator.go @@ -12,11 +12,11 @@ import ( "github.com/jellydator/ttlcache/v3" specqbft "github.com/ssvlabs/ssv-spec/qbft" spectypes "github.com/ssvlabs/ssv-spec/types" - "github.com/ssvlabs/ssv/message/validation" "go.uber.org/zap" "github.com/ssvlabs/ssv/ibft/storage" "github.com/ssvlabs/ssv/logging/fields" + "github.com/ssvlabs/ssv/message/validation" "github.com/ssvlabs/ssv/networkconfig" "github.com/ssvlabs/ssv/protocol/v2/blockchain/beacon" qbftcontroller "github.com/ssvlabs/ssv/protocol/v2/qbft/controller"