From e6f0a4ab8310601eee888178d4b6cc856e9f3e56 Mon Sep 17 00:00:00 2001 From: Omar Abdulaziz Date: Mon, 11 Mar 2024 16:08:41 +0200 Subject: [PATCH 1/3] add new indexer work to check node having ipv6: - add generator/loader for twin ipv6 and update the schema for the new table `node_ipv6` - add handle tests for the new filter on nodes `has_ipv6` - add new indexer work to call up nodes and check if they have ipv6 - update the db filters with the `has_ipv6` filter on nodes - add default values for the flags (1 worker runs every 1 day) - update makefile with helpful command to get db dump from the dev cluster --- grid-proxy/Makefile | 14 +++++- grid-proxy/cmds/proxy_server/main.go | 13 +++++ .../internal/explorer/db/indexer_calls.go | 8 ++++ grid-proxy/internal/explorer/db/postgres.go | 6 +++ grid-proxy/internal/explorer/db/types.go | 1 + grid-proxy/internal/indexer/ipv6.go | 48 +++++++++++++++++++ grid-proxy/pkg/types/indexer.go | 11 +++++ grid-proxy/pkg/types/nodes.go | 1 + .../tests/queries/mock_client/loader.go | 28 +++++++++++ grid-proxy/tests/queries/mock_client/nodes.go | 4 ++ grid-proxy/tests/queries/node_test.go | 7 +++ grid-proxy/tools/db/crafter/generator.go | 27 +++++++++++ grid-proxy/tools/db/schema.sql | 13 +++++ 13 files changed, 180 insertions(+), 1 deletion(-) create mode 100644 grid-proxy/internal/indexer/ipv6.go diff --git a/grid-proxy/Makefile b/grid-proxy/Makefile index b61d06bff..872e56b9f 100644 --- a/grid-proxy/Makefile +++ b/grid-proxy/Makefile @@ -3,6 +3,11 @@ PQ_HOST = $(shell docker inspect -f '{{range.NetworkSettings.Networks}}{{.IPAddr PQ_CONTAINER = postgres count = 3 +NAMESPACE := tfchain-dev +DB_POD := $(shell kubectl --namespace $(NAMESPACE) get pods | grep processor-db | awk '{print $$1}') +DUMP_CMD := PGPASSWORD="postgres" pg_dump -U postgres tfgrid-graphql > /tmp/dump.sql + + install-swag: @go install github.com/swaggo/swag/cmd/swag@v1.8.12; @@ -34,6 +39,14 @@ db-fill: ## Fill the database with a randomly generated data --reset \ --seed 13 + +db-dump-make: + kubectl exec -it $(DB_POD) -n $(NAMESPACE) -- /bin/sh -c "$(DUMP_CMD)"; +db-dump-copy: + kubectl -n $(NAMESPACE) cp $(DB_POD):/tmp/dump.sql ~/Downloads/dump.sql; + +db-dump-get: db-dump-make db-dump-copy ## get a dump from processor_db on tfchain_dev cluster + db-dump: ## Load a dump of the database (Args: `p=`) @go run cmds/proxy_server/main.go \ -no-cert \ - -no-indexer \ --address :8080 \ --log-level debug \ --sql-log-level 4 \ diff --git a/grid-proxy/cmds/proxy_server/main.go b/grid-proxy/cmds/proxy_server/main.go index d14cb3a2d..bdd50b050 100644 --- a/grid-proxy/cmds/proxy_server/main.go +++ b/grid-proxy/cmds/proxy_server/main.go @@ -66,6 +66,8 @@ type flags struct { dmiIndexerIntervalMins uint speedIndexerNumWorkers uint speedIndexerIntervalMins uint + ipv6IndexerNumWorkers uint + ipv6IndexerIntervalMins uint } func main() { @@ -99,6 +101,8 @@ func main() { flag.UintVar(&f.dmiIndexerNumWorkers, "dmi-indexer-workers", 1, "number of workers checking on node dmi") flag.UintVar(&f.speedIndexerIntervalMins, "speed-indexer-interval", 5, "node speed check interval in min") flag.UintVar(&f.speedIndexerNumWorkers, "speed-indexer-workers", 100, "number of workers checking on node speed") + flag.UintVar(&f.ipv6IndexerIntervalMins, "ipv6-indexer-interval", 1, "node ipv6 check interval in min") + flag.UintVar(&f.ipv6IndexerNumWorkers, "ipv6-indexer-workers", 10, "number of workers checking on node having ipv6") flag.Parse() // shows version and exit @@ -191,6 +195,15 @@ func startIndexers(ctx context.Context, f flags, db db.Database, rpcRmbClient *p f.speedIndexerNumWorkers, ) speedIdx.Start(ctx) + + ipv6Idx := indexer.NewIndexer[types.HasIpv6]( + indexer.NewIpv6Work(f.ipv6IndexerIntervalMins), + "IPV6", + db, + rpcRmbClient, + f.ipv6IndexerNumWorkers, + ) + ipv6Idx.Start(ctx) } func app(s *http.Server, f flags) error { diff --git a/grid-proxy/internal/explorer/db/indexer_calls.go b/grid-proxy/internal/explorer/db/indexer_calls.go index 805ac1798..645d78865 100644 --- a/grid-proxy/internal/explorer/db/indexer_calls.go +++ b/grid-proxy/internal/explorer/db/indexer_calls.go @@ -60,3 +60,11 @@ func (p *PostgresDatabase) UpsertNetworkSpeed(ctx context.Context, speeds []type } return p.gormDB.WithContext(ctx).Table("speed").Clauses(conflictClause).Create(&speeds).Error } + +func (p *PostgresDatabase) UpsertNodeIpv6Report(ctx context.Context, ips []types.HasIpv6) error { + onConflictClause := clause.OnConflict{ + Columns: []clause.Column{{Name: "node_twin_id"}}, + DoUpdates: clause.AssignmentColumns([]string{"has_ipv6"}), + } + return p.gormDB.WithContext(ctx).Table("node_ipv6").Clauses(onConflictClause).Create(&ips).Error +} diff --git a/grid-proxy/internal/explorer/db/postgres.go b/grid-proxy/internal/explorer/db/postgres.go index 55b670bd5..dd1603ae4 100644 --- a/grid-proxy/internal/explorer/db/postgres.go +++ b/grid-proxy/internal/explorer/db/postgres.go @@ -82,6 +82,7 @@ func (d *PostgresDatabase) Initialize() error { &types.HealthReport{}, &types.Dmi{}, &types.Speed{}, + &types.HasIpv6{}, ) if err != nil { return errors.Wrap(err, "failed to migrate indexer tables") @@ -296,6 +297,7 @@ func (d *PostgresDatabase) nodeTableQuery(ctx context.Context, filter types.Node "resources_cache.node_contracts_count", "resources_cache.node_gpu_count AS num_gpu", "health_report.healthy", + "node_ipv6.has_ipv6", "resources_cache.bios", "resources_cache.baseboard", "resources_cache.memory", @@ -311,6 +313,7 @@ func (d *PostgresDatabase) nodeTableQuery(ctx context.Context, filter types.Node LEFT JOIN farm ON node.farm_id = farm.farm_id LEFT JOIN location ON node.location_id = location.id LEFT JOIN health_report ON node.twin_id = health_report.node_twin_id + LEFT JOIN node_ipv6 ON node.twin_id = node_ipv6.node_twin_id `) if filter.HasGPU != nil || filter.GpuDeviceName != nil || @@ -532,6 +535,9 @@ func (d *PostgresDatabase) GetNodes(ctx context.Context, filter types.NodeFilter if filter.Healthy != nil { q = q.Where("health_report.healthy = ? ", *filter.Healthy) } + if filter.HasIpv6 != nil { + q = q.Where("COALESCE(node_ipv6.has_ipv6, false) = ? ", *filter.HasIpv6) + } if filter.FreeMRU != nil { q = q.Where("resources_cache.free_mru >= ?", *filter.FreeMRU) } diff --git a/grid-proxy/internal/explorer/db/types.go b/grid-proxy/internal/explorer/db/types.go index 2c0ab13aa..64c8b82cd 100644 --- a/grid-proxy/internal/explorer/db/types.go +++ b/grid-proxy/internal/explorer/db/types.go @@ -32,6 +32,7 @@ type Database interface { UpsertNodeHealth(ctx context.Context, healthReports []types.HealthReport) error UpsertNodeDmi(ctx context.Context, dmis []types.Dmi) error UpsertNetworkSpeed(ctx context.Context, speeds []types.Speed) error + UpsertNodeIpv6Report(ctx context.Context, ips []types.HasIpv6) error } type ContractBilling types.ContractBilling diff --git a/grid-proxy/internal/indexer/ipv6.go b/grid-proxy/internal/indexer/ipv6.go new file mode 100644 index 000000000..714fb30e8 --- /dev/null +++ b/grid-proxy/internal/indexer/ipv6.go @@ -0,0 +1,48 @@ +package indexer + +import ( + "context" + "time" + + "github.com/threefoldtech/tfgrid-sdk-go/grid-proxy/internal/explorer/db" + "github.com/threefoldtech/tfgrid-sdk-go/grid-proxy/pkg/types" + "github.com/threefoldtech/tfgrid-sdk-go/rmb-sdk-go/peer" +) + +const cmd = "zos.network.has_ipv6" + +var _ Work[types.HasIpv6] = (*Ipv6Work)(nil) + +type Ipv6Work struct { + finders map[string]time.Duration +} + +func NewIpv6Work(interval uint) *Ipv6Work { + return &Ipv6Work{ + finders: map[string]time.Duration{ + "up": time.Duration(interval) * time.Minute, + }, + } +} + +func (w *Ipv6Work) Finders() map[string]time.Duration { + return w.finders +} + +func (w *Ipv6Work) Get(ctx context.Context, rmb *peer.RpcClient, id uint32) ([]types.HasIpv6, error) { + var has_ipv6 bool + if err := callNode(ctx, rmb, cmd, nil, id, has_ipv6); err != nil { + return []types.HasIpv6{}, nil + } + + return []types.HasIpv6{ + { + NodeTwinId: id, + HasIpv6: has_ipv6, + }, + }, nil +} + +func (w *Ipv6Work) Upsert(ctx context.Context, db db.Database, batch []types.HasIpv6) error { + return db.UpsertNodeIpv6Report(ctx, batch) +} diff --git a/grid-proxy/pkg/types/indexer.go b/grid-proxy/pkg/types/indexer.go index 26bbf3cc3..f7b31d536 100644 --- a/grid-proxy/pkg/types/indexer.go +++ b/grid-proxy/pkg/types/indexer.go @@ -26,6 +26,17 @@ func (HealthReport) TableName() string { return "health_report" } +// HasIpv6 holds the state of node having ipv6 +// used as gorm model +type HasIpv6 struct { + NodeTwinId uint32 `gorm:"unique;not null"` + HasIpv6 bool +} + +func (HasIpv6) TableName() string { + return "node_ipv6" +} + // Speed holds upload/download speeds in `bit/sec` for a node // used as both gorm model and server json response type Speed struct { diff --git a/grid-proxy/pkg/types/nodes.go b/grid-proxy/pkg/types/nodes.go index e6ca16b68..e81eb032f 100644 --- a/grid-proxy/pkg/types/nodes.go +++ b/grid-proxy/pkg/types/nodes.go @@ -148,4 +148,5 @@ type NodeFilter struct { PriceMin *float64 `schema:"price_min,omitempty"` PriceMax *float64 `schema:"price_max,omitempty"` Excluded []uint64 `schema:"excluded,omitempty"` + HasIpv6 *bool `schema:"has_ipv6,omitempty"` } diff --git a/grid-proxy/tests/queries/mock_client/loader.go b/grid-proxy/tests/queries/mock_client/loader.go index 0b8cf8c13..b621e9502 100644 --- a/grid-proxy/tests/queries/mock_client/loader.go +++ b/grid-proxy/tests/queries/mock_client/loader.go @@ -40,6 +40,7 @@ type DBData struct { Regions map[string]string Locations map[string]Location HealthReports map[uint32]bool + NodeIpv6 map[uint32]bool DMIs map[uint32]types.Dmi Speeds map[uint32]types.Speed PricingPolicies map[uint]PricingPolicy @@ -568,6 +569,30 @@ func loadHealthReports(db *sql.DB, data *DBData) error { return nil } +func loadNodeIpv6(db *sql.DB, data *DBData) error { + rows, err := db.Query(` + SELECT + COALESCE(node_twin_id, 0), + COALESCE(has_ipv6, false) + FROM + node_ipv6;`) + if err != nil { + return err + } + for rows.Next() { + var node types.HasIpv6 + if err := rows.Scan( + &node.NodeTwinId, + &node.HasIpv6, + ); err != nil { + return err + } + data.NodeIpv6[node.NodeTwinId] = node.HasIpv6 + } + + return nil +} + func loadDMIs(db *sql.DB, gormDB *gorm.DB, data *DBData) error { var dmis []types.Dmi err := gormDB.Table("dmi").Scan(&dmis).Error @@ -740,6 +765,9 @@ func Load(db *sql.DB, gormDB *gorm.DB) (DBData, error) { if err := loadHealthReports(db, &data); err != nil { return data, err } + if err := loadNodeIpv6(db, &data); err != nil { + return data, err + } if err := loadDMIs(db, gormDB, &data); err != nil { return data, err } diff --git a/grid-proxy/tests/queries/mock_client/nodes.go b/grid-proxy/tests/queries/mock_client/nodes.go index f829005b9..7fbac06cb 100644 --- a/grid-proxy/tests/queries/mock_client/nodes.go +++ b/grid-proxy/tests/queries/mock_client/nodes.go @@ -295,6 +295,10 @@ func (n *Node) satisfies(f types.NodeFilter, data *DBData) bool { return false } + if f.HasIpv6 != nil && *f.HasIpv6 != data.NodeIpv6[uint32(n.TwinID)] { + return false + } + if f.FreeSRU != nil && int64(*f.FreeSRU) > int64(free.SRU) { return false } diff --git a/grid-proxy/tests/queries/node_test.go b/grid-proxy/tests/queries/node_test.go index 8d878d196..406fb0f63 100644 --- a/grid-proxy/tests/queries/node_test.go +++ b/grid-proxy/tests/queries/node_test.go @@ -63,6 +63,13 @@ var nodeFilterRandomValueGenerator = map[string]func(agg NodesAggregate) interfa } return &v }, + "HasIpv6": func(_ NodesAggregate) interface{} { + v := true + if flip(.5) { + v = false + } + return &v + }, "FreeMRU": func(agg NodesAggregate) interface{} { if flip(.1) { return &agg.freeMRUs[rand.Intn(len(agg.freeMRUs))] diff --git a/grid-proxy/tools/db/crafter/generator.go b/grid-proxy/tools/db/crafter/generator.go index 24883bcc1..cc07cf289 100644 --- a/grid-proxy/tools/db/crafter/generator.go +++ b/grid-proxy/tools/db/crafter/generator.go @@ -937,3 +937,30 @@ func (c *Crafter) GeneratePricingPolicies() error { return err } + +func (c *Crafter) GenerateNodeIpv6() error { + start := c.NodeStart + end := c.NodeStart + c.NodeCount + nodeTwinsStart := c.TwinStart + (c.FarmStart + c.FarmCount) + + var reports []types.HasIpv6 + for i := start; i < end; i++ { + has_ipv6 := true + if flip(.5) { + has_ipv6 = false + } + + report := types.HasIpv6{ + NodeTwinId: uint32(nodeTwinsStart + i), + HasIpv6: has_ipv6, + } + reports = append(reports, report) + } + + if err := c.gormDB.Create(reports).Error; err != nil { + return fmt.Errorf("failed to insert node has ipv6 reports: %w", err) + } + fmt.Println("node has ipv6 reports generated") + + return nil +} diff --git a/grid-proxy/tools/db/schema.sql b/grid-proxy/tools/db/schema.sql index 2878c25e0..5cc7c077d 100644 --- a/grid-proxy/tools/db/schema.sql +++ b/grid-proxy/tools/db/schema.sql @@ -1080,3 +1080,16 @@ CREATE TABLE public.speed( ALTER TABLE public.speed OWNER TO postgres; + +-- +-- Name: node_ipv6; Type: TABLE; Schema: public; Owner: postgres +-- + +CREATE TABLE IF NOT EXISTS public.node_ipv6 ( + node_twin_id bigint NOT NULL, + has_ipv6 boolean +); + +ALTER TABLE public.node_ipv6 + OWNER TO postgres; + From 66dca1ee348ea75683bcdb82a2935f9150b1e35e Mon Sep 17 00:00:00 2001 From: Omar Abdulaziz Date: Wed, 17 Apr 2024 16:03:50 +0200 Subject: [PATCH 2/3] generate node ipv6 reports --- grid-proxy/tests/queries/mock_client/loader.go | 1 + grid-proxy/tools/db/generate.go | 6 +++++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/grid-proxy/tests/queries/mock_client/loader.go b/grid-proxy/tests/queries/mock_client/loader.go index b621e9502..7ef817f94 100644 --- a/grid-proxy/tests/queries/mock_client/loader.go +++ b/grid-proxy/tests/queries/mock_client/loader.go @@ -717,6 +717,7 @@ func Load(db *sql.DB, gormDB *gorm.DB) (DBData, error) { HealthReports: make(map[uint32]bool), DMIs: make(map[uint32]types.Dmi), Speeds: make(map[uint32]types.Speed), + NodeIpv6: make(map[uint32]bool), PricingPolicies: make(map[uint]PricingPolicy), DB: db, } diff --git a/grid-proxy/tools/db/generate.go b/grid-proxy/tools/db/generate.go index 8a684f24f..657542449 100644 --- a/grid-proxy/tools/db/generate.go +++ b/grid-proxy/tools/db/generate.go @@ -126,7 +126,11 @@ func generateData(db *sql.DB, gormDB *gorm.DB, seed int) error { } if err := generator.GenerateHealthReports(); err != nil { - return fmt.Errorf("failed to generate dmi reports: %w", err) + return fmt.Errorf("failed to generate health reports: %w", err) + } + + if err := generator.GenerateNodeIpv6(); err != nil { + return fmt.Errorf("failed to generate node ipv6 reports: %w", err) } if err := generator.GeneratePricingPolicies(); err != nil { From 12acb74c7e7bddc23ed868597fb5f9d5f90c1311 Mon Sep 17 00:00:00 2001 From: Omar Abdulaziz Date: Thu, 18 Apr 2024 14:52:28 +0200 Subject: [PATCH 3/3] update makefile --- grid-proxy/Makefile | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/grid-proxy/Makefile b/grid-proxy/Makefile index 872e56b9f..b61d06bff 100644 --- a/grid-proxy/Makefile +++ b/grid-proxy/Makefile @@ -3,11 +3,6 @@ PQ_HOST = $(shell docker inspect -f '{{range.NetworkSettings.Networks}}{{.IPAddr PQ_CONTAINER = postgres count = 3 -NAMESPACE := tfchain-dev -DB_POD := $(shell kubectl --namespace $(NAMESPACE) get pods | grep processor-db | awk '{print $$1}') -DUMP_CMD := PGPASSWORD="postgres" pg_dump -U postgres tfgrid-graphql > /tmp/dump.sql - - install-swag: @go install github.com/swaggo/swag/cmd/swag@v1.8.12; @@ -39,14 +34,6 @@ db-fill: ## Fill the database with a randomly generated data --reset \ --seed 13 - -db-dump-make: - kubectl exec -it $(DB_POD) -n $(NAMESPACE) -- /bin/sh -c "$(DUMP_CMD)"; -db-dump-copy: - kubectl -n $(NAMESPACE) cp $(DB_POD):/tmp/dump.sql ~/Downloads/dump.sql; - -db-dump-get: db-dump-make db-dump-copy ## get a dump from processor_db on tfchain_dev cluster - db-dump: ## Load a dump of the database (Args: `p=`) @go run cmds/proxy_server/main.go \ -no-cert \ + -no-indexer \ --address :8080 \ --log-level debug \ --sql-log-level 4 \