Skip to content

Commit

Permalink
Merge pull request #2691 from gobitfly/NOBIDS/improve_notification_speed
Browse files Browse the repository at this point in the history
Nobids/improve notification speed
  • Loading branch information
peterbitfly authored Nov 14, 2023
2 parents 931be96 + 7635416 commit 4c3a400
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 52 deletions.
76 changes: 76 additions & 0 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -3148,3 +3148,79 @@ func GetSyncParticipationBySlotRange(startSlot, endSlot uint64) (map[uint64]uint

return ret, nil
}

// Should be used when retrieving data for a very large amount of validators (for the notifications process)
func GetValidatorAttestationHistoryForNotifications(startEpoch uint64, endEpoch uint64) (map[types.Epoch]map[types.ValidatorIndex]bool, error) {
// first retrieve activation & exit epoch for all validators
activityData := []struct {
ValidatorIndex types.ValidatorIndex
ActivationEpoch types.Epoch
ExitEpoch types.Epoch
}{}

err := ReaderDb.Select(&activityData, "SELECT validatorindex, activationepoch, exitepoch FROM validators ORDER BY validatorindex;")
if err != nil {
return nil, fmt.Errorf("error retrieving activation & exit epoch for validators: %w", err)
}

logger.Info("retrieved activation & exit epochs")

// next retrieve all attestation data from the db (need to retrieve data for the endEpoch+1 epoch as that could still contain attestations for the endEpoch)
firstSlot := startEpoch * utils.Config.Chain.ClConfig.SlotsPerEpoch
lastSlot := ((endEpoch+1)*utils.Config.Chain.ClConfig.SlotsPerEpoch - 1)
lastQuerySlot := ((endEpoch+2)*utils.Config.Chain.ClConfig.SlotsPerEpoch - 1)

rows, err := ReaderDb.Query(`SELECT
blocks_attestations.slot,
validators
FROM blocks_attestations
LEFT JOIN blocks ON blocks_attestations.block_root = blocks.blockroot WHERE
blocks_attestations.block_slot >= $1 AND blocks_attestations.block_slot <= $2 AND blocks.status = '1' ORDER BY block_slot`, firstSlot, lastQuerySlot)
if err != nil {
return nil, fmt.Errorf("error retrieving attestation data from the db: %w", err)
}
defer rows.Close()

logger.Info("retrieved attestation raw data")

// next process the data and fill up the epoch participation
// validators that participated in an epoch will have the flag set to true
// validators that missed their participation will have it set to false
epochParticipation := make(map[types.Epoch]map[types.ValidatorIndex]bool)
for rows.Next() {
var slot types.Slot
var attestingValidators pq.Int64Array

err := rows.Scan(&slot, &attestingValidators)
if err != nil {
return nil, fmt.Errorf("error scanning attestation data: %w", err)
}

if slot < types.Slot(firstSlot) || slot > types.Slot(lastSlot) {
continue
}

epoch := types.Epoch(utils.EpochOfSlot(uint64(slot)))

participation := epochParticipation[epoch]

if participation == nil {
epochParticipation[epoch] = make(map[types.ValidatorIndex]bool)

// logger.Infof("seeding validator duties for epoch %v", epoch)
for _, data := range activityData {
if data.ActivationEpoch <= epoch && epoch < data.ExitEpoch {
epochParticipation[epoch][types.ValidatorIndex(data.ValidatorIndex)] = false
}
}

participation = epochParticipation[epoch]
}

for _, validator := range attestingValidators {
participation[types.ValidatorIndex(validator)] = true
}
}

return epochParticipation, nil
}
85 changes: 34 additions & 51 deletions services/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -1214,7 +1214,7 @@ func collectBlockProposalNotifications(notificationsByUserID map[uint64]map[type
}

for _, event := range events {
pubkey, err := GetGetPubkeyForIndex(event.Proposer)
pubkey, err := GetPubkeyForIndex(event.Proposer)
if err != nil {
utils.LogError(err, "error retrieving pubkey for validator", 0, map[string]interface{}{"validator": event.Proposer})
continue
Expand Down Expand Up @@ -1358,8 +1358,6 @@ func collectAttestationAndOfflineValidatorNotifications(notificationsByUserID ma
ValidatorIndex uint64 `db:"validatorindex"`
Epoch uint64 `db:"epoch"`
Status uint64 `db:"status"`
Slot uint64 `db:"attesterslot"`
InclusionSlot uint64 `db:"inclusionslot"`
EventFilter []byte `db:"pubkey"`
}

Expand All @@ -1370,50 +1368,40 @@ func collectAttestationAndOfflineValidatorNotifications(notificationsByUserID ma
return err
}

attestations, err := db.BigtableClient.GetValidatorAttestationHistory(validators, epoch-3, epoch)
participationPerEpoch, err := db.GetValidatorAttestationHistoryForNotifications(epoch-3, epoch)
if err != nil {
return fmt.Errorf("error getting validator attestations from bigtable %w", err)
return fmt.Errorf("error getting validator attestations from db %w", err)
}

logger.Infof("retrieved validator attestation history data")

events := make([]dbResult, 0)

epochAttested := make(map[uint64]uint64)
epochTotal := make(map[uint64]uint64)
participationPerEpoch := make(map[uint64]map[uint64]int) // map[validatorindex]map[epoch]attested
for validator, history := range attestations {
for _, attestation := range history {
if participationPerEpoch[validator] == nil {
participationPerEpoch[validator] = make(map[uint64]int, 4)
}
epochTotal[attestation.Epoch] = epochTotal[attestation.Epoch] + 1 // count the total attestations for each epoch

if attestation.Status == 0 {
epochAttested := make(map[types.Epoch]uint64)
epochTotal := make(map[types.Epoch]uint64)
for currentEpoch, participation := range participationPerEpoch {
for validatorIndex, participated := range participation {

participationPerEpoch[validator][attestation.Epoch] = 1 // missed
epochTotal[currentEpoch] = epochTotal[currentEpoch] + 1 // count the total attestations for each epoch

pubkey, err := GetGetPubkeyForIndex(validator)
if !participated {
pubkey, err := GetPubkeyForIndex(uint64(validatorIndex))
if err == nil {
if attestation.Epoch != epoch || subMap[hex.EncodeToString(pubkey)] == nil {
if currentEpoch != types.Epoch(epoch) || subMap[hex.EncodeToString(pubkey)] == nil {
continue
}

events = append(events, dbResult{
ValidatorIndex: validator,
Epoch: attestation.Epoch,
Status: attestation.Status,
Slot: attestation.AttesterSlot,
InclusionSlot: attestation.InclusionSlot,
ValidatorIndex: uint64(validatorIndex),
Epoch: uint64(currentEpoch),
Status: 0,
EventFilter: pubkey,
})
} else {
logger.Errorf("error retrieving pubkey for validator %v: %v", validator, err)
logger.Errorf("error retrieving pubkey for validator %v: %v", validatorIndex, err)
}
} else {
participationPerEpoch[validator][attestation.Epoch] = 2 // attested

epochAttested[attestation.Epoch] = epochAttested[attestation.Epoch] + 1 // count the total attested attestation for each epoch (exlude missing)
epochAttested[currentEpoch] = epochAttested[currentEpoch] + 1 // count the total attested attestation for each epoch (exlude missing)
}
}
}
Expand Down Expand Up @@ -1443,8 +1431,6 @@ func collectAttestationAndOfflineValidatorNotifications(notificationsByUserID ma
Epoch: event.Epoch,
Status: event.Status,
EventName: types.ValidatorMissedAttestationEventName,
Slot: event.Slot,
InclusionSlot: event.InclusionSlot,
EventFilter: hex.EncodeToString(event.EventFilter),
}
if _, exists := notificationsByUserID[*sub.UserID]; !exists {
Expand Down Expand Up @@ -1475,11 +1461,11 @@ func collectAttestationAndOfflineValidatorNotifications(notificationsByUserID ma
var offlineValidators []*indexPubkeyPair
var onlineValidators []*indexPubkeyPair

epochNMinus1 := epoch - 1
epochNMinus2 := epoch - 2
epochNMinus3 := epoch - 3
epochNMinus1 := types.Epoch(epoch - 1)
epochNMinus2 := types.Epoch(epoch - 2)
epochNMinus3 := types.Epoch(epoch - 3)

if epochTotal[epoch] == 0 {
if epochTotal[types.Epoch(epoch)] == 0 {
return fmt.Errorf("consistency error, did not retrieve attestation data for epoch %v", epoch)
}
if epochTotal[epochNMinus1] == 0 {
Expand All @@ -1492,8 +1478,8 @@ func collectAttestationAndOfflineValidatorNotifications(notificationsByUserID ma
return fmt.Errorf("consistency error, did not retrieve attestation data for epoch %v", epochNMinus3)
}

if epochAttested[epoch]*100/epochTotal[epoch] < 60 {
return fmt.Errorf("consistency error, did receive more than 60%% of missed attestation in epoch %v (total: %v, attested: %v)", epoch, epochTotal[epoch], epochAttested[epoch])
if epochAttested[types.Epoch(epoch)]*100/epochTotal[types.Epoch(epoch)] < 60 {
return fmt.Errorf("consistency error, did receive more than 60%% of missed attestation in epoch %v (total: %v, attested: %v)", epoch, epochTotal[types.Epoch(epoch)], epochAttested[types.Epoch(epoch)])
}
if epochAttested[epochNMinus1]*100/epochTotal[epochNMinus1] < 60 {
return fmt.Errorf("consistency error, did receive more than 60%% of missed attestation in epoch %v (total: %v, attested: %v)", epochNMinus1, epochTotal[epochNMinus1], epochAttested[epochNMinus1])
Expand All @@ -1505,24 +1491,25 @@ func collectAttestationAndOfflineValidatorNotifications(notificationsByUserID ma
return fmt.Errorf("consistency error, did receive more than 60%% of missed attestation in epoch %v (total: %v, attested: %v)", epochNMinus3, epochTotal[epochNMinus3], epochAttested[epochNMinus3])
}

for validator, participation := range participationPerEpoch {
if participation[epochNMinus3] == 2 && participation[epochNMinus2] == 1 && participation[epochNMinus1] == 1 && participation[epoch] == 1 {
for _, validator := range validators {
if participationPerEpoch[epochNMinus3][types.ValidatorIndex(validator)] && !participationPerEpoch[epochNMinus2][types.ValidatorIndex(validator)] && !participationPerEpoch[epochNMinus1][types.ValidatorIndex(validator)] && !participationPerEpoch[types.Epoch(epoch)][types.ValidatorIndex(validator)] {
logger.Infof("validator %v detected as offline in epoch %v (did not attest since epoch %v)", validator, epoch, epochNMinus2)
pubkey, err := GetGetPubkeyForIndex(validator)
pubkey, err := GetPubkeyForIndex(validator)
if err != nil {
return err
}
offlineValidators = append(offlineValidators, &indexPubkeyPair{Index: validator, Pubkey: pubkey})
}

if participation[epochNMinus3] == 1 && participation[epochNMinus2] == 1 && participation[epochNMinus1] == 1 && participation[epoch] == 2 {
if !participationPerEpoch[epochNMinus3][types.ValidatorIndex(validator)] && !participationPerEpoch[epochNMinus2][types.ValidatorIndex(validator)] && !participationPerEpoch[epochNMinus1][types.ValidatorIndex(validator)] && participationPerEpoch[types.Epoch(epoch)][types.ValidatorIndex(validator)] {
logger.Infof("validator %v detected as online in epoch %v (attested again in epoch %v)", validator, epoch, epoch)
pubkey, err := GetGetPubkeyForIndex(validator)
pubkey, err := GetPubkeyForIndex(validator)
if err != nil {
return err
}
onlineValidators = append(onlineValidators, &indexPubkeyPair{Index: validator, Pubkey: pubkey})
}

}

offlineValidatorsLimit := 5000
Expand Down Expand Up @@ -1733,8 +1720,6 @@ type validatorAttestationNotification struct {
Epoch uint64
Status uint64 // * Can be 0 = scheduled | missed, 1 executed
EventName types.EventName
Slot uint64
InclusionSlot uint64
EventFilter string
UnsubscribeHash sql.NullString
}
Expand All @@ -1760,19 +1745,17 @@ func (n *validatorAttestationNotification) GetInfo(includeUrl bool) string {
if includeUrl {
switch n.Status {
case 0:
generalPart = fmt.Sprintf(`Validator <a href="https://%[3]v/validator/%[1]v">%[1]v</a> missed an attestation at slot <a href="https://%[3]v/slot/%[2]v">%[2]v</a>.`, n.ValidatorIndex, n.Slot, utils.Config.Frontend.SiteDomain)
//generalPart = fmt.Sprintf(`New scheduled attestation for Validator %v at slot %v.`, n.ValidatorIndex, n.Slot)
generalPart = fmt.Sprintf(`Validator <a href="https://%[3]v/validator/%[1]v">%[1]v</a> missed an attestation in epoch <a href="https://%[3]v/epoch/%[2]v">%[2]v</a>.`, n.ValidatorIndex, n.Epoch, utils.Config.Frontend.SiteDomain)
case 1:
generalPart = fmt.Sprintf(`Validator <a href="https://%[3]v/validator/%[1]v">%[1]v</a> submitted a successful attestation for slot <a href="https://%[3]v/slot/%[2]v">%[2]v</a>.`, n.ValidatorIndex, n.Slot, utils.Config.Frontend.SiteDomain)
generalPart = fmt.Sprintf(`Validator <a href="https://%[3]v/validator/%[1]v">%[1]v</a> submitted a successful attestation for epoch <a href="https://%[3]v/epoch/%[2]v">%[2]v</a>.`, n.ValidatorIndex, n.Epoch, utils.Config.Frontend.SiteDomain)
}
// return generalPart + getUrlPart(n.ValidatorIndex)
} else {
switch n.Status {
case 0:
generalPart = fmt.Sprintf(`Validator %v missed an attestation at slot %v.`, n.ValidatorIndex, n.Slot)
//generalPart = fmt.Sprintf(`New scheduled attestation for Validator %v at slot %v.`, n.ValidatorIndex, n.Slot)
generalPart = fmt.Sprintf(`Validator %v missed an attestation in epoch %v.`, n.ValidatorIndex, n.Epoch)
case 1:
generalPart = fmt.Sprintf(`Validator %v submitted a successful attestation for slot %v.`, n.ValidatorIndex, n.Slot)
generalPart = fmt.Sprintf(`Validator %v submitted a successful attestation in epoch %v.`, n.ValidatorIndex, n.Epoch)
}
}
return generalPart
Expand Down Expand Up @@ -1807,9 +1790,9 @@ func (n *validatorAttestationNotification) GetInfoMarkdown() string {
var generalPart = ""
switch n.Status {
case 0:
generalPart = fmt.Sprintf(`Validator [%[1]v](https://%[3]v/validator/%[1]v) missed an attestation at slot [%[2]v](https://%[3]v/slot/%[2]v).`, n.ValidatorIndex, n.Slot, utils.Config.Frontend.SiteDomain)
generalPart = fmt.Sprintf(`Validator [%[1]v](https://%[3]v/validator/%[1]v) missed an attestation in epoch [%[2]v](https://%[3]v/epoch/%[2]v).`, n.ValidatorIndex, n.Epoch, utils.Config.Frontend.SiteDomain)
case 1:
generalPart = fmt.Sprintf(`Validator [%[1]v](https://%[3]v/validator/%[1]v) submitted a successful attestation for slot [%[2]v](https://%[3]v/slot/%[2]v).`, n.ValidatorIndex, n.Slot, utils.Config.Frontend.SiteDomain)
generalPart = fmt.Sprintf(`Validator [%[1]v](https://%[3]v/validator/%[1]v) submitted a successful attestation in epoch [%[2]v](https://%[3]v/epoch/%[2]v).`, n.ValidatorIndex, n.Epoch, utils.Config.Frontend.SiteDomain)
}
return generalPart
}
Expand Down
2 changes: 1 addition & 1 deletion services/pubkeyCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func initPubkeyCache(path string) error {
}

// will retrieve the pubkey for a given validatorindex and store it for later use
func GetGetPubkeyForIndex(index uint64) ([]byte, error) {
func GetPubkeyForIndex(index uint64) ([]byte, error) {
key := []byte(fmt.Sprintf("%d", index))

pubkey, err := pubkeyCacheDb.Get(key, nil)
Expand Down

0 comments on commit 4c3a400

Please sign in to comment.