Skip to content

Commit

Permalink
Base implementation of weighted attestation data
Browse files Browse the repository at this point in the history
  • Loading branch information
oleg-ssvlabs committed Feb 3, 2025
1 parent 285cd60 commit d780ebf
Show file tree
Hide file tree
Showing 2 changed files with 232 additions and 24 deletions.
254 changes: 231 additions & 23 deletions beacon/goclient/attest.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"net/http"
"time"

"errors"

"github.com/attestantio/go-eth2-client/api"
eth2apiv1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/attestantio/go-eth2-client/spec"
Expand All @@ -14,6 +16,19 @@ import (
"go.uber.org/zap"
)

type (
attestationDataResponse struct {
clientAddr string
attestationData *phase0.AttestationData
score float64
}

attestationDataError struct {
clientAddr string
err error
}
)

// AttesterDuties returns attester duties for a given epoch.
func (gc *GoClient) AttesterDuties(ctx context.Context, epoch phase0.Epoch, validatorIndices []phase0.ValidatorIndex) ([]*eth2apiv1.AttesterDuty, error) {
start := time.Now()
Expand Down Expand Up @@ -60,38 +75,155 @@ func (gc *GoClient) GetAttestationData(slot phase0.Slot, committeeIndex phase0.C

// Have to make beacon node request and cache the result.
result, err, _ := gc.attestationReqInflight.Do(slot, func() (*phase0.AttestationData, error) {
attDataReqStart := time.Now()
resp, err := gc.multiClient.AttestationData(gc.ctx, &api.AttestationDataOpts{
Slot: slot,
})
// We have two timeouts: a soft timeout and a hard timeout.
// At the soft timeout, we return if we have any responses so far.
// At the hard timeout, we return unconditionally.
// The soft timeout is half the duration of the hard timeout.
const timeout = time.Second * 2
ctx, cancel := context.WithTimeout(context.TODO(), timeout)
softCtx, softCancel := context.WithTimeout(ctx, timeout/2)
started := time.Now()

recordRequestDuration(gc.ctx, "AttestationData", gc.multiClient.Address(), http.MethodGet, time.Since(attDataReqStart), err)
requests := len(gc.clients)
respCh := make(chan *attestationDataResponse, requests)
errCh := make(chan *attestationDataError, requests)

if err != nil {
gc.log.Error(clResponseErrMsg,
zap.String("api", "AttestationData"),
zap.Error(err),
)
return nil, fmt.Errorf("failed to get attestation data: %w", err)
for _, client := range gc.clients {
go gc.fetchAttestationData(ctx, client, respCh, errCh, &api.AttestationDataOpts{Slot: slot})
}

// Wait for all responses (or context done).
var (
responded,
errored,
timedOut,
softTimedOut int
bestScore float64
bestAttestationData *phase0.AttestationData
bestClient string
)

// Loop 1: prior to soft timeout.
for responded+errored+timedOut+softTimedOut != requests {
select {
case resp := <-respCh:
responded++
gc.log.
With(
zap.Duration("elapsed", time.Since(started)),
zap.String("client_addr", resp.clientAddr),
zap.Int("responded", responded),
zap.Int("errored", errored),
zap.Int("timed_out", timedOut),
).Debug("response received")

if bestAttestationData == nil || resp.score > bestScore {
bestAttestationData = resp.attestationData
bestScore = resp.score
bestClient = resp.clientAddr
}
case err := <-errCh:
errored++
gc.log.
With(
zap.Duration("elapsed", time.Since(started)),
zap.String("client_addr", err.clientAddr),
zap.Int("responded", responded),
zap.Int("errored", errored),
zap.Int("timed_out", timedOut),
zap.Error(err.err),
).Error("error received")
case <-softCtx.Done():
// If we have any responses at this point we consider the non-responders timed out.
if responded > 0 {
timedOut = requests - responded - errored
gc.log.
With(
zap.Duration("elapsed", time.Since(started)),
zap.Int("responded", responded),
zap.Int("errored", errored),
zap.Int("timed_out", timedOut),
).Debug("soft timeout reached with responses")
} else {
gc.log.
With(
zap.Duration("elapsed", time.Since(started)),
zap.Int("errored", errored),
).Error("soft timeout reached with no responses")
}
// Set the number of requests that have soft timed out.
softTimedOut = requests - responded - errored - timedOut
}
}
if resp == nil {
gc.log.Error(clNilResponseErrMsg,
zap.String("api", "AttestationData"),
)
return nil, fmt.Errorf("attestation data response is nil")
softCancel()

// Loop 2: after soft timeout.
for responded+errored+timedOut != requests {
select {
case resp := <-respCh:
responded++
gc.log.
With(
zap.Duration("elapsed", time.Since(started)),
zap.String("client_addr", resp.clientAddr),
zap.Int("responded", responded),
zap.Int("errored", errored),
zap.Int("timed_out", timedOut),
).Debug("response received")
if bestAttestationData == nil || resp.score > bestScore {
bestAttestationData = resp.attestationData
bestScore = resp.score
bestClient = resp.clientAddr
}
case err := <-errCh:
errored++
gc.log.
With(
zap.Duration("elapsed", time.Since(started)),
zap.String("client_addr", err.clientAddr),
zap.Int("responded", responded),
zap.Int("errored", errored),
zap.Int("timed_out", timedOut),
zap.Error(err.err),
).Error("error received")
case <-ctx.Done():
// Anyone not responded by now is considered errored.
timedOut = requests - responded - errored
gc.log.
With(
zap.Duration("elapsed", time.Since(started)),
zap.Int("responded", responded),
zap.Int("errored", errored),
zap.Int("timed_out", timedOut),
).Error("hard timeout reached")
}
}
if resp.Data == nil {
gc.log.Error(clNilResponseDataErrMsg,
zap.String("api", "AttestationData"),
)
return nil, fmt.Errorf("attestation data is nil")
cancel()

gc.log.
With(
zap.Duration("elapsed", time.Since(started)),
zap.Int("responded", responded),
zap.Int("errored", errored),
zap.Int("timed_out", timedOut),
).Debug("results")

if bestAttestationData == nil {
return nil, errors.New("no attestations received")
}

gc.log.
With(
zap.String("best_client", bestClient),
zap.Stringer("attestation_data", bestAttestationData),
zap.Float64("score", bestScore),
).Debug("selected best attestation")

// Caching resulting value here (as part of inflight request) guarantees only 1 request
// will ever be done for a given slot.
gc.attestationDataCache.Set(slot, resp.Data, ttlcache.DefaultTTL)
gc.attestationDataCache.Set(slot, bestAttestationData, ttlcache.DefaultTTL)

return resp.Data, nil
return bestAttestationData, nil
})
if err != nil {
return nil, DataVersionNil, err
Expand All @@ -101,9 +233,85 @@ func (gc *GoClient) GetAttestationData(slot phase0.Slot, committeeIndex phase0.C
if err != nil {
return nil, DataVersionNil, fmt.Errorf("failed to set committee index: %w", err)
}

return data, spec.DataVersionPhase0, nil
}

func (gc *GoClient) fetchAttestationData(ctx context.Context,
client Client,
respCh chan *attestationDataResponse,
errCh chan *attestationDataError,
opts *api.AttestationDataOpts,
) {
addr := client.Address()
attDataReqStart := time.Now()

response, err := client.AttestationData(ctx, opts)

recordRequestDuration(ctx, "AttestationData", addr, http.MethodGet, time.Since(attDataReqStart), err)
logger := gc.log.With(zap.String("api", "AttestationData"))

if err != nil {
gc.log.Error(clResponseErrMsg, zap.Error(err))
errCh <- &attestationDataError{
clientAddr: addr,
err: err,
}
return
}
if response == nil {
logger.Error(clNilResponseErrMsg)
errCh <- &attestationDataError{
clientAddr: addr,
err: errors.New("response is nil"),
}
return
}
attestationData := response.Data
if attestationData == nil {
logger.Error(clNilResponseDataErrMsg)
errCh <- &attestationDataError{
clientAddr: addr,
err: errors.New("attestation data nil"),
}
return
}

score := gc.scoreAttestationData(ctx, addr, attestationData)

respCh <- &attestationDataResponse{
clientAddr: addr,
attestationData: attestationData,
score: score,
}
}

// scoreAttestationData generates a score for attestation data.
// The score is relative to the reward expected from the contents of the attestation.
func (gc *GoClient) scoreAttestationData(ctx context.Context,
addr string,
attestationData *phase0.AttestationData,
) float64 {
if attestationData == nil {
return 0
}

// Initial score is based on height of source and target epochs.
score := float64(attestationData.Source.Epoch + attestationData.Target.Epoch)

//TODO: add slot values to equation. Implement cache

gc.log.
With(zap.String("client_addr", addr)).
With(zap.Uint64("attestation_slot", uint64(attestationData.Slot))).
With(zap.Uint64("source_epoch", uint64(attestationData.Source.Epoch))).
With(zap.Uint64("target_epoch", uint64(attestationData.Target.Epoch))).
With(zap.Float64("score", score)).
Info("scored attestation data")

return score
}

// withCommitteeIndex returns a deep copy of the attestation data with the provided committee index set.
func withCommitteeIndex(data *phase0.AttestationData, committeeIndex phase0.CommitteeIndex) (*phase0.AttestationData, error) {
// Marshal & unmarshal to make a deep copy. This is safer because it won't break silently if
Expand Down
2 changes: 1 addition & 1 deletion beacon/goclient/goclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func ParseNodeClient(version string) NodeClient {
type Client interface {
MultiClient

eth2client.AttestationDataProvider
eth2client.NodeVersionProvider
eth2client.NodeClientProvider
eth2client.BlindedProposalSubmitter
Expand All @@ -83,7 +84,6 @@ type MultiClient interface {
eth2client.SpecProvider
eth2client.GenesisProvider

eth2client.AttestationDataProvider
eth2client.AttestationsSubmitter
eth2client.AggregateAttestationProvider
eth2client.AggregateAttestationsSubmitter
Expand Down

0 comments on commit d780ebf

Please sign in to comment.