Skip to content

Commit

Permalink
graph: index should be scoped to parent
Browse files Browse the repository at this point in the history
Problem: the current strategy to derive an index is
scoped to a resource globally across the graph.
Solution: instead, provide a direct index counter
for each new resource to ensure it is scoped to
the parent

Signed-off-by: vsoch <[email protected]>
  • Loading branch information
vsoch committed Jun 21, 2024
1 parent a3a4f03 commit a07df5a
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 25 deletions.
26 changes: 15 additions & 11 deletions src/fluence/jgf/jgf.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"fmt"
"log"
"os"
"strings"
filepath "path"
)

var (
Expand Down Expand Up @@ -80,9 +80,7 @@ func getNodePath(root, subpath string) string {
} else {
path = fmt.Sprintf("/%s/%s", root, subpath)
}
// Hack to allow for imperfection of slash placement
path = strings.ReplaceAll(path, "//", "/")
return path
return filepath.Clean(path)
}

// getContainmentPath returns a new map with containment metadata
Expand Down Expand Up @@ -110,10 +108,11 @@ func (g *FluxJGF) MakeEdge(source string, target string, contains string) {

// MakeSubnet creates a subnet for the graph
// The name is typically the ip address
func (g *FluxJGF) MakeSubnet(name string) Node {
func (g *FluxJGF) MakeSubnet(name string, index int64) Node {

// Get a resource counter for the subnet
resource := g.Resources.getCounter(name, SubnetType)
resource.Index = index
subpath := resource.NameWithIndex()
return g.makeNewNode(resource, subpath, defaultUnit, defaultSize)
}
Expand All @@ -125,8 +124,7 @@ func (g *FluxJGF) MakeSubnet(name string) Node {
func (g *FluxJGF) makeNewNode(
resource ResourceCount,
subpath, unit string,
size int64,
) Node {
size int64) Node {

// A subnet comes directly under the cluster, which is the parent
newNode := Node{
Expand Down Expand Up @@ -164,10 +162,11 @@ func (g *FluxJGF) makeNewNode(
}

// MakeNode creates a new node for the graph
func (g *FluxJGF) MakeNode(name, subpath string) Node {
func (g *FluxJGF) MakeNode(name, subpath string, index int64) Node {

// Get a resource counter for the node, which is under the subnet
resource := g.Resources.getCounter(name, NodeType)
resource.Index = index

// Here the full containment path will be:
// <cluster-root>/<subnet>/<node>
Expand All @@ -176,10 +175,11 @@ func (g *FluxJGF) MakeNode(name, subpath string) Node {
}

// MakeCore creates a core for the graph
func (g *FluxJGF) MakeCore(name, subpath string) Node {
func (g *FluxJGF) MakeCore(name, subpath string, index int64) Node {

// A core is located at the subnet->node->core
resource := g.Resources.getCounter(name, CoreType)
resource.Index = index

// Here the full containment path will be:
// <cluster-root>/<subnet>/<node>/<core>
Expand All @@ -189,13 +189,16 @@ func (g *FluxJGF) MakeCore(name, subpath string) Node {

// MakeMemory creates memory for the graph
// Flux doesn't understand memory? Not sure if this is doing anything
func (g *FluxJGF) MakeMemory(name, subpath string, size int64) Node {
func (g *FluxJGF) MakeMemory(
name, subpath string,
size, index int64) Node {

// unit is assumed to be MB
unit := "MB"

// A core is located at the subnet->node->core
resource := g.Resources.getCounter(name, MemoryType)
resource.Index = index

// Here the full containment path will be:
// <cluster-root>/<subnet>/<node>/<memory>
Expand All @@ -204,10 +207,11 @@ func (g *FluxJGF) MakeMemory(name, subpath string, size int64) Node {
}

// MakeGPU makes a gpu for the graph
func (g *FluxJGF) MakeGPU(name, subpath string, size int64) Node {
func (g *FluxJGF) MakeGPU(name, subpath string, size, index int64) Node {

// Get a resource counter for the gpu, which is under the subnet->node->gpu
resource := g.Resources.getCounter(name, GPUType)
resource.Index = index

// Here the full containment path will be:
// <cluster-root>/<subnet>/<node>
Expand Down
14 changes: 7 additions & 7 deletions src/fluence/jgf/jgf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ func TestNewFluxJGF(t *testing.T) {
fmt.Println(out)

// Add subnets to it
subnetNodeA := fluxgraph.MakeSubnet("east")
subnetNodeB := fluxgraph.MakeSubnet("west")
subnetNodeA := fluxgraph.MakeSubnet("east", 0)
subnetNodeB := fluxgraph.MakeSubnet("west", 1)
fluxgraph.MakeBidirectionalEdge(clusterNode.Id, subnetNodeA.Id)
fluxgraph.MakeBidirectionalEdge(clusterNode.Id, subnetNodeB.Id)

Expand All @@ -44,8 +44,8 @@ func TestNewFluxJGF(t *testing.T) {
fmt.Println(out)

// Add some nodes!
computeNodeA := fluxgraph.MakeNode("node", subnetNodeA.Metadata.Name)
computeNodeB := fluxgraph.MakeNode("node", subnetNodeB.Metadata.Name)
computeNodeA := fluxgraph.MakeNode("node", subnetNodeA.Metadata.Name, 0)
computeNodeB := fluxgraph.MakeNode("node", subnetNodeB.Metadata.Name, 1)
fluxgraph.MakeBidirectionalEdge(subnetNodeA.Id, computeNodeA.Id)
fluxgraph.MakeBidirectionalEdge(subnetNodeB.Id, computeNodeB.Id)

Expand All @@ -57,15 +57,15 @@ func TestNewFluxJGF(t *testing.T) {

// Add a GPU to one, and cores to the other
subpath := fmt.Sprintf("%s/%s", subnetNodeA.Metadata.Name, computeNodeA.Metadata.Name)
gpuNodeA := fluxgraph.MakeGPU(NvidiaGPU, subpath, 1)
gpuNodeA := fluxgraph.MakeGPU(NvidiaGPU, subpath, 1, 0)
fluxgraph.MakeBidirectionalEdge(computeNodeA.Id, gpuNodeA.Id)

subpath = fmt.Sprintf("%s/%s", subnetNodeB.Metadata.Name, computeNodeB.Metadata.Name)
coreNode := fluxgraph.MakeCore(CoreType, subpath)
coreNode := fluxgraph.MakeCore(CoreType, subpath, 0)
fluxgraph.MakeBidirectionalEdge(computeNodeB.Id, coreNode.Id)

// Finally, add some memory to the second compute node
memoryNode := fluxgraph.MakeMemory(MemoryType, subpath, 1<<10)
memoryNode := fluxgraph.MakeMemory(MemoryType, subpath, 1<<10, 0)
fluxgraph.MakeBidirectionalEdge(computeNodeA.Id, memoryNode.Id)

out, err = fluxgraph.ToJson()
Expand Down
5 changes: 4 additions & 1 deletion src/fluence/jgf/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,10 @@ func (r *ResourceCounter) NextResourceIndex(resourceType string) int64 {
}

// getCounter returns the counter context for a specific resource type
func (r *ResourceCounter) getCounter(resourceName string, resourceType string) ResourceCount {
func (r *ResourceCounter) getCounter(
resourceName string,
resourceType string,
) ResourceCount {
resourceCount := ResourceCount{
Index: r.NextResourceIndex(resourceName),
Type: resourceType,
Expand Down
14 changes: 8 additions & 6 deletions src/fluence/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,9 @@ func CreateInClusterJGF(filename string, skipLabel string) error {
// Keep a lookup of subnet nodes in case we see one twice
// We don't want to create a new entity for it in the graph
subnetLookup := map[string]jgf.Node{}
var subnetCounter int64 = 0

for _, node := range nodes.Items {
for nodeCount, node := range nodes.Items {

// We should not be scheduling to the control plane
_, ok := node.Labels[controlPlaneLabel]
Expand Down Expand Up @@ -155,7 +156,8 @@ func CreateInClusterJGF(filename string, skipLabel string) error {
subnetNode, exists := subnetLookup[subnetName]
if !exists {
// Build the subnet according to topology.kubernetes.io/zone label
subnetNode = fluxgraph.MakeSubnet(subnetName)
subnetNode = fluxgraph.MakeSubnet(subnetName, subnetCounter)
subnetCounter += 1

// This is one example of bidirectional, I won't document in
// all following occurrences but this is what the function does
Expand Down Expand Up @@ -192,7 +194,7 @@ func CreateInClusterJGF(filename string, skipLabel string) error {
// TODO possibly look at pod resources vs. node.Status.Allocatable
// Make the compute node, which is a child of the subnet
// The parameters here are the node name, and the parent path
computeNode := fluxgraph.MakeNode(node.Name, subnetNode.Metadata.Name)
computeNode := fluxgraph.MakeNode(node.Name, subnetNode.Metadata.Name, int64(nodeCount))

// [subnet] -> contains -> [compute node]
fluxgraph.MakeBidirectionalEdge(subnetNode.Id, computeNode.Id)
Expand All @@ -206,7 +208,7 @@ func CreateInClusterJGF(filename string, skipLabel string) error {
subpath := fmt.Sprintf("%s/%s", subnetNode.Metadata.Name, computeNode.Metadata.Name)

// TODO: can this size be greater than 1?
gpuNode := fluxgraph.MakeGPU(jgf.NvidiaGPU, subpath, 1)
gpuNode := fluxgraph.MakeGPU(jgf.NvidiaGPU, subpath, 1, int64(index))

// [compute] -> contains -> [gpu]
fluxgraph.MakeBidirectionalEdge(computeNode.Id, gpuNode.Id)
Expand All @@ -217,15 +219,15 @@ func CreateInClusterJGF(filename string, skipLabel string) error {
// Here is where we are adding cores
for index := 0; index < int(availCpu); index++ {
subpath := fmt.Sprintf("%s/%s", subnetNode.Metadata.Name, computeNode.Metadata.Name)
coreNode := fluxgraph.MakeCore(jgf.CoreType, subpath)
coreNode := fluxgraph.MakeCore(jgf.CoreType, subpath, int64(index))
fluxgraph.MakeBidirectionalEdge(computeNode.Id, coreNode.Id)
}

// Here is where we are adding memory
fractionMem := availMem >> 30
for i := 0; i < int(fractionMem); i++ {
subpath := fmt.Sprintf("%s/%s", subnetNode.Metadata.Name, computeNode.Metadata.Name)
memoryNode := fluxgraph.MakeMemory(jgf.MemoryType, subpath, 1<<10)
memoryNode := fluxgraph.MakeMemory(jgf.MemoryType, subpath, 1<<10, int64(i))
fluxgraph.MakeBidirectionalEdge(computeNode.Id, memoryNode.Id)
}
}
Expand Down

0 comments on commit a07df5a

Please sign in to comment.