Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First round of fixes in making gossip publishing good for the validator: See comment #9972

Merged
merged 101 commits into from
Apr 21, 2024
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
101 commits
Select commit Hold shift + click to select a range
1c17c52
save
Giulio2002 Apr 17, 2024
bea66f1
save
Giulio2002 Apr 17, 2024
8b96825
save
Giulio2002 Apr 17, 2024
4de3055
save
Giulio2002 Apr 17, 2024
e26fc9a
save
Giulio2002 Apr 17, 2024
5364655
save
Giulio2002 Apr 17, 2024
540c2ff
save
Giulio2002 Apr 17, 2024
6cf0ba3
save
Giulio2002 Apr 17, 2024
94de4ce
save
Giulio2002 Apr 17, 2024
5a192dd
save
Giulio2002 Apr 17, 2024
9a7d6cd
save
Giulio2002 Apr 17, 2024
a3afbfc
save
Giulio2002 Apr 17, 2024
15f4b24
save
Giulio2002 Apr 17, 2024
be135bc
save
Giulio2002 Apr 17, 2024
1da2bc6
save
Giulio2002 Apr 17, 2024
4e8d50a
save
Giulio2002 Apr 17, 2024
5b0ae23
save
Giulio2002 Apr 17, 2024
1cd72d7
save
Giulio2002 Apr 17, 2024
d17e867
save
Giulio2002 Apr 17, 2024
3193a6e
save
Giulio2002 Apr 17, 2024
a35528a
save
Giulio2002 Apr 17, 2024
bb32e97
save
Giulio2002 Apr 17, 2024
14079b3
save
Giulio2002 Apr 17, 2024
24229de
save
Giulio2002 Apr 17, 2024
c6d324b
save
Giulio2002 Apr 17, 2024
35f18c9
save
Giulio2002 Apr 17, 2024
df776cc
save
Giulio2002 Apr 17, 2024
e77e925
save
Giulio2002 Apr 17, 2024
4266e8e
save
Giulio2002 Apr 17, 2024
b1f01df
save
Giulio2002 Apr 17, 2024
1256e01
save
Giulio2002 Apr 17, 2024
8b36363
save
Giulio2002 Apr 17, 2024
e8eadf1
save
Giulio2002 Apr 17, 2024
8e9f1d3
save
Giulio2002 Apr 17, 2024
43cb76b
save
Giulio2002 Apr 17, 2024
8963fad
save
Giulio2002 Apr 17, 2024
e557f36
save
Giulio2002 Apr 17, 2024
fc339d1
save
Giulio2002 Apr 17, 2024
96352d0
save
Giulio2002 Apr 17, 2024
e740116
save
Giulio2002 Apr 17, 2024
22ac4f8
save
Giulio2002 Apr 17, 2024
f7424f7
save
Giulio2002 Apr 17, 2024
c3e61f1
save
Giulio2002 Apr 17, 2024
a7da4d1
save
Giulio2002 Apr 17, 2024
54ff39d
save
Giulio2002 Apr 17, 2024
a467753
save
Giulio2002 Apr 18, 2024
01c2a2a
save
Giulio2002 Apr 18, 2024
ea10e9c
save
Giulio2002 Apr 18, 2024
2708aa5
save
Giulio2002 Apr 18, 2024
07fee55
save
Giulio2002 Apr 18, 2024
58050dc
save
Giulio2002 Apr 18, 2024
9e5d8f1
save
Giulio2002 Apr 18, 2024
8848c00
save
Giulio2002 Apr 18, 2024
a33a810
save
Giulio2002 Apr 18, 2024
cee7cc4
save
Giulio2002 Apr 18, 2024
5b4a6a3
save
Giulio2002 Apr 18, 2024
06ecac9
save
Giulio2002 Apr 18, 2024
866757f
save
Giulio2002 Apr 18, 2024
7a60f56
save
Giulio2002 Apr 18, 2024
1b3a6e4
save
Giulio2002 Apr 18, 2024
9af5dba
save
Giulio2002 Apr 18, 2024
a723b5e
save
Giulio2002 Apr 18, 2024
624c67d
save
Giulio2002 Apr 18, 2024
e518131
save
Giulio2002 Apr 18, 2024
09efe7d
save
Giulio2002 Apr 18, 2024
18f1b01
save
Giulio2002 Apr 18, 2024
deb64f6
save
Giulio2002 Apr 18, 2024
3442ff8
save
Giulio2002 Apr 18, 2024
15ab540
save
Giulio2002 Apr 18, 2024
0091593
save
Giulio2002 Apr 18, 2024
6e8219b
save
Giulio2002 Apr 18, 2024
4178dfa
save
Giulio2002 Apr 18, 2024
cc889f7
save
Giulio2002 Apr 18, 2024
1dab5fd
save
Giulio2002 Apr 18, 2024
782c6ab
save
Giulio2002 Apr 18, 2024
6c8b53f
save
Giulio2002 Apr 18, 2024
f4237de
save
Giulio2002 Apr 18, 2024
a426e3a
save
Giulio2002 Apr 18, 2024
0314c5e
save
Giulio2002 Apr 18, 2024
9cd13ce
save
Giulio2002 Apr 19, 2024
a6ca2a3
save
Giulio2002 Apr 19, 2024
d53b77d
save
Giulio2002 Apr 19, 2024
cfc947e
save
Giulio2002 Apr 19, 2024
5732c4c
save
Giulio2002 Apr 19, 2024
a38ca78
save
Giulio2002 Apr 19, 2024
3fda821
save
Giulio2002 Apr 19, 2024
5663e45
save
Giulio2002 Apr 19, 2024
865f1ab
save
Giulio2002 Apr 19, 2024
7e98554
save
Giulio2002 Apr 19, 2024
8085329
save
Giulio2002 Apr 19, 2024
b966a1a
save
Giulio2002 Apr 19, 2024
ff4208f
save
Giulio2002 Apr 19, 2024
6846a3f
save
Giulio2002 Apr 19, 2024
2cc68d9
fix ut
domiwei Apr 20, 2024
2be4eee
linter
domiwei Apr 20, 2024
630451c
save
Giulio2002 Apr 20, 2024
0e3e442
save
Giulio2002 Apr 20, 2024
71e1d2a
fix lint warning
domiwei Apr 21, 2024
d1c5657
udpate
domiwei Apr 21, 2024
b88bea7
Update cl/sentinel/utils.go
Giulio2002 Apr 21, 2024
1663a60
save
Giulio2002 Apr 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cl/aggregation/pool_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (p *aggregationPoolImpl) AddAttestation(inAtt *solid.Attestation) error {
if err != nil {
return err
}
if len(merged) > 96 {
if len(merged) != 96 {
return fmt.Errorf("merged signature is too long")
}
var mergedSig [96]byte
Expand Down
3 changes: 3 additions & 0 deletions cl/beacon/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ type ApiHandler struct {
syncCommitteeMessagesService services.SyncCommitteeMessagesService
syncContributionAndProofsService services.SyncContributionService
aggregateAndProofsService services.AggregateAndProofService
attestationService services.AttestationService
}

func NewApiHandler(
Expand Down Expand Up @@ -110,6 +111,7 @@ func NewApiHandler(
syncCommitteeMessagesService services.SyncCommitteeMessagesService,
syncContributionAndProofs services.SyncContributionService,
aggregateAndProofs services.AggregateAndProofService,
attestationService services.AttestationService,
) *ApiHandler {
blobBundles, err := lru.New[common.Bytes48, BlobBundle]("blobs", maxBlobBundleCacheSize)
if err != nil {
Expand Down Expand Up @@ -146,6 +148,7 @@ func NewApiHandler(
syncCommitteeMessagesService: syncCommitteeMessagesService,
syncContributionAndProofsService: syncContributionAndProofs,
aggregateAndProofsService: aggregateAndProofs,
attestationService: attestationService,
}
}

Expand Down
23 changes: 15 additions & 8 deletions cl/beacon/handler/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package handler
import (
"encoding/json"
"errors"
"fmt"
"net/http"

"github.com/ledgerwatch/erigon-lib/gointerfaces/sentinel"
Expand Down Expand Up @@ -64,9 +65,20 @@ func (a *ApiHandler) PostEthV1BeaconPoolAttestations(w http.ResponseWriter, r *h
beaconhttp.NewEndpointError(http.StatusBadRequest, err).WriteTo(w)
return
}
headState := a.syncedData.HeadState()
if headState == nil {
beaconhttp.NewEndpointError(http.StatusServiceUnavailable, errors.New("head state not available")).WriteTo(w)
return
}
failures := []poolingFailure{}
for i, attestation := range req {
if err := a.forkchoiceStore.ValidateOnAttestation(attestation); err != nil {
var (
slot = attestation.AttestantionData().Slot()
cIndex = attestation.AttestantionData().CommitteeIndex()
committeeCountPerSlot = headState.CommitteeCount(slot / uint64(a.beaconChainCfg.SlotsPerEpoch))
subnet = subnets.ComputeSubnetForAttestation(committeeCountPerSlot, slot, cIndex, a.beaconChainCfg.SlotsPerEpoch, a.netConfig.AttestationSubnetCount)
)
if err := a.attestationService.ProcessMessage(r.Context(), &subnet, attestation); err != nil {
Giulio2002 marked this conversation as resolved.
Show resolved Hide resolved
failures = append(failures, poolingFailure{
Index: i,
Message: err.Error(),
Expand All @@ -81,18 +93,13 @@ func (a *ApiHandler) PostEthV1BeaconPoolAttestations(w http.ResponseWriter, r *h
continue
}
if a.sentinel != nil {
// broadcast
var (
slot = attestation.AttestantionData().Slot()
cIndex = attestation.AttestantionData().CommitteeIndex()
committeeCountPerSlot = subnets.ComputeCommitteeCountPerSlot(a.syncedData.HeadState(), slot, a.beaconChainCfg.SlotsPerEpoch)
subnet = subnets.ComputeSubnetForAttestation(committeeCountPerSlot, slot, cIndex, a.beaconChainCfg.SlotsPerEpoch, a.netConfig.AttestationSubnetCount)
)

encodedSSZ, err := attestation.EncodeSSZ(nil)
if err != nil {
beaconhttp.NewEndpointError(http.StatusInternalServerError, err).WriteTo(w)
return
}
fmt.Println(subnet)
if _, err := a.sentinel.PublishGossip(r.Context(), &sentinel.GossipData{
Data: encodedSSZ,
Name: gossip.TopicNameBeaconAttestation(subnet),
Expand Down
68 changes: 38 additions & 30 deletions cl/phase1/network/services/attestation_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ import (
)

var (
computeCommitteeCountPerSlot = subnets.ComputeCommitteeCountPerSlot
computeSubnetForAttestation = subnets.ComputeSubnetForAttestation
computeSubnetForAttestation = subnets.ComputeSubnetForAttestation
)

type attestationService struct {
Expand Down Expand Up @@ -55,14 +54,19 @@ func (s *attestationService) ProcessMessage(ctx context.Context, subnet *uint64,
root = att.AttestantionData().BeaconBlockRoot()
slot = att.AttestantionData().Slot()
committeeIndex = att.AttestantionData().CommitteeIndex()
epoch = att.AttestantionData().Target().Epoch()
bits = att.AggregationBits()
targetEpoch = att.AttestantionData().Target().Epoch()
)
headState := s.syncedDataManager.HeadState()
if headState == nil {
return ErrIgnore
}

// [REJECT] The committee index is within the expected range
committeeCount := computeCommitteeCountPerSlot(s.syncedDataManager.HeadState(), slot, s.beaconCfg.SlotsPerEpoch)
committeeCount := headState.CommitteeCount(att.AttestantionData().Target().Epoch())
if committeeIndex >= committeeCount {
return fmt.Errorf("committee index out of range")
}
fmt.Println("received", *subnet)
// [REJECT] The attestation is for the correct subnet -- i.e. compute_subnet_for_attestation(committees_per_slot, attestation.data.slot, index) == subnet_id
subnetId := computeSubnetForAttestation(committeeCount, slot, committeeIndex, s.beaconCfg.SlotsPerEpoch, s.netCfg.AttestationSubnetCount)
if subnet == nil || subnetId != *subnet {
Expand All @@ -75,33 +79,37 @@ func (s *attestationService) ProcessMessage(ctx context.Context, subnet *uint64,
return fmt.Errorf("not in propagation range %w", ErrIgnore)
}
// [REJECT] The attestation's epoch matches its target -- i.e. attestation.data.target.epoch == compute_epoch_at_slot(attestation.data.slot)
if epoch != slot/s.beaconCfg.SlotsPerEpoch {
if targetEpoch != slot/s.beaconCfg.SlotsPerEpoch {
return fmt.Errorf("epoch mismatch")
}
// [REJECT] The number of aggregation bits matches the committee size -- i.e. len(aggregation_bits) == len(get_beacon_committee(state, attestation.data.slot, index)).
beaconCommitte, err := headState.GetBeaconCommitee(slot, committeeIndex)
if err != nil {
return err
}
expectedAggregationBitsLength := (len(beaconCommitte) + 7) / 8
if len(att.AggregationBits()) < expectedAggregationBitsLength {
return fmt.Errorf("aggregation bits count mismatch: %d != %d", len(att.AggregationBits()), expectedAggregationBitsLength)
}

bits := att.AggregationBits()
//[REJECT] The attestation is unaggregated -- that is, it has exactly one participating validator (len([bit for bit in aggregation_bits if bit]) == 1, i.e. exactly 1 bit is set).
emptyCount := 0
onBitIndex := 0
setBits := 0
onBitIndex := 0 // Aggregationbits is []byte, so we need to iterate over all bits.
for i := 0; i < len(bits); i++ {
if bits[i] == 0 {
emptyCount++
} else if bits[i]&(bits[i]-1) != 0 {
return fmt.Errorf("more than one bit set")
} else {
// find the onBit bit index
for onBit := uint(0); onBit < 8; onBit++ {
if bits[i]&(1<<onBit) != 0 {
onBitIndex = i*8 + int(onBit)
break
}
for j := 0; j < 8; j++ {
if bits[i]&(1<<uint(j)) != 0 {
setBits++
onBitIndex = i*8 + j
}
}
}
if emptyCount != len(bits)-1 {
return fmt.Errorf("no bit set")

if setBits == 0 {
return ErrIgnore
}
// [REJECT] The number of aggregation bits matches the committee size -- i.e. len(aggregation_bits) == len(get_beacon_committee(state, attestation.data.slot, index)).
if len(bits)*8 < int(committeeCount) {
return fmt.Errorf("aggregation bits count mismatch")
if setBits != 1 {
return fmt.Errorf("attestation does not have exactly one participating validator")
}

// [IGNORE] There has been no other valid attestation seen on an attestation subnet that has an identical attestation.data.target.epoch and participating validator index.
Expand All @@ -117,15 +125,15 @@ func (s *attestationService) ProcessMessage(ctx context.Context, subnet *uint64,
vIndex := committees[onBitIndex]
s.validatorAttSeenLock.Lock()
defer s.validatorAttSeenLock.Unlock()
if _, ok := s.validatorAttestationSeen[epoch]; !ok {
s.validatorAttestationSeen[epoch] = make(map[uint64]struct{})
if _, ok := s.validatorAttestationSeen[targetEpoch]; !ok {
s.validatorAttestationSeen[targetEpoch] = make(map[uint64]struct{})
}
if _, ok := s.validatorAttestationSeen[epoch][vIndex]; ok {
if _, ok := s.validatorAttestationSeen[targetEpoch][vIndex]; ok {
return fmt.Errorf("validator already seen in target epoch %w", ErrIgnore)
}
s.validatorAttestationSeen[epoch][vIndex] = struct{}{}
s.validatorAttestationSeen[targetEpoch][vIndex] = struct{}{}
// always check and delete previous epoch if it exists
delete(s.validatorAttestationSeen, epoch-1)
delete(s.validatorAttestationSeen, targetEpoch-1)
return nil
}(); err != nil {
return err
Expand All @@ -139,7 +147,7 @@ func (s *attestationService) ProcessMessage(ctx context.Context, subnet *uint64,

// [REJECT] The attestation's target block is an ancestor of the block named in the LMD vote -- i.e.
// get_checkpoint_block(store, attestation.data.beacon_block_root, attestation.data.target.epoch) == attestation.data.target.root
startSlotAtEpoch := epoch * s.beaconCfg.SlotsPerEpoch
startSlotAtEpoch := targetEpoch * s.beaconCfg.SlotsPerEpoch
if s.forkchoiceStore.Ancestor(root, startSlotAtEpoch) != att.AttestantionData().Target().BlockRoot() {
return fmt.Errorf("invalid target block")
}
Expand Down
5 changes: 0 additions & 5 deletions cl/phase1/network/subnets/subnets.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,3 @@ func ComputeSubnetForAttestation(committeePerSlot, slot, committeeIndex, slotsPe
committeesSinceEpochStart := committeePerSlot * slotsSinceEpochStart
return (committeesSinceEpochStart + committeeIndex) % attSubnetCount
}

func ComputeCommitteeCountPerSlot(s *state.CachingBeaconState, slot uint64, slotsPerEpoch uint64) uint64 {
epoch := slot / slotsPerEpoch
return s.CommitteeCount(epoch)
}
2 changes: 1 addition & 1 deletion cl/validator/attestation_producer/attestation_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (ap *attestationProducer) ProduceAndCacheAttestationData(baseState *state.C
}

targetEpoch := state.Epoch(baseState)
epochStartTargetSlot := (targetEpoch * ap.beaconCfg.SlotsPerEpoch) - (ap.beaconCfg.SlotsPerEpoch - 1)
epochStartTargetSlot := targetEpoch * ap.beaconCfg.SlotsPerEpoch
var targetRoot libcommon.Hash
if epochStartTargetSlot == baseState.Slot() {
targetRoot = baseStateBlockRoot
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,12 @@ func (c *CommitteeSubscribeMgmt) AddAttestationSubscription(ctx context.Context,
cIndex = p.CommitteeIndex
vIndex = p.ValidatorIndex
)
headState := c.syncedData.HeadState()
if headState == nil {
return fmt.Errorf("head state not available")
}

commiteePerSlot := subnets.ComputeCommitteeCountPerSlot(c.syncedData.HeadState(), slot, c.beaconConfig.SlotsPerEpoch)
commiteePerSlot := headState.CommitteeCount(p.Slot / c.beaconConfig.SlotsPerEpoch)
subnetId := subnets.ComputeSubnetForAttestation(commiteePerSlot, slot, cIndex, c.beaconConfig.SlotsPerEpoch, c.netConfig.AttestationSubnetCount)
// add validator to subscription
c.validatorSubsMutex.Lock()
Expand Down
1 change: 1 addition & 0 deletions cmd/caplin/caplin1/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ func RunCaplinPhase1(ctx context.Context, engine execution_client.ExecutionEngin
syncCommitteeMessagesService,
syncContributionService,
aggregateAndProofService,
attestationService,
)
go beacon.ListenAndServe(&beacon.LayeredBeaconHandler{
ArchiveApi: apiHandler,
Expand Down
Loading