refactor the indexer #726

Merged
merged 24 commits into from
Mar 20, 2024
Changes from 21 commits
20ee9d7
convert indexers to watchers in a mono indexer
Omarabdul3ziz Jan 31, 2024
5e4787d
use a single rmb client of type peer.RpcClient across the project
Omarabdul3ziz Jan 31, 2024
7cde9d7
configure starting/stoping indexer/modifier from flag
Omarabdul3ziz Jan 31, 2024
4c14d01
wip: introducing the dmi watcher
Omarabdul3ziz Jan 31, 2024
fb32310
upsert dmi on database
Omarabdul3ziz Feb 4, 2024
606f301
add network speed watcher
Omarabdul3ziz Feb 4, 2024
8ae0a14
implement an Scanner/Valuer interfaces for custom gorm jsonb types in…
Omarabdul3ziz Feb 7, 2024
6f01097
fix network speed types
Omarabdul3ziz Feb 7, 2024
a2ec728
add triggers for dmi/speed tables
Omarabdul3ziz Feb 8, 2024
fae4ee7
add generators:
Omarabdul3ziz Feb 8, 2024
23645c4
add loader/tests for the new dmi/speed data
Omarabdul3ziz Feb 8, 2024
5fcf9e3
make the interval/worker configurable
Omarabdul3ziz Feb 8, 2024
7e6c246
Merge branch 'development' of https://github.com/threefoldtech/tfgrid…
Omarabdul3ziz Feb 8, 2024
2398e23
stop data modification, remove unnecessary conversion, silent the gor…
Omarabdul3ziz Feb 8, 2024
1e38b61
Merge branch 'development' of https://github.com/threefoldtech/tfgrid…
Omarabdul3ziz Feb 11, 2024
dca941c
refactor the indexer manager to implement a unified interface for ind…
Omarabdul3ziz Feb 18, 2024
20de686
Merge branch 'development' of https://github.com/threefoldtech/tfgrid…
Omarabdul3ziz Feb 18, 2024
ee9a31b
clean & complete some missing parts:
Omarabdul3ziz Feb 19, 2024
708df2e
update mod files
Omarabdul3ziz Feb 19, 2024
cd2ae9a
Merge branch 'development' of https://github.com/threefoldtech/tfgrid…
Omarabdul3ziz Feb 19, 2024
ad4d979
silent the gorm logs in dump data generation
Omarabdul3ziz Feb 19, 2024
2db2a21
abstract the indexer pkg code using generics
Omarabdul3ziz Feb 25, 2024
119a62a
Merge branch 'development' of https://github.com/threefoldtech/tfgrid…
Omarabdul3ziz Mar 12, 2024
e068289
remove already merged trigger on node_gpu
Omarabdul3ziz Mar 12, 2024
4 changes: 2 additions & 2 deletions .github/workflows/grid-proxy-integration.yml
@@ -52,8 +52,8 @@ jobs:
pushd tools/db
go run . --seed 13 --postgres-host localhost --postgres-db tfgrid-graphql --postgres-password postgres --postgres-user postgres --reset
popd
go run cmds/proxy_server/main.go -no-cert --address :8080 --log-level debug --postgres-host localhost --postgres-db tfgrid-graphql --postgres-password postgres --postgres-user postgres --health-indexer-workers 0 --mnemonics "$MNEMONICS" &
go run cmds/proxy_server/main.go -no-cert -no-indexer --address :8080 --log-level debug --postgres-host localhost --postgres-db tfgrid-graphql --postgres-password postgres --postgres-user postgres --mnemonics "$MNEMONICS" &
sleep 10
pushd tests/queries
go test -v --seed 13 --postgres-host localhost --postgres-db tfgrid-graphql --postgres-password postgres --postgres-user postgres --endpoint http://localhost:8080
go test -v --seed 13 -no-modify --postgres-host localhost --postgres-db tfgrid-graphql --postgres-password postgres --postgres-user postgres --endpoint http://localhost:8080
popd
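The commands in this hunk mix single-dash flags (`-no-cert`, `-no-indexer`, `-no-modify`) with double-dash ones (`--address`, `--seed`). Go's standard `flag` package accepts both spellings for the same flag, so the mix is harmless. A minimal sketch of that behavior follows; `parseArgs` is a hypothetical helper, and the `:443` default is an assumption made only for this example.

```go
package main

import (
	"flag"
	"fmt"
)

// parseArgs is a hypothetical helper for illustration; it is not part of the PR.
// It registers two flags named like the ones in the diff and parses args.
func parseArgs(args []string) (noIndexer bool, address string, err error) {
	fs := flag.NewFlagSet("proxy_server", flag.ContinueOnError)
	fs.BoolVar(&noIndexer, "no-indexer", false, "do not start the indexer")
	// The ":443" default is an assumption for this sketch.
	fs.StringVar(&address, "address", ":443", "listening address")
	err = fs.Parse(args)
	return noIndexer, address, err
}

func main() {
	// One dash or two dashes: the flag package treats them the same.
	n, a, err := parseArgs([]string{"-no-indexer", "--address", ":8080"})
	fmt.Println(n, a, err)
}
```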
3 changes: 3 additions & 0 deletions grid-proxy/Makefile
@@ -46,6 +46,7 @@ db-refill: db-stop db-start sleep db-fill
server-start: ## Start the proxy server (Args: `m=<MNEMONICS>`)
@go run cmds/proxy_server/main.go \
-no-cert \
-no-indexer \
--address :8080 \
--log-level debug \
--postgres-host $(PQ_HOST) \
@@ -63,6 +64,7 @@ test-queries: ## Run all queries tests
@cd tests/queries/ &&\
go test -v \
-parallel 20 \
-no-modify \
--seed 13 \
--postgres-host $(PQ_HOST) \
--postgres-db tfgrid-graphql \
@@ -75,6 +77,7 @@ test-query: ## Run specific test query (Args: `t=TestName`).
@cd tests/queries/ &&\
go test -v \
-parallel 10 \
-no-modify \
--seed 13 \
--postgres-host $(PQ_HOST) \
--postgres-db tfgrid-graphql \
148 changes: 88 additions & 60 deletions grid-proxy/cmds/proxy_server/main.go
@@ -7,17 +7,18 @@ import (
"fmt"
"net/http"
"os"
"strings"
"time"

"github.com/google/uuid"
"github.com/gorilla/mux"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
substrate "github.com/threefoldtech/tfchain/clients/tfchain-client-go"
"github.com/threefoldtech/tfgrid-sdk-go/grid-proxy/internal/certmanager"
"github.com/threefoldtech/tfgrid-sdk-go/grid-proxy/internal/explorer"
"github.com/threefoldtech/tfgrid-sdk-go/grid-proxy/internal/explorer/db"
"github.com/threefoldtech/tfgrid-sdk-go/grid-proxy/internal/gpuindexer"
"github.com/threefoldtech/tfgrid-sdk-go/grid-proxy/internal/healthindexer"
"github.com/threefoldtech/tfgrid-sdk-go/grid-proxy/internal/indexer"
logging "github.com/threefoldtech/tfgrid-sdk-go/grid-proxy/pkg"
rmb "github.com/threefoldtech/tfgrid-sdk-go/rmb-sdk-go"
"github.com/threefoldtech/tfgrid-sdk-go/rmb-sdk-go/peer"
@@ -35,31 +36,35 @@ const (
var GitCommit string

type flags struct {
debug string
postgresHost string
postgresPort int
postgresDB string
postgresUser string
postgresPassword string
sqlLogLevel int
address string
version bool
nocert bool
domain string
TLSEmail string
CA string
certCacheDir string
tfChainURL string
relayURL string
mnemonics string
gpuIndexerCheckIntervalMins uint
gpuIndexerBatchSize uint
gpuIndexerResultWorkers uint
gpuIndexerBatchWorkers uint
maxPoolOpenConnections int
// being 0 is helpful of making the data persistent while testing
healthIndexerWorkers uint
healthIndexerInterval uint
debug string
postgresHost string
postgresPort int
postgresDB string
postgresUser string
postgresPassword string
sqlLogLevel int
address string
version bool
nocert bool
domain string
TLSEmail string
CA string
certCacheDir string
tfChainURL string
relayURL string
mnemonics string
maxPoolOpenConnections int

noIndexer bool // true to stop the indexer, useful on running for testing
indexerUpserterBatchSize uint
gpuIndexerIntervalMins uint
gpuIndexerNumWorkers uint
healthIndexerNumWorkers uint
healthIndexerIntervalMins uint
dmiIndexerNumWorkers uint
dmiIndexerIntervalMins uint
speedIndexerNumWorkers uint
speedIndexerIntervalMins uint
}

func main() {
@@ -81,13 +86,18 @@ func main() {
flag.StringVar(&f.tfChainURL, "tfchain-url", DefaultTFChainURL, "TF chain url")
flag.StringVar(&f.relayURL, "relay-url", DefaultRelayURL, "RMB relay url")
flag.StringVar(&f.mnemonics, "mnemonics", "", "Dummy user mnemonics for relay calls")
flag.UintVar(&f.gpuIndexerCheckIntervalMins, "indexer-interval-min", 60, "the interval that the GPU indexer will run")
flag.UintVar(&f.gpuIndexerBatchSize, "indexer-batch-size", 20, "batch size for the GPU indexer worker batch")
flag.UintVar(&f.gpuIndexerResultWorkers, "indexer-results-workers", 2, "number of workers to process indexer GPU info")
flag.UintVar(&f.gpuIndexerBatchWorkers, "indexer-batch-workers", 2, "number of workers to process batch GPU info")
flag.IntVar(&f.maxPoolOpenConnections, "max-open-conns", 80, "max number of db connection pool open connections")
flag.UintVar(&f.healthIndexerWorkers, "health-indexer-workers", 100, "number of workers checking on node health")
flag.UintVar(&f.healthIndexerInterval, "health-indexer-interval", 5, "node health check interval in min")

flag.BoolVar(&f.noIndexer, "no-indexer", false, "do not start the indexer")
flag.UintVar(&f.indexerUpserterBatchSize, "indexer-upserter-batch-size", 20, "results batch size which collected before upserting")
flag.UintVar(&f.gpuIndexerIntervalMins, "gpu-indexer-interval", 60, "the interval that the GPU indexer will run")
flag.UintVar(&f.gpuIndexerNumWorkers, "gpu-indexer-workers", 100, "number of workers to process indexer GPU info")
flag.UintVar(&f.healthIndexerIntervalMins, "health-indexer-interval", 5, "node health check interval in min")
flag.UintVar(&f.healthIndexerNumWorkers, "health-indexer-workers", 100, "number of workers checking on node health")
flag.UintVar(&f.dmiIndexerIntervalMins, "dmi-indexer-interval", 60*24, "node dmi check interval in min")
flag.UintVar(&f.dmiIndexerNumWorkers, "dmi-indexer-workers", 1, "number of workers checking on node dmi")
flag.UintVar(&f.speedIndexerIntervalMins, "speed-indexer-interval", 5, "node speed check interval in min")
flag.UintVar(&f.speedIndexerNumWorkers, "speed-indexer-workers", 100, "number of workers checking on node speed")
Review comment (Member):

Just wondering: why do we need a separate set of workers per indexer to query things from the nodes? Can't we have a single set of workers that asks each node for everything in one go?

I mean, I see here dmi-indexer-workers, gpu-indexer-workers, speed-indexer-workers (whatever that means), etc.

This is just a question, not a request for change.

flag.Parse()

// shows version and exit
@@ -112,11 +122,6 @@ func main() {

subManager := substrate.NewManager(f.tfChainURL)

relayRPCClient, err := createRPCRMBClient(ctx, f.relayURL, f.mnemonics, subManager)
if err != nil {
log.Fatal().Err(err).Msg("failed to create relay client")
}

db, err := db.NewPostgresDatabase(f.postgresHost, f.postgresPort, f.postgresUser, f.postgresPassword, f.postgresDB, f.maxPoolOpenConnections, logger.LogLevel(f.sqlLogLevel))
if err != nil {
log.Fatal().Err(err).Msg("couldn't get postgres client")
@@ -126,33 +131,56 @@ func main() {
log.Fatal().Err(err).Msg("failed to initialize database")
}

dbClient := explorer.DBClient{
DB: &db,
dbClient := explorer.DBClient{DB: &db}
rpcRmbClient, err := createRPCRMBClient(ctx, f.relayURL, f.mnemonics, subManager)
if err != nil {
log.Fatal().Err(err).Msg("failed to create relay client")
}
manager := indexer.NewManager(ctx)

indexer, err := gpuindexer.NewNodeGPUIndexer(
ctx,
f.relayURL,
f.mnemonics,
subManager, &db,
f.gpuIndexerCheckIntervalMins,
f.gpuIndexerBatchSize,
f.gpuIndexerResultWorkers,
f.gpuIndexerBatchWorkers,
gpuIndexer := indexer.NewGPUIndexer(
rpcRmbClient,
&db,
f.indexerUpserterBatchSize,
f.gpuIndexerIntervalMins,
f.gpuIndexerNumWorkers,
)
if err != nil {
log.Fatal().Err(err).Msg("failed to create GPU indexer")
}
manager.Register("GPU", gpuIndexer)

indexer.Start(ctx)
healthIndexer := indexer.NewNodeHealthIndexer(
rpcRmbClient,
&db,
f.indexerUpserterBatchSize,
f.healthIndexerNumWorkers,
f.healthIndexerIntervalMins,
)
manager.Register("Health", healthIndexer)

healthIndexer, err := healthindexer.NewNodeHealthIndexer(ctx, &db, subManager, f.mnemonics, f.relayURL, f.healthIndexerWorkers, f.healthIndexerInterval)
if err != nil {
log.Fatal().Err(err).Msg("failed to create health indexer")
dmiIndexer := indexer.NewDmiIndexer(
rpcRmbClient,
&db,
f.indexerUpserterBatchSize,
f.dmiIndexerIntervalMins,
f.dmiIndexerNumWorkers,
)
manager.Register("DMI", dmiIndexer)

speedIndexer := indexer.NewSpeedIndexer(
rpcRmbClient,
&db,
f.indexerUpserterBatchSize,
f.speedIndexerIntervalMins,
f.speedIndexerNumWorkers,
)
manager.Register("Speed", speedIndexer)

if !f.noIndexer {
manager.Start()
} else {
log.Info().Msg("Indexers Manager did not start")
}
healthIndexer.Start(ctx)

s, err := createServer(f, dbClient, GitCommit, relayRPCClient)
s, err := createServer(f, dbClient, GitCommit, rpcRmbClient)
if err != nil {
log.Fatal().Err(err).Msg("failed to create mux server")
}
@@ -208,8 +236,8 @@ func app(s *http.Server, f flags) error {
return nil
}

func createRPCRMBClient(ctx context.Context, relayURL, mnemonics string, subManager substrate.Manager) (rmb.Client, error) {
sessionId := fmt.Sprintf("tfgrid_proxy-%d", os.Getpid())
func createRPCRMBClient(ctx context.Context, relayURL, mnemonics string, subManager substrate.Manager) (*peer.RpcClient, error) {
sessionId := fmt.Sprintf("tfgrid-proxy-%s", strings.Split(uuid.NewString(), "-")[0])
client, err := peer.NewRpcClient(ctx, mnemonics, subManager, peer.WithRelay(relayURL), peer.WithSession(sessionId))
if err != nil {
return nil, fmt.Errorf("failed to create direct RPC RMB client: %w", err)
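
The hunk above replaces a PID-based session ID with a short random suffix taken from the first group of a UUID. The same idea can be sketched with only the standard library; note that this swaps `github.com/google/uuid` for `crypto/rand` to stay self-contained, and `newSessionID` is a hypothetical helper. A plausible motivation, which the PR does not state, is that PIDs can repeat across restarts or containers.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// newSessionID is a hypothetical helper: it appends 8 random hex characters
// to the prefix, the same length as the first group of a UUID string.
func newSessionID(prefix string) (string, error) {
	buf := make([]byte, 4) // 4 random bytes encode to 8 hex characters
	if _, err := rand.Read(buf); err != nil {
		return "", err
	}
	return fmt.Sprintf("%s-%s", prefix, hex.EncodeToString(buf)), nil
}

func main() {
	id, err := newSessionID("tfgrid-proxy")
	if err != nil {
		panic(err)
	}
	fmt.Println(id)
}
```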
21 changes: 21 additions & 0 deletions grid-proxy/go.mod
@@ -27,6 +27,7 @@ require (
require (
github.com/ChainSafe/go-schnorrkel v1.1.0 // indirect
github.com/KyleBanks/depth v1.2.1 // indirect
github.com/blang/semver v3.5.1+incompatible // indirect
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/centrifuge/go-substrate-rpc-client/v4 v4.0.12 // indirect
@@ -37,17 +38,23 @@ require (
github.com/decred/dcrd/crypto/blake256 v1.0.1 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/ethereum/go-ethereum v1.11.6 // indirect
github.com/garyburd/redigo v1.6.2 // indirect
github.com/go-co-op/gocron v1.33.1 // indirect
github.com/go-jose/go-jose/v3 v3.0.1 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/spec v0.20.8 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
github.com/go-stack/stack v1.8.1 // indirect
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/gomodule/redigo v2.0.0+incompatible // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/gorilla/websocket v1.5.1 // indirect
github.com/gtank/merlin v0.1.1 // indirect
github.com/gtank/ristretto255 v0.1.2 // indirect
github.com/hasura/go-graphql-client v0.10.0 // indirect
github.com/holiman/uint256 v1.2.3 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
@@ -56,25 +63,39 @@ require (
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/miekg/dns v1.1.58 // indirect
github.com/mimoo/StrobeGo v0.0.0-20220103164710-9a04d6ca976b // indirect
github.com/pierrec/xxHash v0.1.5 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/rs/cors v1.10.1 // indirect
github.com/shirou/gopsutil v3.21.11+incompatible // indirect
github.com/swaggo/files v1.0.1 // indirect
github.com/threefoldtech/zbus v1.0.1 // indirect
github.com/tklauser/go-sysconf v0.3.11 // indirect
github.com/tklauser/numcpus v0.6.0 // indirect
github.com/vedhavyas/go-subkey v1.0.3 // indirect
github.com/vishvananda/netlink v1.1.1-0.20201029203352-d40f9887b852 // indirect
github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f // indirect
github.com/vmihailenco/msgpack v4.0.4+incompatible // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.uber.org/atomic v1.9.0 // indirect
golang.org/x/crypto v0.18.0 // indirect
golang.org/x/mod v0.14.0 // indirect
golang.org/x/net v0.20.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.17.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
nhooyr.io/websocket v1.8.7 // indirect
)

replace github.com/threefoldtech/tfgrid-sdk-go/rmb-sdk-go => ../rmb-sdk-go