Skip to content

Commit

Permalink
recalc the node gpu count
Browse files Browse the repository at this point in the history
- recalculate the node GPU count on each trigger instead of
incrementing/decrementing the accumulated count
- trigger the cache-update function on UPDATE, in addition to INSERT/DELETE,
so the cached GPU list stays current
- invalidate previously indexed cards when the node returns null because the
cards were ejected
  • Loading branch information
Omarabdul3ziz committed Feb 23, 2025
1 parent 924030b commit 440477c
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 42 deletions.
2 changes: 1 addition & 1 deletion grid-proxy/cmds/proxy_server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func main() {

func startIndexers(ctx context.Context, f flags, db db.Database, rpcRmbClient *peer.RpcClient) {
gpuIdx := indexer.NewIndexer[types.NodeGPU](
indexer.NewGPUWork(f.gpuIndexerIntervalMins),
indexer.NewGPUWork(f.gpuIndexerIntervalMins, db),
"GPU",
db,
rpcRmbClient,
Expand Down
35 changes: 14 additions & 21 deletions grid-proxy/internal/explorer/db/setup.sql
Original file line number Diff line number Diff line change
Expand Up @@ -478,27 +478,20 @@ $$
BEGIN
BEGIN
UPDATE resources_cache
SET node_gpu_count = node_gpu_count + (
CASE
WHEN TG_OP = 'INSERT'
THEN 1
WHEN TG_OP = 'DELETE'
THEN -1
ELSE 0
END
),
gpus = (
SELECT json_agg(
json_build_object(
'id', node_gpu.id,
'vendor', node_gpu.vendor,
'device', node_gpu.device,
'contract', node_gpu.contract
)
SET node_gpu_count = gpu.count, gpus = gpu.gpus
FROM (
SELECT COUNT(*) AS count,
json_agg(
json_build_object(
'id', id,
'vendor', vendor,
'device', device,
'contract', contract
)
from node_gpu where node_twin_id = COALESCE(NEW.node_twin_id, OLD.node_twin_id)
)

) AS gpus
FROM node_gpu
WHERE node_twin_id = COALESCE(NEW.node_twin_id, OLD.node_twin_id)
) AS gpu
WHERE resources_cache.node_id = (
SELECT node_id from node where node.twin_id = COALESCE(NEW.node_twin_id, OLD.node_twin_id)
);
Expand All @@ -511,7 +504,7 @@ END;
$$ LANGUAGE plpgsql;

CREATE OR REPLACE TRIGGER tg_node_gpu_count
AFTER INSERT OR DELETE ON node_gpu FOR EACH ROW
AFTER INSERT OR DELETE OR UPDATE ON node_gpu FOR EACH ROW
EXECUTE PROCEDURE reflect_node_gpu_count_change();

/*
Expand Down
43 changes: 23 additions & 20 deletions grid-proxy/internal/indexer/gpu.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@ const (

type GPUWork struct {
findersInterval map[string]time.Duration
db db.Database
}

func NewGPUWork(interval uint) *GPUWork {
func NewGPUWork(interval uint, db db.Database) *GPUWork {
return &GPUWork{
findersInterval: map[string]time.Duration{
"up": time.Duration(interval) * time.Minute,
"new": newNodesCheckInterval,
},
db: db,
}
}

Expand All @@ -32,12 +34,20 @@ func (w *GPUWork) Finders() map[string]time.Duration {
}

func (w *GPUWork) Get(ctx context.Context, rmb *peer.RpcClient, twinId uint32) ([]types.NodeGPU, error) {
// in case an error returned? return directly we can leave the previously indexed cards
// in case null returned? we need to clean all previously added cards till now
// in case cards changed? Upsert() will take care of invalidating the old cards

var gpus []types.NodeGPU
err := callNode(ctx, rmb, gpuListCmd, nil, twinId, &gpus)
if err != nil {
if err := callNode(ctx, rmb, gpuListCmd, nil, twinId, &gpus); err != nil {
return gpus, err
}

before := time.Now().Unix()
if err := w.db.DeleteOldGpus(ctx, []uint32{twinId}, before); err != nil {
return gpus, fmt.Errorf("failed to remove old GPUs: %w", err)
}

for i := 0; i < len(gpus); i++ {
gpus[i].NodeTwinID = twinId
gpus[i].UpdatedAt = time.Now().Unix()
Expand All @@ -47,28 +57,21 @@ func (w *GPUWork) Get(ctx context.Context, rmb *peer.RpcClient, twinId uint32) (
}

func (w *GPUWork) Upsert(ctx context.Context, db db.Database, batch []types.NodeGPU) error {
expirationInterval := w.findersInterval["up"]
err := discardOldGpus(ctx, db, expirationInterval, batch)
if err != nil {
nodeTwinIds := []uint32{}
for _, gpu := range batch {
nodeTwinIds = append(nodeTwinIds, gpu.NodeTwinID)
}

// Invalidate old indexed GPUs for the same node, but first check the batch
// to avoid removing GPUs inserted in the last batch within the same indexer run.
before := time.Now().Add(-w.findersInterval["up"]).Unix()
if err := db.DeleteOldGpus(ctx, nodeTwinIds, before); err != nil {
return fmt.Errorf("failed to remove old GPUs: %w", err)
}

err = db.UpsertNodesGPU(ctx, batch)
if err != nil {
if err := db.UpsertNodesGPU(ctx, batch); err != nil {
return fmt.Errorf("failed to upsert new GPUs: %w", err)
}

return nil
}

func discardOldGpus(ctx context.Context, database db.Database, interval time.Duration, gpuBatch []types.NodeGPU) error {
// invalidate the old indexed GPUs for the same node,
// but check the batch first to ensure it does not contain related GPUs to node twin it from the last batch.
nodeTwinIds := []uint32{}
for _, gpu := range gpuBatch {
nodeTwinIds = append(nodeTwinIds, gpu.NodeTwinID)
}

expiration := time.Now().Unix() - int64(interval.Seconds())
return database.DeleteOldGpus(ctx, nodeTwinIds, expiration)
}

0 comments on commit 440477c

Please sign in to comment.