Skip to content

Commit

Permalink
chore(monitor): add inflation monitoring and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
corverroos committed Feb 12, 2025
1 parent 405a17e commit 3ed4c0b
Show file tree
Hide file tree
Showing 10 changed files with 333 additions and 5 deletions.
41 changes: 41 additions & 0 deletions e2e/test/rewards_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package e2e_test

import (
"context"
"testing"

"github.com/omni-network/omni/lib/cchain/provider"
"github.com/omni-network/omni/lib/cchain/queryutil"
"github.com/omni-network/omni/lib/netconf"
"github.com/omni-network/omni/lib/xchain"

"github.com/cometbft/cometbft/rpc/client/http"

"cosmossdk.io/math"
"github.com/stretchr/testify/require"
)

func TestInflation(t *testing.T) {
t.Parallel()
testNetwork(t, func(ctx context.Context, t *testing.T, network netconf.Network, endpoints xchain.RPCEndpoints) {
t.Helper()

cl, err := http.New(network.ID.Static().ConsensusRPC(), "/websocket")
require.NoError(t, err)
cprov := provider.NewABCI(cl, network.ID)

inf, changed, err := queryutil.AvgInflationRate(ctx, cprov, 3)
if changed {
t.Log("staking state changed") // Avoids test flapping given delegation race
return
}
require.NoError(t, err)

target := math.LegacyNewDecWithPrec(11, 2) // 11%
delta := math.LegacyNewDecWithPrec(1, 2) // Allow +-1% error
minInf, maxInf := target.Sub(delta), target.Add(delta)
if inf.LT(minInf) || inf.GT(maxInf) {
require.Fail(t, "inflation average not within bounds", "rate: %v, min: %v, max: %v", inf, minInf, maxInf)
}
})
}
22 changes: 22 additions & 0 deletions halo/app/start_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import (
magellan2 "github.com/omni-network/omni/halo/app/upgrades/magellan"
halocmd "github.com/omni-network/omni/halo/cmd"
halocfg "github.com/omni-network/omni/halo/config"
"github.com/omni-network/omni/lib/cchain"
cprovider "github.com/omni-network/omni/lib/cchain/provider"
"github.com/omni-network/omni/lib/cchain/queryutil"
"github.com/omni-network/omni/lib/ethclient"
"github.com/omni-network/omni/lib/log"
"github.com/omni-network/omni/lib/netconf"
Expand All @@ -28,6 +30,7 @@ import (
rpchttp "github.com/cometbft/cometbft/rpc/client/http"
"github.com/cometbft/cometbft/types"

"cosmossdk.io/math"
db "github.com/cosmos/cosmos-db"
minttypes "github.com/cosmos/cosmos-sdk/x/mint/types"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -76,6 +79,7 @@ func TestSmoke(t *testing.T) {
testAPI(t, cfg)
testCProvider(t, ctx, cprov)
testCProvider(t, ctx, cprovGRPC)
go testInflation(t, ctx, cprov) //nolint:testifylint // Fix assertions in thread

genSet, err := cl.Validators(ctx, int64Ptr(1), nil, nil)
require.NoError(t, err)
Expand Down Expand Up @@ -116,6 +120,24 @@ func TestSmoke(t *testing.T) {
require.NoError(t, stopfunc(context.Background()))
}

func testInflation(t *testing.T, ctx context.Context, cprov cchain.Provider) {
t.Helper()

inf, changed, err := queryutil.AvgInflationRate(ctx, cprov, 3)
if changed {
t.Log("staking state changed")
return
}
require.NoError(t, err)

target := math.LegacyNewDecWithPrec(11, 2) // 11%
delta := math.LegacyNewDecWithPrec(1, 2) // Allow +-1% error
minInf, maxInf := target.Sub(delta), target.Add(delta)
if inf.LT(minInf) || inf.GT(maxInf) {
require.Fail(t, "inflation average not within bounds", "rate: %v, min: %v, max: %v", inf, minInf, maxInf)
}
}

//nolint:bodyclose,noctx // We don't care about best practices here.
func testAPI(t *testing.T, cfg haloapp.Config) {
t.Helper()
Expand Down
3 changes: 3 additions & 0 deletions lib/cchain/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ type Provider interface {
// AppliedPlan returns the applied (activated) upgrade plan by name.
AppliedPlan(ctx context.Context, name string) (utypes.Plan, bool, error)

// BlockHeight returns the current consensus block height.
BlockHeight(ctx context.Context) (uint64, error)

// QueryClients returns the query clients for the various modules.
QueryClients() QueryClients
}
13 changes: 8 additions & 5 deletions lib/cchain/provider/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
errorsmod "cosmossdk.io/errors"
utypes "cosmossdk.io/x/upgrade/types"
"github.com/cosmos/cosmos-sdk/client/grpc/cmtservice"
"github.com/cosmos/cosmos-sdk/client/grpc/node"
"github.com/cosmos/cosmos-sdk/codec"
codectypes "github.com/cosmos/cosmos-sdk/codec/types"
sdk "github.com/cosmos/cosmos-sdk/types"
Expand Down Expand Up @@ -103,6 +104,7 @@ func newProvider(cc gogogrpc.ClientConn, network netconf.ID, opts ...func(*Provi
mcl := mtypes.NewQueryClient(cc)
evmengcl := evmengtypes.NewQueryClient(cc)
bcl := btypes.NewQueryClient(cc)
ncl := node.NewServiceClient(cc)

p := Provider{
fetch: newABCIFetchFunc(acl, cmtcl, namer),
Expand Down Expand Up @@ -135,6 +137,7 @@ func newProvider(cc gogogrpc.ClientConn, network netconf.ID, opts ...func(*Provi
Mint: mcl,
EvmEngine: evmengcl,
Bank: bcl,
Node: ncl,
},
}

Expand Down Expand Up @@ -556,12 +559,12 @@ type rpcAdaptor struct {
abci rpcclient.ABCIClient
}

// withCtxHeight returns a copy of the context with the `x-cosmos-block-height` grpc metadata header
// WithCtxHeight returns a copy of the context with the `x-cosmos-block-height` grpc metadata header
// set to the provided height.
//
// This height will be supplied in ABCIQueryOptions when issuing queries in the rpcAdaptor.
// It will also be added to grpc queries automatically.
func withCtxHeight(ctx context.Context, height uint64) (context.Context, error) {
func WithCtxHeight(ctx context.Context, height uint64) (context.Context, error) {
if _, ok, err := heightFromCtx(ctx); err != nil {
return nil, err
} else if ok {
Expand Down Expand Up @@ -660,7 +663,7 @@ func getEarliestStoreHeight(ctx context.Context, cl atypes.QueryClient, chainVer
// queryEarliestAttestation returns the earliest approved attestation for the provided chain version
// at the provided consensus block height, or the latest block height if height is 0.
func queryEarliestAttestation(ctx context.Context, cl atypes.QueryClient, chainVer xchain.ChainVersion, height uint64) (xchain.Attestation, bool, error) {
ctx, err := withCtxHeight(ctx, height)
ctx, err := WithCtxHeight(ctx, height)
if err != nil {
return xchain.Attestation{}, false, err
}
Expand All @@ -687,7 +690,7 @@ func queryEarliestAttestation(ctx context.Context, cl atypes.QueryClient, chainV
// queryLatestAttestation returns the latest approved attestation for the provided chain version
// at the provided consensus block height, or the latest block height if height is 0.
func queryLatestAttestation(ctx context.Context, cl atypes.QueryClient, chainVer xchain.ChainVersion, height uint64) (xchain.Attestation, bool, error) {
ctx, err := withCtxHeight(ctx, height)
ctx, err := WithCtxHeight(ctx, height)
if err != nil {
return xchain.Attestation{}, false, err
}
Expand All @@ -714,7 +717,7 @@ func queryLatestAttestation(ctx context.Context, cl atypes.QueryClient, chainVer
// attsFromAtHeight returns approved attestations for the provided chain version
// at the provided consensus block height, or the latest block height if height is 0.
func attsFromAtHeight(ctx context.Context, cl atypes.QueryClient, chainVer xchain.ChainVersion, fromOffset, height uint64) ([]xchain.Attestation, bool, error) {
ctx, err := withCtxHeight(ctx, height)
ctx, err := WithCtxHeight(ctx, height)
if err != nil {
return nil, false, err
}
Expand Down
11 changes: 11 additions & 0 deletions lib/cchain/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/ethereum/go-ethereum/common"

upgradetypes "cosmossdk.io/x/upgrade/types"
"github.com/cosmos/cosmos-sdk/client/grpc/node"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

Expand Down Expand Up @@ -105,6 +106,16 @@ func (p Provider) AppliedPlan(ctx context.Context, name string) (upgradetypes.Pl
return p.appliedFunc(ctx, name)
}

// BlockHeight returns the current consensus block height.
func (p Provider) BlockHeight(ctx context.Context) (uint64, error) {
status, err := p.QueryClients().Node.Status(ctx, &node.StatusRequest{})
if err != nil {
return 0, errors.Wrap(err, "node status query")
}

return status.Height, nil
}

func (p Provider) AttestationsFrom(
ctx context.Context,
chainVer xchain.ChainVersion,
Expand Down
192 changes: 192 additions & 0 deletions lib/cchain/queryutil/rewards.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
package queryutil

import (
"context"
"time"

"github.com/omni-network/omni/lib/cchain"
"github.com/omni-network/omni/lib/cchain/provider"
"github.com/omni-network/omni/lib/errors"
"github.com/omni-network/omni/lib/forkjoin"

"cosmossdk.io/math"
sdk "github.com/cosmos/cosmos-sdk/types"
distrtypes "github.com/cosmos/cosmos-sdk/x/distribution/types"
stakingtypes "github.com/cosmos/cosmos-sdk/x/staking/types"
)

var blocksPerYear = math.LegacyNewDec(365 * 24 * 60 * 60 / 2) // Matches upgrades/magellan::blocksPerYear

// AvgInflationRate returns the average inflation for all delegations over the given number of blocks
// or true if all delegations changed (couldn't calculate inflation).
func AvgInflationRate(ctx context.Context, cprov cchain.Provider, waitBlocks uint64) (math.LegacyDec, bool, error) {
delegators, err := allDelegators(ctx, cprov)
if err != nil {
return math.LegacyDec{}, false, errors.Wrap(err, "get delegators")
}

result, cancel := forkjoin.NewWithInputs(ctx, func(ctx context.Context, addr sdk.AccAddress) ([]math.LegacyDec, error) {
infl, changed, err := DelegatorInflationRates(ctx, cprov, addr, waitBlocks)
if changed {
return nil, nil
}

return infl, err
}, delegators, forkjoin.WithWorkers(4)) // Don't overload the API
defer cancel()

inflations, err := result.Flatten()
if err != nil {
return math.LegacyDec{}, false, errors.Wrap(err, "forkjoin")
}

sum, length := math.LegacyZeroDec(), math.LegacyZeroDec()
for _, infls := range inflations {
for _, infl := range infls {
sum = sum.Add(infl)
length = length.Add(math.LegacyOneDec())
}
}

if length.IsZero() {
return math.LegacyDec{}, true, errors.New("zero delegations")
}

return sum.Quo(length), false, nil
}

// DelegatorInflationRates returns the inflation rate per delegation for the given delegator over the given number of blocks,
// or true if the delegation changed (couldn't calculate inflation).
func DelegatorInflationRates(ctx context.Context, cprov cchain.Provider, delegator sdk.AccAddress, waitBlocks uint64) ([]math.LegacyDec, bool, error) {
rewards0, height0, err := getDelegationRewards(ctx, cprov, delegator)
if err != nil {
return nil, false, err
}

if err := waitUntil(ctx, cprov, height0+waitBlocks); err != nil {
return nil, false, err
}

rewards1, height1, err := getDelegationRewards(ctx, cprov, delegator)
if err != nil {
return nil, false, err
} else if len(rewards0) != len(rewards1) {
return nil, true, errors.New("delegations mismatch") // Staking actions occurred
}

blockDelta := math.LegacyNewDec(int64(height1) - int64(height0)) //nolint:gosec // No risk of overflow

var resp []math.LegacyDec
for i := range len(rewards0) {
rew0 := rewards0[i]
rew1 := rewards1[i]

if !rew0.Delegation.Balance.Equal(rew1.Delegation.Balance) {
return nil, true, errors.New("delegation balance mismatch")
}

rewardDelta := rew1.Rewards.Sub(rew0.Rewards)
rewardsPerYear := rewardDelta.Mul(blocksPerYear).Quo(blockDelta)
stake := rew0.Delegation.Balance.Amount.ToLegacyDec()
rewardsAPY := rewardsPerYear.Quo(stake)

resp = append(resp, rewardsAPY)
}

return resp, false, nil
}

func allDelegators(ctx context.Context, cprov cchain.Provider) ([]sdk.AccAddress, error) {
vals, err := cprov.SDKValidators(ctx)
if err != nil {
return nil, err
}

uniq := make(map[string]sdk.AccAddress)
for _, val := range vals {
resp, err := cprov.QueryClients().Staking.ValidatorDelegations(ctx, &stakingtypes.QueryValidatorDelegationsRequest{
ValidatorAddr: val.OperatorAddress,
})
if err != nil {
return nil, errors.Wrap(err, "query validator delegations")
}

for _, del := range resp.DelegationResponses {
addr, err := sdk.AccAddressFromBech32(del.Delegation.DelegatorAddress)
if err != nil {
return nil, errors.Wrap(err, "parse delegator address")
}

uniq[del.Delegation.DelegatorAddress] = addr
}
}

var resp []sdk.AccAddress
for _, addr := range uniq {
resp = append(resp, addr)
}

return resp, nil
}

func waitUntil(ctx context.Context, cprov cchain.Provider, target uint64) error {
for {
height, err := cprov.BlockHeight(ctx)
if err != nil {
return errors.Wrap(err, "get block")
}

if height >= target {
return nil
}

time.Sleep(time.Second)
}
}

// delegationReward contains a delegation and its distribution module accrued rewards.
type delegationReward struct {
Delegation stakingtypes.DelegationResponse
Rewards math.LegacyDec
}

// getDelegationRewards returns the current rewards-per-delegation (and height) for the given delegator.
func getDelegationRewards(ctx context.Context, cprov cchain.Provider, delegator sdk.AccAddress) ([]delegationReward, uint64, error) {
height, err := cprov.BlockHeight(ctx)
if err != nil {
return nil, 0, errors.Wrap(err, "get block")
}
ctx, err = provider.WithCtxHeight(ctx, height)
if err != nil {
return nil, 0, errors.Wrap(err, "set height")
}

resp, err := cprov.QueryClients().Staking.DelegatorDelegations(ctx, &stakingtypes.QueryDelegatorDelegationsRequest{
DelegatorAddr: delegator.String(),
})
if err != nil {
return nil, 0, errors.Wrap(err, "query delegator delegations")
} else if len(resp.DelegationResponses) == 0 {
return nil, 0, errors.New("no delegations")
}

var delegationRewards []delegationReward
for _, del := range resp.DelegationResponses {
rewardResp, err := cprov.QueryClients().Distribution.DelegationRewards(ctx, &distrtypes.QueryDelegationRewardsRequest{
DelegatorAddress: del.Delegation.DelegatorAddress,
ValidatorAddress: del.Delegation.ValidatorAddress,
})
if err != nil {
return nil, 0, errors.Wrap(err, "query delegation rewards")
} else if len(rewardResp.Rewards) != 1 {
return nil, 0, errors.New("no rewards")
}

delegationRewards = append(delegationRewards, delegationReward{
Delegation: del,
Rewards: rewardResp.Rewards[0].Amount,
})
}

return delegationRewards, height, nil
}
Loading

0 comments on commit 3ed4c0b

Please sign in to comment.