Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First round of fixes in making gossip publishing good for the validator: See comment #9972

Merged
merged 101 commits into from
Apr 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
101 commits
Select commit Hold shift + click to select a range
1c17c52
save
Giulio2002 Apr 17, 2024
bea66f1
save
Giulio2002 Apr 17, 2024
8b96825
save
Giulio2002 Apr 17, 2024
4de3055
save
Giulio2002 Apr 17, 2024
e26fc9a
save
Giulio2002 Apr 17, 2024
5364655
save
Giulio2002 Apr 17, 2024
540c2ff
save
Giulio2002 Apr 17, 2024
6cf0ba3
save
Giulio2002 Apr 17, 2024
94de4ce
save
Giulio2002 Apr 17, 2024
5a192dd
save
Giulio2002 Apr 17, 2024
9a7d6cd
save
Giulio2002 Apr 17, 2024
a3afbfc
save
Giulio2002 Apr 17, 2024
15f4b24
save
Giulio2002 Apr 17, 2024
be135bc
save
Giulio2002 Apr 17, 2024
1da2bc6
save
Giulio2002 Apr 17, 2024
4e8d50a
save
Giulio2002 Apr 17, 2024
5b0ae23
save
Giulio2002 Apr 17, 2024
1cd72d7
save
Giulio2002 Apr 17, 2024
d17e867
save
Giulio2002 Apr 17, 2024
3193a6e
save
Giulio2002 Apr 17, 2024
a35528a
save
Giulio2002 Apr 17, 2024
bb32e97
save
Giulio2002 Apr 17, 2024
14079b3
save
Giulio2002 Apr 17, 2024
24229de
save
Giulio2002 Apr 17, 2024
c6d324b
save
Giulio2002 Apr 17, 2024
35f18c9
save
Giulio2002 Apr 17, 2024
df776cc
save
Giulio2002 Apr 17, 2024
e77e925
save
Giulio2002 Apr 17, 2024
4266e8e
save
Giulio2002 Apr 17, 2024
b1f01df
save
Giulio2002 Apr 17, 2024
1256e01
save
Giulio2002 Apr 17, 2024
8b36363
save
Giulio2002 Apr 17, 2024
e8eadf1
save
Giulio2002 Apr 17, 2024
8e9f1d3
save
Giulio2002 Apr 17, 2024
43cb76b
save
Giulio2002 Apr 17, 2024
8963fad
save
Giulio2002 Apr 17, 2024
e557f36
save
Giulio2002 Apr 17, 2024
fc339d1
save
Giulio2002 Apr 17, 2024
96352d0
save
Giulio2002 Apr 17, 2024
e740116
save
Giulio2002 Apr 17, 2024
22ac4f8
save
Giulio2002 Apr 17, 2024
f7424f7
save
Giulio2002 Apr 17, 2024
c3e61f1
save
Giulio2002 Apr 17, 2024
a7da4d1
save
Giulio2002 Apr 17, 2024
54ff39d
save
Giulio2002 Apr 17, 2024
a467753
save
Giulio2002 Apr 18, 2024
01c2a2a
save
Giulio2002 Apr 18, 2024
ea10e9c
save
Giulio2002 Apr 18, 2024
2708aa5
save
Giulio2002 Apr 18, 2024
07fee55
save
Giulio2002 Apr 18, 2024
58050dc
save
Giulio2002 Apr 18, 2024
9e5d8f1
save
Giulio2002 Apr 18, 2024
8848c00
save
Giulio2002 Apr 18, 2024
a33a810
save
Giulio2002 Apr 18, 2024
cee7cc4
save
Giulio2002 Apr 18, 2024
5b4a6a3
save
Giulio2002 Apr 18, 2024
06ecac9
save
Giulio2002 Apr 18, 2024
866757f
save
Giulio2002 Apr 18, 2024
7a60f56
save
Giulio2002 Apr 18, 2024
1b3a6e4
save
Giulio2002 Apr 18, 2024
9af5dba
save
Giulio2002 Apr 18, 2024
a723b5e
save
Giulio2002 Apr 18, 2024
624c67d
save
Giulio2002 Apr 18, 2024
e518131
save
Giulio2002 Apr 18, 2024
09efe7d
save
Giulio2002 Apr 18, 2024
18f1b01
save
Giulio2002 Apr 18, 2024
deb64f6
save
Giulio2002 Apr 18, 2024
3442ff8
save
Giulio2002 Apr 18, 2024
15ab540
save
Giulio2002 Apr 18, 2024
0091593
save
Giulio2002 Apr 18, 2024
6e8219b
save
Giulio2002 Apr 18, 2024
4178dfa
save
Giulio2002 Apr 18, 2024
cc889f7
save
Giulio2002 Apr 18, 2024
1dab5fd
save
Giulio2002 Apr 18, 2024
782c6ab
save
Giulio2002 Apr 18, 2024
6c8b53f
save
Giulio2002 Apr 18, 2024
f4237de
save
Giulio2002 Apr 18, 2024
a426e3a
save
Giulio2002 Apr 18, 2024
0314c5e
save
Giulio2002 Apr 18, 2024
9cd13ce
save
Giulio2002 Apr 19, 2024
a6ca2a3
save
Giulio2002 Apr 19, 2024
d53b77d
save
Giulio2002 Apr 19, 2024
cfc947e
save
Giulio2002 Apr 19, 2024
5732c4c
save
Giulio2002 Apr 19, 2024
a38ca78
save
Giulio2002 Apr 19, 2024
3fda821
save
Giulio2002 Apr 19, 2024
5663e45
save
Giulio2002 Apr 19, 2024
865f1ab
save
Giulio2002 Apr 19, 2024
7e98554
save
Giulio2002 Apr 19, 2024
8085329
save
Giulio2002 Apr 19, 2024
b966a1a
save
Giulio2002 Apr 19, 2024
ff4208f
save
Giulio2002 Apr 19, 2024
6846a3f
save
Giulio2002 Apr 19, 2024
2cc68d9
fix ut
domiwei Apr 20, 2024
2be4eee
linter
domiwei Apr 20, 2024
630451c
save
Giulio2002 Apr 20, 2024
0e3e442
save
Giulio2002 Apr 20, 2024
71e1d2a
fix lint warning
domiwei Apr 21, 2024
d1c5657
udpate
domiwei Apr 21, 2024
b88bea7
Update cl/sentinel/utils.go
Giulio2002 Apr 21, 2024
1663a60
save
Giulio2002 Apr 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions cl/aggregation/pool_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/ledgerwatch/erigon/cl/utils/eth_clock"
)

var ErrIsSuperset = fmt.Errorf("attestation is superset of existing attestation")

var (
blsAggregate = bls.AggregateSignatures
)
Expand Down Expand Up @@ -55,13 +57,12 @@ func (p *aggregationPoolImpl) AddAttestation(inAtt *solid.Attestation) error {
defer p.aggregatesLock.Unlock()
att, ok := p.aggregates[hashRoot]
if !ok {
p.aggregates[hashRoot] = inAtt.Clone().(*solid.Attestation)
p.aggregates[hashRoot] = inAtt.Copy()
return nil
}

if utils.IsSupersetBitlist(att.AggregationBits(), inAtt.AggregationBits()) {
// no need to merge existing signatures
return nil
return ErrIsSuperset
}

// merge signature
Expand All @@ -71,7 +72,7 @@ func (p *aggregationPoolImpl) AddAttestation(inAtt *solid.Attestation) error {
if err != nil {
return err
}
if len(merged) > 96 {
if len(merged) != 96 {
return fmt.Errorf("merged signature is too long")
}
var mergedSig [96]byte
Expand Down Expand Up @@ -99,7 +100,7 @@ func (p *aggregationPoolImpl) GetAggregatationByRoot(root common.Hash) *solid.At
if att == nil {
return nil
}
return att.Clone().(*solid.Attestation)
return att.Copy()
}

func (p *aggregationPoolImpl) sweepStaleAtt(ctx context.Context) {
Expand Down
3 changes: 3 additions & 0 deletions cl/beacon/handler/duties_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ func (a *ApiHandler) getSyncDuties(w http.ResponseWriter, r *http.Request) (*bea

// Now try reading the sync committee
syncCommittee, _, ok := a.forkchoiceStore.GetSyncCommittees(period)
if !ok {
_, syncCommittee, ok = a.forkchoiceStore.GetSyncCommittees(period - 1)
}
// Read them from the archive node if we do not have them in the fast-access storage
if !ok {
syncCommittee, err = state_accessors.ReadCurrentSyncCommittee(tx, a.beaconChainCfg.RoundSlotToSyncCommitteePeriod(startSlotAtEpoch))
Expand Down
3 changes: 3 additions & 0 deletions cl/beacon/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ type ApiHandler struct {
syncCommitteeMessagesService services.SyncCommitteeMessagesService
syncContributionAndProofsService services.SyncContributionService
aggregateAndProofsService services.AggregateAndProofService
attestationService services.AttestationService
}

func NewApiHandler(
Expand Down Expand Up @@ -110,6 +111,7 @@ func NewApiHandler(
syncCommitteeMessagesService services.SyncCommitteeMessagesService,
syncContributionAndProofs services.SyncContributionService,
aggregateAndProofs services.AggregateAndProofService,
attestationService services.AttestationService,
) *ApiHandler {
blobBundles, err := lru.New[common.Bytes48, BlobBundle]("blobs", maxBlobBundleCacheSize)
if err != nil {
Expand Down Expand Up @@ -146,6 +148,7 @@ func NewApiHandler(
syncCommitteeMessagesService: syncCommitteeMessagesService,
syncContributionAndProofsService: syncContributionAndProofs,
aggregateAndProofsService: aggregateAndProofs,
attestationService: attestationService,
}
}

Expand Down
7 changes: 2 additions & 5 deletions cl/beacon/handler/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,14 +169,11 @@ func (a *ApiHandler) GetEthV1NodeIdentity(w http.ResponseWriter, r *http.Request

func (a *ApiHandler) GetEthV1NodeSyncing(w http.ResponseWriter, r *http.Request) {
currentSlot := a.ethClock.GetCurrentSlot()
var syncDistance uint64
if a.syncedData.Syncing() {
syncDistance = currentSlot - a.syncedData.HeadSlot()
}

if err := json.NewEncoder(w).Encode(map[string]interface{}{
"data": map[string]interface{}{
"head_slot": strconv.FormatUint(a.syncedData.HeadSlot(), 10),
"sync_distance": strconv.FormatUint(syncDistance, 10),
"sync_distance": strconv.FormatUint(currentSlot-a.syncedData.HeadSlot(), 10),
"is_syncing": a.syncedData.Syncing(),
"is_optimistic": false, // needs to change
"el_offline": false,
Expand Down
76 changes: 44 additions & 32 deletions cl/beacon/handler/pool.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package handler

import (
"bytes"
"encoding/json"
"errors"
"net/http"
Expand Down Expand Up @@ -64,44 +65,44 @@ func (a *ApiHandler) PostEthV1BeaconPoolAttestations(w http.ResponseWriter, r *h
beaconhttp.NewEndpointError(http.StatusBadRequest, err).WriteTo(w)
return
}

headState := a.syncedData.HeadState()
if headState == nil {
beaconhttp.NewEndpointError(http.StatusServiceUnavailable, errors.New("head state not available")).WriteTo(w)
return
}
failures := []poolingFailure{}
for i, attestation := range req {
if err := a.forkchoiceStore.ValidateOnAttestation(attestation); err != nil {
failures = append(failures, poolingFailure{
Index: i,
Message: err.Error(),
})
continue
}
if err := a.committeeSub.CheckAggregateAttestation(attestation); err != nil {
var (
slot = attestation.AttestantionData().Slot()
cIndex = attestation.AttestantionData().CommitteeIndex()
committeeCountPerSlot = headState.CommitteeCount(slot / a.beaconChainCfg.SlotsPerEpoch)
subnet = subnets.ComputeSubnetForAttestation(committeeCountPerSlot, slot, cIndex, a.beaconChainCfg.SlotsPerEpoch, a.netConfig.AttestationSubnetCount)
)
_ = i
if err := a.attestationService.ProcessMessage(r.Context(), &subnet, attestation); err != nil {
Giulio2002 marked this conversation as resolved.
Show resolved Hide resolved
log.Warn("[Beacon REST] failed to process attestation", "err", err)
failures = append(failures, poolingFailure{
Index: i,
Message: err.Error(),
})
continue
}
if a.sentinel != nil {
// broadcast
var (
slot = attestation.AttestantionData().Slot()
cIndex = attestation.AttestantionData().CommitteeIndex()
committeeCountPerSlot = subnets.ComputeCommitteeCountPerSlot(a.syncedData.HeadState(), slot, a.beaconChainCfg.SlotsPerEpoch)
subnet = subnets.ComputeSubnetForAttestation(committeeCountPerSlot, slot, cIndex, a.beaconChainCfg.SlotsPerEpoch, a.netConfig.AttestationSubnetCount)
)
encodedSSZ, err := attestation.EncodeSSZ(nil)
if err != nil {
beaconhttp.NewEndpointError(http.StatusInternalServerError, err).WriteTo(w)
return
}
if _, err := a.sentinel.PublishGossip(r.Context(), &sentinel.GossipData{
Data: encodedSSZ,
Name: gossip.TopicNameBeaconAttestation(subnet),
SubnetId: &subnet,
}); err != nil {
beaconhttp.NewEndpointError(http.StatusInternalServerError, err).WriteTo(w)
return
}
}
// if a.sentinel != nil {
// encodedSSZ, err := attestation.EncodeSSZ(nil)
// if err != nil {
// beaconhttp.NewEndpointError(http.StatusInternalServerError, err).WriteTo(w)
// return
// }
// if _, err := a.sentinel.PublishGossip(r.Context(), &sentinel.GossipData{
// Data: encodedSSZ,
// Name: gossip.TopicNamePrefixBeaconAttestation,
// SubnetId: &subnet,
// }); err != nil {
// beaconhttp.NewEndpointError(http.StatusInternalServerError, err).WriteTo(w)
// return
// }
// }
}
if len(failures) > 0 {
errResp := poolingError{
Expand Down Expand Up @@ -266,6 +267,7 @@ func (a *ApiHandler) PostEthV1ValidatorAggregatesAndProof(w http.ResponseWriter,
failures := []poolingFailure{}
for _, v := range req {
if err := a.aggregateAndProofsService.ProcessMessage(r.Context(), nil, v); err != nil && !errors.Is(err, services.ErrIgnore) {
log.Warn("[Beacon REST] failed to process bls-change", "err", err)
failures = append(failures, poolingFailure{Index: len(failures), Message: err.Error()})
continue
}
Expand All @@ -274,12 +276,14 @@ func (a *ApiHandler) PostEthV1ValidatorAggregatesAndProof(w http.ResponseWriter,
encodedSSZ, err := v.EncodeSSZ(nil)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
log.Warn("[Beacon REST] failed to encode aggregate and proof", "err", err)
return
}
if _, err := a.sentinel.PublishGossip(r.Context(), &sentinel.GossipData{
Data: encodedSSZ,
Name: gossip.TopicNameBeaconAggregateAndProof,
}); err != nil {
log.Warn("[Beacon REST] failed to publish gossip", "err", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
Expand Down Expand Up @@ -308,7 +312,8 @@ func (a *ApiHandler) PostEthV1BeaconPoolSyncCommittees(w http.ResponseWriter, r
continue
}
for _, subnet := range publishingSubnets {
if err := a.syncCommitteeMessagesService.ProcessMessage(r.Context(), &subnet, v); err != nil && !errors.Is(err, services.ErrIgnore) {
if err = a.syncCommitteeMessagesService.ProcessMessage(r.Context(), &subnet, v); err != nil && !errors.Is(err, services.ErrIgnore) {
log.Warn("[Beacon REST] failed to process attestation", "err", err)
failures = append(failures, poolingFailure{Index: idx, Message: err.Error()})
break
}
Expand Down Expand Up @@ -354,8 +359,13 @@ func (a *ApiHandler) PostEthV1ValidatorContributionsAndProofs(w http.ResponseWri
return
}
failures := []poolingFailure{}
var err error
for idx, v := range msgs {
if err := a.syncContributionAndProofsService.ProcessMessage(r.Context(), nil, v); err != nil && !errors.Is(err, services.ErrIgnore) {
if bytes.Equal(v.Message.Contribution.AggregationBits, make([]byte, len(v.Message.Contribution.AggregationBits))) {
continue // skip empty contributions
}
if err = a.syncContributionAndProofsService.ProcessMessage(r.Context(), nil, v); err != nil && !errors.Is(err, services.ErrIgnore) {
log.Warn("[Beacon REST] failed to process sync contribution", "err", err)
failures = append(failures, poolingFailure{Index: idx, Message: err.Error()})
continue
}
Expand All @@ -364,12 +374,14 @@ func (a *ApiHandler) PostEthV1ValidatorContributionsAndProofs(w http.ResponseWri
encodedSSZ, err := v.EncodeSSZ(nil)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
log.Warn("[Beacon REST] failed to encode sync contribution", "err", err)
return
}
if _, err := a.sentinel.PublishGossip(r.Context(), &sentinel.GossipData{
Data: encodedSSZ,
Name: gossip.TopicNameSyncCommitteeContributionAndProof,
}); err != nil {
log.Warn("[Beacon REST] failed to publish gossip", "err", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
Expand Down
6 changes: 4 additions & 2 deletions cl/beacon/handler/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,13 +290,15 @@ func TestPoolSyncCommittees(t *testing.T) {
}

func TestPoolSyncContributionAndProofs(t *testing.T) {
aggrBits := make([]byte, cltypes.SyncCommitteeAggregationBitsSize)
aggrBits[0] = 1
msgs := []*cltypes.SignedContributionAndProof{
{
Message: &cltypes.ContributionAndProof{
Contribution: &cltypes.Contribution{
Slot: 1,
BeaconBlockRoot: libcommon.Hash{1, 2, 3, 4, 5, 6, 7, 8},
AggregationBits: make([]byte, cltypes.SyncCommitteeAggregationBitsSize),
AggregationBits: aggrBits,
},
},
},
Expand Down Expand Up @@ -332,6 +334,6 @@ func TestPoolSyncContributionAndProofs(t *testing.T) {
Slot: 1,
BeaconBlockRoot: libcommon.Hash{1, 2, 3, 4, 5, 6, 7, 8},
SubcommitteeIndex: 0,
AggregationBits: make([]byte, cltypes.SyncCommitteeAggregationBitsSize),
AggregationBits: aggrBits,
})
}
2 changes: 1 addition & 1 deletion cl/beacon/handler/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (a *ApiHandler) PostEthV1ValidatorSyncCommitteeSubscriptions(w http.Respons
// subscribe to subnets
for _, subnet := range syncnets {
if _, err := a.sentinel.SetSubscribeExpiry(r.Context(), &sentinel.RequestSubscribeExpiry{
Topic: fmt.Sprintf(gossip.TopicNamePrefixSyncCommittee, subnet),
Topic: gossip.TopicNameSyncCommittee(int(subnet)),
ExpiryUnixSecs: uint64(expiry.Unix()),
}); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
Expand Down
2 changes: 1 addition & 1 deletion cl/beacon/handler/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func setupTestingHandler(t *testing.T, v clparams.StateVersion, logger log.Logge
Events: true,
Validator: true,
Lighthouse: true,
}, nil, blobStorage, nil, vp, nil, nil, fcu.SyncContributionPool, nil, nil, syncCommitteeMessagesService, syncContributionService, aggregateAndProofsService) // TODO: add tests
}, nil, blobStorage, nil, vp, nil, nil, fcu.SyncContributionPool, nil, nil, syncCommitteeMessagesService, syncContributionService, aggregateAndProofsService, nil) // TODO: add tests
h.Init()
return
}
1 change: 1 addition & 0 deletions cl/beacon/handler/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func (t *validatorTestSuite) SetupTest() {
nil,
nil,
nil,
nil,
)
t.gomockCtrl = gomockCtrl
}
Expand Down
1 change: 0 additions & 1 deletion cl/beacon/synced_data/synced_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ func (s *SyncedDataManager) OnHeadState(newState *state.CachingBeaconState) (err
return err
}
s.headState.Store(st)
//fmt.Println(newState.CurrentSyncCommittee().GetCommittee())

return
}
Expand Down
2 changes: 1 addition & 1 deletion cl/clparams/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ var CheckpointSyncEndpoints = map[NetworkType][]string{
},
SepoliaNetwork: {
//"https://beaconstate-sepolia.chainsafe.io/eth/v2/debug/beacon/states/finalized",
"https://sepolia.beaconstate.info/eth/v2/debug/beacon/states/finalized",
//"https://sepolia.beaconstate.info/eth/v2/debug/beacon/states/finalized",
"https://checkpoint-sync.sepolia.ethpandaops.io/eth/v2/debug/beacon/states/finalized",
},
GnosisNetwork: {
Expand Down
8 changes: 8 additions & 0 deletions cl/cltypes/solid/attestation.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ func (*Attestation) Static() bool {
return false
}

func (a *Attestation) Copy() *Attestation {
new := &Attestation{}
copy(new.staticBuffer[:], a.staticBuffer[:])
new.aggregationBitsBuffer = make([]byte, len(a.aggregationBitsBuffer))
copy(new.aggregationBitsBuffer, a.aggregationBitsBuffer)
return new
}

// NewAttestionFromParameters creates a new Attestation instance using provided parameters
func NewAttestionFromParameters(
aggregationBits []byte,
Expand Down
1 change: 0 additions & 1 deletion cl/phase1/forkchoice/fork_graph/fork_graph_disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,6 @@ func (f *forkGraphDisk) Prune(pruneSlot uint64) (err error) {
f.currentJustifiedCheckpoints.Delete(root)
f.finalizedCheckpoints.Delete(root)
f.headers.Delete(root)
f.syncCommittees.Delete(root)
f.blockRewards.Delete(root)
f.fs.Remove(getBeaconStateFilename(root))
f.fs.Remove(getBeaconStateCacheFilename(root))
Expand Down
5 changes: 5 additions & 0 deletions cl/phase1/forkchoice/forkchoice_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type ForkChoiceStorageMock struct {
LCUpdates map[uint64]*cltypes.LightClientUpdate
SyncContributionPool sync_contribution_pool.SyncContributionPool
Headers map[common.Hash]*cltypes.BeaconBlockHeader
GetBeaconCommitteeMock func(slot, committeeIndex uint64) ([]uint64, error)

Pool pool.OperationsPool
}
Expand All @@ -67,6 +68,7 @@ func NewForkChoiceStorageMock() *ForkChoiceStorageMock {
LCUpdates: make(map[uint64]*cltypes.LightClientUpdate),
SyncContributionPool: sync_contribution_pool.NewSyncContributionPoolMock(),
Headers: make(map[common.Hash]*cltypes.BeaconBlockHeader),
GetBeaconCommitteeMock: nil,
}
}

Expand Down Expand Up @@ -128,6 +130,9 @@ func (f *ForkChoiceStorageMock) GetSyncCommittees(period uint64) (*solid.SyncCom
}

func (f *ForkChoiceStorageMock) GetBeaconCommitee(slot, committeeIndex uint64) ([]uint64, error) {
if f.GetBeaconCommitteeMock != nil {
return f.GetBeaconCommitteeMock(slot, committeeIndex)
}
return []uint64{1, 2, 3, 4, 5, 6, 7, 8}, nil
}

Expand Down
4 changes: 2 additions & 2 deletions cl/phase1/network/gossip_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (g *GossipManager) onRecv(ctx context.Context, data *sentinel.GossipData, l
return err
}
if _, err := g.sentinel.PublishGossip(ctx, data); err != nil {
log.Debug("failed publish gossip", "err", err)
log.Warn("failed publish gossip", "err", err)
}
return nil
}
Expand Down Expand Up @@ -179,7 +179,7 @@ func (g *GossipManager) routeAndProcess(ctx context.Context, data *sentinel.Goss
return g.syncCommitteeMessagesService.ProcessMessage(ctx, data.SubnetId, msg)
case gossip.IsTopicBeaconAttestation(data.Name):
att := &solid.Attestation{}
if err := att.DecodeSSZ(common.CopyBytes(data.Data), int(version)); err != nil {
if err := att.DecodeSSZ(data.Data, int(version)); err != nil {
return err
}
return g.attestationService.ProcessMessage(ctx, data.SubnetId, att)
Expand Down
3 changes: 3 additions & 0 deletions cl/phase1/network/services/aggregate_and_proof_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ func (a *aggregateAndProofServiceImpl) ProcessMessage(ctx context.Context, subne

// Add to aggregation pool
if err := a.aggregationPool.AddAttestation(aggregateAndProof.Message.Aggregate); err != nil {
if errors.Is(err, aggregation.ErrIsSuperset) {
return ErrIgnore
}
return errors.WithMessagef(err, "failed to add attestation to pool")
}

Expand Down
Loading
Loading