Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multi-spork client access support #230

Merged
merged 61 commits into from
May 20, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
fc36a20
add cross-spork client type
devbugging May 6, 2024
0eaa3d4
add cross-spork methods
devbugging May 6, 2024
e47ad92
add support to config for providing the hosts values for previous sporks
devbugging May 6, 2024
b887c37
setup cross-spork client
devbugging May 6, 2024
cf5e6ed
change name
devbugging May 6, 2024
4ca5093
add logging
devbugging May 6, 2024
ca26af2
check correct client is used
devbugging May 6, 2024
b291868
add logger
devbugging May 6, 2024
21ced97
use test logger
devbugging May 6, 2024
b42c16a
test boundaries
devbugging May 6, 2024
a7ee4db
fix test
devbugging May 6, 2024
cbb71e4
add existing err test
devbugging May 6, 2024
f6fdfbc
update readme
devbugging May 6, 2024
aee635c
add example
devbugging May 6, 2024
8479fcb
add example
devbugging May 6, 2024
d068c59
improve docs
devbugging May 6, 2024
1874e82
add past spork method
devbugging May 7, 2024
156026d
change the client to base client with options
devbugging May 7, 2024
5d7a81e
define new block event type
devbugging May 7, 2024
44078d8
introduce backfilling ability
devbugging May 7, 2024
819a84d
add factories for block events
devbugging May 7, 2024
969e9b5
add logging
devbugging May 7, 2024
0e08f32
get latest height in the spork
devbugging May 7, 2024
ccfb9e4
add logger
devbugging May 7, 2024
8dba9c2
update ingestion engine with subscriber api change
devbugging May 7, 2024
b6a680a
improve error handling
devbugging May 7, 2024
b182d07
improve err handling
devbugging May 7, 2024
d55ac59
add log
devbugging May 7, 2024
1556554
update mocks
devbugging May 7, 2024
183496a
update mocks
devbugging May 7, 2024
c37a942
fix make file
devbugging May 7, 2024
375b36c
fix test api change
devbugging May 7, 2024
15ebaff
fix test api change
devbugging May 7, 2024
4ea7f9a
fix test api change
devbugging May 7, 2024
d207e5b
fix test api change
devbugging May 7, 2024
56d6d19
fix test api change
devbugging May 7, 2024
fad7b9e
fix spork check
devbugging May 7, 2024
9a3ce74
add mock
devbugging May 7, 2024
67aceef
mod tidy
devbugging May 7, 2024
873113f
change to getting a block header only
devbugging May 8, 2024
10094b2
add commments
devbugging May 8, 2024
ffac9c3
add while context error is nil
devbugging May 8, 2024
0949ca4
typo
devbugging May 8, 2024
45e49dc
update go-sdk to specific version
devbugging May 8, 2024
689797c
change to client interface
devbugging May 8, 2024
6e4dca5
change client not requiring height provided, also change client being…
devbugging May 8, 2024
02d85ad
add test for subscriber
devbugging May 8, 2024
db9f8d7
change type
devbugging May 8, 2024
de05844
implement typed mock
devbugging May 9, 2024
3f910aa
test order of blocks
devbugging May 9, 2024
03b4666
change to accept spork clients as part of the factory
devbugging May 15, 2024
752af31
move spork client to requester package
devbugging May 15, 2024
ac66503
remove test that overlap with subscription test
devbugging May 15, 2024
b133733
Merge branch 'main' into gregor/cross-spork
devbugging May 15, 2024
e2b9adb
update go sdk
devbugging May 16, 2024
3cadf70
cache spork boundaries
devbugging May 17, 2024
e600315
Merge branch 'main' into gregor/cross-spork
devbugging May 17, 2024
68d9dff
update flow-go v0.35.6-crescendo-preview.22-atree-inlining
devbugging May 17, 2024
f1a1df5
mod tidy
devbugging May 17, 2024
1269289
Merge branch 'main' into gregor/cross-spork
devbugging May 20, 2024
14d972f
replace for loop
devbugging May 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 20 additions & 17 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,23 +90,26 @@ it should return:

The application can be configured using the following flags at runtime:

| Flag | Default Value | Description |
|---------------------------|------------------|------------------------------------------------------------------------------------------------------------------------|
| `--database-dir` | `./db` | Path to the directory for the database. |
| `--rpc-host` | `localhost` | Host for the JSON RPC API server. |
| `--rpc-port` | `8545` | Port for the JSON RPC API server. |
| `--access-node-grpc-host` | `localhost:3569` | Host to the Flow access node (AN) gRPC API. |
| `--evm-network-id` | `testnet` | EVM network ID (options: `testnet`, `mainnet`). |
| `--flow-network-id` | `emulator` | Flow network ID (options: `emulator`, `previewnet`). |
| `--coinbase` | (required) | Coinbase address to use for fee collection. |
| `--gas-price` | `1` | Static gas price used for EVM transactions. |
| `--coa-address` | (required) | Flow address that holds COA account used for submitting transactions. |
| `--coa-key` | (required) | *WARNING*: Do not use this flag in production! Private key value for the COA address used for submitting transactions. |
| `--coa-resource-create` | `false` | Auto-create the COA resource in the Flow COA account provided if one doesn't exist. |
| `--log-level` | `debug` | Define verbosity of the log output ('debug', 'info', 'error') |
| `--stream-limit` | 10 | Rate-limits the events sent to the client within one second |
| `--stream-timeout` | 3sec | Defines the timeout in seconds the server waits for the event to be sent to the client |

| Flag | Default Value | Description |
|-----------------------------|------------------|------------------------------------------------------------------------------------------------------------------------|
| `--database-dir` | `./db` | Path to the directory for the database. |
| `--rpc-host` | `localhost` | Host for the JSON RPC API server. |
| `--rpc-port` | `8545` | Port for the JSON RPC API server. |
| `--access-node-grpc-host` | `localhost:3569` | Host to the Flow access node (AN) gRPC API. |
| `--access-node-spork-hosts` | | Previous spork AN hosts, defined following the schema: {latest height}@{host} as comma separated list |
| `--evm-network-id` | `testnet` | EVM network ID (options: `testnet`, `mainnet`). |
| `--flow-network-id` | `emulator` | Flow network ID (options: `emulator`, `previewnet`). |
| `--coinbase` | (required) | Coinbase address to use for fee collection. |
| `--init-cadence-height` | 0 | Define the Cadence block height at which to start the indexing. |
| `--gas-price` | `1` | Static gas price used for EVM transactions. |
| `--coa-address` | (required) | Flow address that holds COA account used for submitting transactions. |
| `--coa-key` | (required) | *WARNING*: Do not use this flag in production! Private key value for the COA address used for submitting transactions. |
| `--coa-key-file` | | File path that contains JSON array of COA keys used in key-rotation mechanism, this is exclusive with `coa-key` flag. |
| `--coa-resource-create` | `false` | Auto-create the COA resource in the Flow COA account provided if one doesn't exist. |
| `--log-level` | `debug` | Define verbosity of the log output ('debug', 'info', 'error') |
| `--stream-limit` | 10 | Rate-limits the events sent to the client within one second |
| `--stream-timeout` | 3sec | Defines the timeout in seconds the server waits for the event to be sent to the client |
| `--filter-expiry` | `5m` | Filter defines the time it takes for an idle filter to expire |

## Getting Started

Expand Down
11 changes: 9 additions & 2 deletions bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,18 @@ func startIngestion(
) error {
logger.Info().Msg("starting up event ingestion")

client, err := grpc.NewClient(cfg.AccessNodeGRPCHost)
client, err := models.NewCrossSporkClient(cfg.AccessNodeHost, logger)
if err != nil {
return err
}

// if we provided access node previous spork hosts add them to the client
for height, host := range cfg.AccessNodePreviousSporkHosts {
if err := client.AddSpork(height, host); err != nil {
return fmt.Errorf("failed to add previous spork host to the client: %w", err)
}
}

blk, err := client.GetLatestBlock(context.Background(), false)
if err != nil {
return fmt.Errorf("failed to get latest cadence block: %w", err)
Expand Down Expand Up @@ -172,7 +179,7 @@ func startServer(

srv := api.NewHTTPServer(l, rpc.DefaultHTTPTimeouts)

client, err := grpc.NewClient(cfg.AccessNodeGRPCHost)
client, err := grpc.NewClient(cfg.AccessNodeHost)
if err != nil {
return err
}
Expand Down
32 changes: 27 additions & 5 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"io"
"math/big"
"os"
"strconv"
"strings"
"time"

"github.com/goccy/go-json"
Expand All @@ -29,8 +31,11 @@ const LiveNetworkInitCadenceHeght = uint64(1)
type Config struct {
// DatabaseDir is where the database should be stored.
DatabaseDir string
// AccessNodeGRPCHost defines the Flow network AN host.
AccessNodeGRPCHost string
// AccessNodeHost defines the current spork Flow network AN host.
AccessNodeHost string
// AccessNodePreviousSporkHosts contains a map of latest heights for each spork,
// which can be accessed via the host of the AN provided
AccessNodePreviousSporkHosts map[uint64]string
// GRPCPort for the RPC API server
RPCPort int
// GRPCHost for the RPC API server
Expand Down Expand Up @@ -68,16 +73,19 @@ type Config struct {
}

func FromFlags() (*Config, error) {
cfg := &Config{}
var evmNetwork, coinbase, gas, coa, key, keysPath, flowNetwork, logLevel, filterExpiry string
cfg := &Config{
AccessNodePreviousSporkHosts: make(map[uint64]string),
}
var evmNetwork, coinbase, gas, coa, key, keysPath, flowNetwork, logLevel, filterExpiry, accessSporkHosts string
var streamTimeout int
var initHeight uint64

// parse from flags
flag.StringVar(&cfg.DatabaseDir, "database-dir", "./db", "Path to the directory for the database")
flag.StringVar(&cfg.RPCHost, "rpc-host", "", "Host for the RPC API server")
flag.IntVar(&cfg.RPCPort, "rpc-port", 8545, "Port for the RPC API server")
flag.StringVar(&cfg.AccessNodeGRPCHost, "access-node-grpc-host", "localhost:3569", "Host to the flow access node gRPC API")
flag.StringVar(&cfg.AccessNodeHost, "access-node-grpc-host", "localhost:3569", "Host to the flow access node gRPC API")
flag.StringVar(&accessSporkHosts, "access-node-spork-hosts", "", `Previous spork AN hosts, defined following the schema: {latest height}@{host} as comma separated list (e.g. "[email protected],[email protected]")`)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does latest height mean last height? is it sealed or finalized? It may make sense to define this as the first height in the spork since that's available in the sporks.json file as well as via the API.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can make it that way yeah

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually on the second thought, I'm not sure it makes sense to do it that way, since then I cannot know if a height is in the current spork or previous, let me show with an example, if I provide one previous spork that starts at height 100, and I want to check for height 150, I can not know whether thats from previous spork or current spork, this would then require for current spork client to also be defined with starting height, which I wanted to to avoid. We could just subtract 1 from whats in sporks.json no?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems like there are 2 modes for the clients:

  • current
  • historic

In both cases, you can lookup the first block available using GetNodeVersionInfo().
In the historic case, you can lookup the last block available using GetLatestBlockHeader(). In some cases (infra bugs) this does not match [next spork start height] - 1 because the AN is missing some blocks. We always backfill when this happens, but something to be aware of.

When I originally wrote this comment, I was thinking the first block would be easier for the operator to get, but I think you're right that they can just derive it from to next spork. Another benefit of using the last height is you could validate that the height matches the response from GetLatestBlockHeader()

flag.StringVar(&evmNetwork, "evm-network-id", "previewnet", "EVM network ID (previewnet, testnet, mainnet)")
flag.StringVar(&flowNetwork, "flow-network-id", "flow-emulator", "Flow network ID (flow-emulator, flow-previewnet)")
flag.StringVar(&coinbase, "coinbase", "", "Coinbase address to use for fee collection")
Expand Down Expand Up @@ -181,6 +189,20 @@ func FromFlags() (*Config, error) {
}
cfg.FilterExpiry = exp

if accessSporkHosts != "" {
heightHosts := strings.Split(accessSporkHosts, ",")
for _, hh := range heightHosts {
v := strings.Split(hh, "@")
heightVal, host := v[0], v[1]
height, err := strconv.Atoi(heightVal)
if err != nil {
return nil, fmt.Errorf("failed to parse AN host height value for previous sporks, provided with --access-node-spork-hosts flag")
}

cfg.AccessNodePreviousSporkHosts[uint64(height)] = host
}
}

// todo validate Config values
return cfg, nil
}
124 changes: 124 additions & 0 deletions models/cross-spork_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package models
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: Can we rename the file to cross_spork_client.go. Right now there's mixed - & _.
Another note: The functionality of this CrossSporkClient doesn't make me feel like it belongs to the models package, but rather the services package. What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm using - and _ mixed because the _ is instead of a space, whereas - is actually how you write "cross-spork" in english.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see. This didn't even cross my mind 😇


import (
"context"
"fmt"
"github.com/onflow/cadence"
"github.com/onflow/flow-go-sdk"
"github.com/onflow/flow-go-sdk/access"
"github.com/onflow/flow-go-sdk/access/grpc"
"github.com/rs/zerolog"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
)

// CrossSporkClient is a wrapper around the Flow AN client that can
// access different AN APIs based on the height boundaries of the sporks.
//
// Each spork is defined with the last height included in that spork,
// based on the list we know which AN client to use when requesting the data.
//
// Any API that supports cross-spork access must have a defined function
// that shadows the original access Client function.
type CrossSporkClient struct {
logger zerolog.Logger
// this map holds the last heights and clients for each spork
sporkHosts map[uint64]access.Client

access.Client
}

// NewCrossSporkClient creates a new instance of the client, it accepts the
// host to the current spork AN API.
func NewCrossSporkClient(currentSporkHost string, logger zerolog.Logger) (*CrossSporkClient, error) {
// add current spork AN host as the default client
client, err := grpc.NewClient(currentSporkHost)
if err != nil {
return nil, err
}

return &CrossSporkClient{
logger,
make(map[uint64]access.Client),
client,
}, nil
}

func (c *CrossSporkClient) AddSpork(lastHeight uint64, host string) error {
if _, ok := c.sporkHosts[lastHeight]; ok {
return fmt.Errorf("provided last height already exists")
}

client, err := grpc.NewClient(host)
if err != nil {
return err
}

c.sporkHosts[lastHeight] = client
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's a good idea to validate the range to make sure there is a contiguous range of blocks. You can use GetNodeVersionInfo() to get the first block (NodeRootBlockHeight is the first block the node has):
https://github.com/onflow/flow-go/blob/1d581d472b7dcc40cd8952da497422c910ebd99b/access/api.go#L259-L266

GetLatestBlockHeader() to get the latest sealed block.

Then you can search all nodes to check for gaps

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good idea, didn't know it exists. In my mind if you wrongly configured it would err out when accessing, but this is better for sure.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have access to this method through the SDK

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can issue this up as a follow up PR since I believe it will require updating Go SDK first and then updating here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


c.logger.Info().
Uint64("spork-boundary", lastHeight).
Str("host", host).
Msg("added spork specific client")

return nil
}

// getClientForHeight returns the client for the given height. It starts by using the current spork client,
// then iteratively checks the upper height boundaries in descending order and returns the last client
// that still contains the given height within its upper height limit. If no client is found, it returns
// the current spork client.
// Please note that even if a client for provided height is found we don't guarantee the data being available
// because it still might not have access to the height provided, because there might be other sporks with
// lower height boundaries that we didn't configure for.
// This would result in the error when using the client to access such data.
func (c *CrossSporkClient) getClientForHeight(height uint64) access.Client {
heights := maps.Keys(c.sporkHosts)
slices.Sort(heights) // order heights in ascending order
slices.Reverse(heights) // make it descending

// start by using the current spork client, then iterate all the upper height boundaries
// and find the last client that still contains the height in its upper height limit
client := c.Client
for _, upperBound := range heights {
if upperBound >= height {
client = c.sporkHosts[upperBound]

c.logger.Debug().
Uint64("spork-boundary", upperBound).
Msg("using previous spork client")
}
}

return client
}

func (c *CrossSporkClient) GetBlockByHeight(
ctx context.Context,
height uint64,
) (*flow.Block, error) {
return c.
getClientForHeight(height).
GetBlockByHeight(ctx, height)
}

func (c *CrossSporkClient) ExecuteScriptAtBlockHeight(
ctx context.Context,
height uint64,
script []byte,
arguments []cadence.Value,
) (cadence.Value, error) {
return c.
getClientForHeight(height).
ExecuteScriptAtBlockHeight(ctx, height, script, arguments)
}

func (c *CrossSporkClient) SubscribeEventsByBlockHeight(
ctx context.Context,
startHeight uint64,
filter flow.EventFilter,
) (<-chan flow.BlockEvents, <-chan error, error) {
return c.
getClientForHeight(startHeight).
SubscribeEventsByBlockHeight(ctx, startHeight, filter)
}
53 changes: 53 additions & 0 deletions models/cross-spork_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package models

import (
"context"
"github.com/rs/zerolog"
"github.com/stretchr/testify/require"
"testing"
)

func TestCrossSporkClient_MultiClient(t *testing.T) {
clientHosts := []string{"test1.com", "test2.com", "test3.com"}

client, err := NewCrossSporkClient(clientHosts[0], zerolog.Nop())
require.NoError(t, err)

err = client.AddSpork(100, clientHosts[1])
require.NoError(t, err)

err = client.AddSpork(200, clientHosts[2])
require.NoError(t, err)

c := client.getClientForHeight(300)
require.NotNil(t, c)

ctx := context.Background()

// this height should use current spork client
_, err = client.GetBlockByHeight(ctx, 300)
require.ErrorContains(t, err, clientHosts[0])

// this height should use test2 client
_, err = client.GetBlockByHeight(ctx, 150)
require.ErrorContains(t, err, clientHosts[2])

// this height should use test3 client
_, err = client.GetBlockByHeight(ctx, 50)
require.ErrorContains(t, err, clientHosts[1])

// test boundaries are inclusive
_, err = client.GetBlockByHeight(ctx, 200)
require.ErrorContains(t, err, clientHosts[2])
}

func TestCrossSporkClient_ExistingHeight(t *testing.T) {
client, err := NewCrossSporkClient("host1.com", zerolog.Nop())
require.NoError(t, err)

err = client.AddSpork(100, "host2.com")
require.NoError(t, err)

err = client.AddSpork(100, "host3.com")
require.EqualError(t, err, "provided last height already exists")
}
30 changes: 15 additions & 15 deletions tests/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,21 +129,21 @@ func servicesSetup(t *testing.T) (emulator.Emulator, func()) {

// default config
cfg := &config.Config{
DatabaseDir: t.TempDir(),
AccessNodeGRPCHost: "localhost:3569", // emulator
RPCPort: 8545,
RPCHost: "127.0.0.1",
FlowNetworkID: "flow-emulator",
EVMNetworkID: evmTypes.FlowEVMPreviewNetChainID,
Coinbase: common.HexToAddress(eoaTestAddress),
COAAddress: service.Address,
COAKey: service.PrivateKey,
CreateCOAResource: false,
GasPrice: new(big.Int).SetUint64(0),
LogLevel: zerolog.DebugLevel,
LogWriter: zerolog.NewConsoleWriter(),
StreamTimeout: time.Second * 30,
StreamLimit: 10,
DatabaseDir: t.TempDir(),
AccessNodeHost: "localhost:3569", // emulator
RPCPort: 8545,
RPCHost: "127.0.0.1",
FlowNetworkID: "flow-emulator",
EVMNetworkID: evmTypes.FlowEVMPreviewNetChainID,
Coinbase: common.HexToAddress(eoaTestAddress),
COAAddress: service.Address,
COAKey: service.PrivateKey,
CreateCOAResource: false,
GasPrice: new(big.Int).SetUint64(0),
LogLevel: zerolog.DebugLevel,
LogWriter: zerolog.NewConsoleWriter(),
StreamTimeout: time.Second * 30,
StreamLimit: 10,
}

if !logOutput {
Expand Down
26 changes: 13 additions & 13 deletions tests/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,19 @@ func Test_ConcurrentTransactionSubmission(t *testing.T) {
require.NoError(t, err)

cfg := &config.Config{
DatabaseDir: t.TempDir(),
AccessNodeGRPCHost: grpcHost,
RPCPort: 8545,
RPCHost: "127.0.0.1",
FlowNetworkID: "flow-emulator",
EVMNetworkID: types.FlowEVMTestNetChainID,
Coinbase: eoaTestAccount,
COAAddress: *createdAddr,
COAKeys: keys,
CreateCOAResource: true,
GasPrice: new(big.Int).SetUint64(0),
LogLevel: zerolog.DebugLevel,
LogWriter: os.Stdout,
DatabaseDir: t.TempDir(),
AccessNodeHost: grpcHost,
RPCPort: 8545,
RPCHost: "127.0.0.1",
FlowNetworkID: "flow-emulator",
EVMNetworkID: types.FlowEVMTestNetChainID,
Coinbase: eoaTestAccount,
COAAddress: *createdAddr,
COAKeys: keys,
CreateCOAResource: true,
GasPrice: new(big.Int).SetUint64(0),
LogLevel: zerolog.DebugLevel,
LogWriter: os.Stdout,
}

// todo change this test to use ingestion and emulator directly so we can completely remove
Expand Down
Loading