diff --git a/beacon/goclient/attest.go b/beacon/goclient/attest.go index a9bf2066cd..9f6971b253 100644 --- a/beacon/goclient/attest.go +++ b/beacon/goclient/attest.go @@ -6,6 +6,8 @@ import ( "net/http" "time" + "errors" + "github.com/attestantio/go-eth2-client/api" eth2apiv1 "github.com/attestantio/go-eth2-client/api/v1" "github.com/attestantio/go-eth2-client/spec" @@ -14,6 +16,19 @@ import ( "go.uber.org/zap" ) +type ( + attestationDataResponse struct { + clientAddr string + attestationData *phase0.AttestationData + score float64 + } + + attestationDataError struct { + clientAddr string + err error + } +) + // AttesterDuties returns attester duties for a given epoch. func (gc *GoClient) AttesterDuties(ctx context.Context, epoch phase0.Epoch, validatorIndices []phase0.ValidatorIndex) ([]*eth2apiv1.AttesterDuty, error) { start := time.Now() @@ -60,38 +75,155 @@ func (gc *GoClient) GetAttestationData(slot phase0.Slot, committeeIndex phase0.C // Have to make beacon node request and cache the result. result, err, _ := gc.attestationReqInflight.Do(slot, func() (*phase0.AttestationData, error) { - attDataReqStart := time.Now() - resp, err := gc.multiClient.AttestationData(gc.ctx, &api.AttestationDataOpts{ - Slot: slot, - }) + // We have two timeouts: a soft timeout and a hard timeout. + // At the soft timeout, we return if we have any responses so far. + // At the hard timeout, we return unconditionally. + // The soft timeout is half the duration of the hard timeout. + const timeout = time.Second * 2 + ctx, cancel := context.WithTimeout(context.TODO(), timeout) + softCtx, softCancel := context.WithTimeout(ctx, timeout/2) + started := time.Now() - recordRequestDuration(gc.ctx, "AttestationData", gc.multiClient.Address(), http.MethodGet, time.Since(attDataReqStart), err) + requests := len(gc.clients) + respCh := make(chan *attestationDataResponse, requests) + errCh := make(chan *attestationDataError, requests) - if err != nil { - gc.log.Error(clResponseErrMsg, - zap.String("api", "AttestationData"), - zap.Error(err), - ) - return nil, fmt.Errorf("failed to get attestation data: %w", err) + for _, client := range gc.clients { + go gc.fetchAttestationData(ctx, client, respCh, errCh, &api.AttestationDataOpts{Slot: slot}) + } + + // Wait for all responses (or context done). + var ( + responded, + errored, + timedOut, + softTimedOut int + bestScore float64 + bestAttestationData *phase0.AttestationData + bestClient string + ) + + // Loop 1: prior to soft timeout. + for responded+errored+timedOut+softTimedOut != requests { + select { + case resp := <-respCh: + responded++ + gc.log. + With( + zap.Duration("elapsed", time.Since(started)), + zap.String("client_addr", resp.clientAddr), + zap.Int("responded", responded), + zap.Int("errored", errored), + zap.Int("timed_out", timedOut), + ).Debug("response received") + + if bestAttestationData == nil || resp.score > bestScore { + bestAttestationData = resp.attestationData + bestScore = resp.score + bestClient = resp.clientAddr + } + case err := <-errCh: + errored++ + gc.log. + With( + zap.Duration("elapsed", time.Since(started)), + zap.String("client_addr", err.clientAddr), + zap.Int("responded", responded), + zap.Int("errored", errored), + zap.Int("timed_out", timedOut), + zap.Error(err.err), + ).Error("error received") + case <-softCtx.Done(): + // If we have any responses at this point we consider the non-responders timed out. + if responded > 0 { + timedOut = requests - responded - errored + gc.log. + With( + zap.Duration("elapsed", time.Since(started)), + zap.Int("responded", responded), + zap.Int("errored", errored), + zap.Int("timed_out", timedOut), + ).Debug("soft timeout reached with responses") + } else { + gc.log. + With( + zap.Duration("elapsed", time.Since(started)), + zap.Int("errored", errored), + ).Error("soft timeout reached with no responses") + } + // Set the number of requests that have soft timed out. + softTimedOut = requests - responded - errored - timedOut + } } - if resp == nil { - gc.log.Error(clNilResponseErrMsg, - zap.String("api", "AttestationData"), - ) - return nil, fmt.Errorf("attestation data response is nil") + softCancel() + + // Loop 2: after soft timeout. + for responded+errored+timedOut != requests { + select { + case resp := <-respCh: + responded++ + gc.log. + With( + zap.Duration("elapsed", time.Since(started)), + zap.String("client_addr", resp.clientAddr), + zap.Int("responded", responded), + zap.Int("errored", errored), + zap.Int("timed_out", timedOut), + ).Debug("response received") + if bestAttestationData == nil || resp.score > bestScore { + bestAttestationData = resp.attestationData + bestScore = resp.score + bestClient = resp.clientAddr + } + case err := <-errCh: + errored++ + gc.log. + With( + zap.Duration("elapsed", time.Since(started)), + zap.String("client_addr", err.clientAddr), + zap.Int("responded", responded), + zap.Int("errored", errored), + zap.Int("timed_out", timedOut), + zap.Error(err.err), + ).Error("error received") + case <-ctx.Done(): + // Anyone not responded by now is considered errored. + timedOut = requests - responded - errored + gc.log. + With( + zap.Duration("elapsed", time.Since(started)), + zap.Int("responded", responded), + zap.Int("errored", errored), + zap.Int("timed_out", timedOut), + ).Error("hard timeout reached") + } } - if resp.Data == nil { - gc.log.Error(clNilResponseDataErrMsg, - zap.String("api", "AttestationData"), - ) - return nil, fmt.Errorf("attestation data is nil") + cancel() + + gc.log. + With( + zap.Duration("elapsed", time.Since(started)), + zap.Int("responded", responded), + zap.Int("errored", errored), + zap.Int("timed_out", timedOut), + ).Debug("results") + + if bestAttestationData == nil { + return nil, errors.New("no attestations received") } + gc.log. + With( + zap.String("best_client", bestClient), + zap.Stringer("attestation_data", bestAttestationData), + zap.Float64("score", bestScore), + ).Debug("selected best attestation") + // Caching resulting value here (as part of inflight request) guarantees only 1 request // will ever be done for a given slot. - gc.attestationDataCache.Set(slot, resp.Data, ttlcache.DefaultTTL) + gc.attestationDataCache.Set(slot, bestAttestationData, ttlcache.DefaultTTL) - return resp.Data, nil + return bestAttestationData, nil }) if err != nil { return nil, DataVersionNil, err @@ -101,9 +233,85 @@ func (gc *GoClient) GetAttestationData(slot phase0.Slot, committeeIndex phase0.C if err != nil { return nil, DataVersionNil, fmt.Errorf("failed to set committee index: %w", err) } + return data, spec.DataVersionPhase0, nil } +func (gc *GoClient) fetchAttestationData(ctx context.Context, + client Client, + respCh chan *attestationDataResponse, + errCh chan *attestationDataError, + opts *api.AttestationDataOpts, +) { + addr := client.Address() + attDataReqStart := time.Now() + + response, err := client.AttestationData(ctx, opts) + + recordRequestDuration(ctx, "AttestationData", addr, http.MethodGet, time.Since(attDataReqStart), err) + logger := gc.log.With(zap.String("api", "AttestationData")) + + if err != nil { + gc.log.Error(clResponseErrMsg, zap.Error(err)) + errCh <- &attestationDataError{ + clientAddr: addr, + err: err, + } + return + } + if response == nil { + logger.Error(clNilResponseErrMsg) + errCh <- &attestationDataError{ + clientAddr: addr, + err: errors.New("response is nil"), + } + return + } + attestationData := response.Data + if attestationData == nil { + logger.Error(clNilResponseDataErrMsg) + errCh <- &attestationDataError{ + clientAddr: addr, + err: errors.New("attestation data nil"), + } + return + } + + score := gc.scoreAttestationData(ctx, addr, attestationData) + + respCh <- &attestationDataResponse{ + clientAddr: addr, + attestationData: attestationData, + score: score, + } +} + +// scoreAttestationData generates a score for attestation data. +// The score is relative to the reward expected from the contents of the attestation. +func (gc *GoClient) scoreAttestationData(ctx context.Context, + addr string, + attestationData *phase0.AttestationData, +) float64 { + if attestationData == nil { + return 0 + } + + // Initial score is based on height of source and target epochs. + score := float64(attestationData.Source.Epoch + attestationData.Target.Epoch) + + //TODO: add slot values to equation. Implement cache + + gc.log. + With(zap.String("client_addr", addr)). + With(zap.Uint64("attestation_slot", uint64(attestationData.Slot))). + With(zap.Uint64("source_epoch", uint64(attestationData.Source.Epoch))). + With(zap.Uint64("target_epoch", uint64(attestationData.Target.Epoch))). + With(zap.Float64("score", score)). + Info("scored attestation data") + + return score +} + // withCommitteeIndex returns a deep copy of the attestation data with the provided committee index set. func withCommitteeIndex(data *phase0.AttestationData, committeeIndex phase0.CommitteeIndex) (*phase0.AttestationData, error) { // Marshal & unmarshal to make a deep copy. This is safer because it won't break silently if diff --git a/beacon/goclient/goclient.go b/beacon/goclient/goclient.go index f336673829..544967a2ff 100644 --- a/beacon/goclient/goclient.go +++ b/beacon/goclient/goclient.go @@ -73,6 +73,7 @@ func ParseNodeClient(version string) NodeClient { type Client interface { MultiClient + eth2client.AttestationDataProvider eth2client.NodeVersionProvider eth2client.NodeClientProvider eth2client.BlindedProposalSubmitter @@ -83,7 +84,6 @@ type MultiClient interface { eth2client.SpecProvider eth2client.GenesisProvider - eth2client.AttestationDataProvider eth2client.AttestationsSubmitter eth2client.AggregateAttestationProvider eth2client.AggregateAttestationsSubmitter