diff --git a/src/fluence/jgf/jgf.go b/src/fluence/jgf/jgf.go index 6600fa4..ba4d268 100644 --- a/src/fluence/jgf/jgf.go +++ b/src/fluence/jgf/jgf.go @@ -20,7 +20,7 @@ import ( "fmt" "log" "os" - "strings" + filepath "path" ) var ( @@ -80,9 +80,7 @@ func getNodePath(root, subpath string) string { } else { path = fmt.Sprintf("/%s/%s", root, subpath) } - // Hack to allow for imperfection of slash placement - path = strings.ReplaceAll(path, "//", "/") - return path + return filepath.Clean(path) } // getContainmentPath returns a new map with containment metadata @@ -110,10 +108,11 @@ func (g *FluxJGF) MakeEdge(source string, target string, contains string) { // MakeSubnet creates a subnet for the graph // The name is typically the ip address -func (g *FluxJGF) MakeSubnet(name string) Node { +func (g *FluxJGF) MakeSubnet(name string, index int64) Node { // Get a resource counter for the subnet resource := g.Resources.getCounter(name, SubnetType) + resource.Index = index subpath := resource.NameWithIndex() return g.makeNewNode(resource, subpath, defaultUnit, defaultSize) } @@ -125,8 +124,7 @@ func (g *FluxJGF) MakeSubnet(name string) Node { func (g *FluxJGF) makeNewNode( resource ResourceCount, subpath, unit string, - size int64, -) Node { + size int64) Node { // A subnet comes directly under the cluster, which is the parent newNode := Node{ @@ -164,10 +162,11 @@ func (g *FluxJGF) makeNewNode( } // MakeNode creates a new node for the graph -func (g *FluxJGF) MakeNode(name, subpath string) Node { +func (g *FluxJGF) MakeNode(name, subpath string, index int64) Node { // Get a resource counter for the node, which is under the subnet resource := g.Resources.getCounter(name, NodeType) + resource.Index = index // Here the full containment path will be: // // @@ -176,10 +175,11 @@ func (g *FluxJGF) MakeNode(name, subpath string) Node { } // MakeCore creates a core for the graph -func (g *FluxJGF) MakeCore(name, subpath string) Node { +func (g *FluxJGF) MakeCore(name, subpath string, index int64) Node { // A core is located at the subnet->node->core resource := g.Resources.getCounter(name, CoreType) + resource.Index = index // Here the full containment path will be: // /// @@ -189,13 +189,16 @@ func (g *FluxJGF) MakeCore(name, subpath string) Node { // MakeMemory creates memory for the graph // Flux doesn't understand memory? Not sure if this is doing anything -func (g *FluxJGF) MakeMemory(name, subpath string, size int64) Node { +func (g *FluxJGF) MakeMemory( + name, subpath string, + size, index int64) Node { // unit is assumed to be MB unit := "MB" // A core is located at the subnet->node->core resource := g.Resources.getCounter(name, MemoryType) + resource.Index = index // Here the full containment path will be: // /// @@ -204,10 +207,11 @@ func (g *FluxJGF) MakeMemory(name, subpath string, size int64) Node { } // MakeGPU makes a gpu for the graph -func (g *FluxJGF) MakeGPU(name, subpath string, size int64) Node { +func (g *FluxJGF) MakeGPU(name, subpath string, size, index int64) Node { // Get a resource counter for the gpu, which is under the subnet->node->gpu resource := g.Resources.getCounter(name, GPUType) + resource.Index = index // Here the full containment path will be: // // diff --git a/src/fluence/jgf/jgf_test.go b/src/fluence/jgf/jgf_test.go index c705a48..1d1a596 100644 --- a/src/fluence/jgf/jgf_test.go +++ b/src/fluence/jgf/jgf_test.go @@ -32,8 +32,8 @@ func TestNewFluxJGF(t *testing.T) { fmt.Println(out) // Add subnets to it - subnetNodeA := fluxgraph.MakeSubnet("east") - subnetNodeB := fluxgraph.MakeSubnet("west") + subnetNodeA := fluxgraph.MakeSubnet("east", 0) + subnetNodeB := fluxgraph.MakeSubnet("west", 1) fluxgraph.MakeBidirectionalEdge(clusterNode.Id, subnetNodeA.Id) fluxgraph.MakeBidirectionalEdge(clusterNode.Id, subnetNodeB.Id) @@ -44,8 +44,8 @@ func TestNewFluxJGF(t *testing.T) { fmt.Println(out) // Add some nodes! - computeNodeA := fluxgraph.MakeNode("node", subnetNodeA.Metadata.Name) - computeNodeB := fluxgraph.MakeNode("node", subnetNodeB.Metadata.Name) + computeNodeA := fluxgraph.MakeNode("node", subnetNodeA.Metadata.Name, 0) + computeNodeB := fluxgraph.MakeNode("node", subnetNodeB.Metadata.Name, 1) fluxgraph.MakeBidirectionalEdge(subnetNodeA.Id, computeNodeA.Id) fluxgraph.MakeBidirectionalEdge(subnetNodeB.Id, computeNodeB.Id) @@ -57,15 +57,15 @@ func TestNewFluxJGF(t *testing.T) { // Add a GPU to one, and cores to the other subpath := fmt.Sprintf("%s/%s", subnetNodeA.Metadata.Name, computeNodeA.Metadata.Name) - gpuNodeA := fluxgraph.MakeGPU(NvidiaGPU, subpath, 1) + gpuNodeA := fluxgraph.MakeGPU(NvidiaGPU, subpath, 1, 0) fluxgraph.MakeBidirectionalEdge(computeNodeA.Id, gpuNodeA.Id) subpath = fmt.Sprintf("%s/%s", subnetNodeB.Metadata.Name, computeNodeB.Metadata.Name) - coreNode := fluxgraph.MakeCore(CoreType, subpath) + coreNode := fluxgraph.MakeCore(CoreType, subpath, 0) fluxgraph.MakeBidirectionalEdge(computeNodeB.Id, coreNode.Id) // Finally, add some memory to the second compute node - memoryNode := fluxgraph.MakeMemory(MemoryType, subpath, 1<<10) + memoryNode := fluxgraph.MakeMemory(MemoryType, subpath, 1<<10, 0) fluxgraph.MakeBidirectionalEdge(computeNodeA.Id, memoryNode.Id) out, err = fluxgraph.ToJson() diff --git a/src/fluence/jgf/types.go b/src/fluence/jgf/types.go index 41d8fde..8359c28 100644 --- a/src/fluence/jgf/types.go +++ b/src/fluence/jgf/types.go @@ -131,7 +131,10 @@ func (r *ResourceCounter) NextResourceIndex(resourceType string) int64 { } // getCounter returns the counter context for a specific resource type -func (r *ResourceCounter) getCounter(resourceName string, resourceType string) ResourceCount { +func (r *ResourceCounter) getCounter( + resourceName string, + resourceType string, +) ResourceCount { resourceCount := ResourceCount{ Index: r.NextResourceIndex(resourceName), Type: resourceType, diff --git a/src/fluence/utils/utils.go b/src/fluence/utils/utils.go index 25ddb85..19fadf8 100644 --- a/src/fluence/utils/utils.go +++ b/src/fluence/utils/utils.go @@ -114,8 +114,9 @@ func CreateInClusterJGF(filename string, skipLabel string) error { // Keep a lookup of subnet nodes in case we see one twice // We don't want to create a new entity for it in the graph subnetLookup := map[string]jgf.Node{} + var subnetCounter int64 = 0 - for _, node := range nodes.Items { + for nodeCount, node := range nodes.Items { // We should not be scheduling to the control plane _, ok := node.Labels[controlPlaneLabel] @@ -155,7 +156,8 @@ func CreateInClusterJGF(filename string, skipLabel string) error { subnetNode, exists := subnetLookup[subnetName] if !exists { // Build the subnet according to topology.kubernetes.io/zone label - subnetNode = fluxgraph.MakeSubnet(subnetName) + subnetNode = fluxgraph.MakeSubnet(subnetName, subnetCounter) + subnetCounter += 1 // This is one example of bidirectional, I won't document in // all following occurrences but this is what the function does @@ -192,7 +194,7 @@ func CreateInClusterJGF(filename string, skipLabel string) error { // TODO possibly look at pod resources vs. node.Status.Allocatable // Make the compute node, which is a child of the subnet // The parameters here are the node name, and the parent path - computeNode := fluxgraph.MakeNode(node.Name, subnetNode.Metadata.Name) + computeNode := fluxgraph.MakeNode(node.Name, subnetNode.Metadata.Name, int64(nodeCount)) // [subnet] -> contains -> [compute node] fluxgraph.MakeBidirectionalEdge(subnetNode.Id, computeNode.Id) @@ -206,7 +208,7 @@ func CreateInClusterJGF(filename string, skipLabel string) error { subpath := fmt.Sprintf("%s/%s", subnetNode.Metadata.Name, computeNode.Metadata.Name) // TODO: can this size be greater than 1? - gpuNode := fluxgraph.MakeGPU(jgf.NvidiaGPU, subpath, 1) + gpuNode := fluxgraph.MakeGPU(jgf.NvidiaGPU, subpath, 1, int64(index)) // [compute] -> contains -> [gpu] fluxgraph.MakeBidirectionalEdge(computeNode.Id, gpuNode.Id) @@ -217,7 +219,7 @@ func CreateInClusterJGF(filename string, skipLabel string) error { // Here is where we are adding cores for index := 0; index < int(availCpu); index++ { subpath := fmt.Sprintf("%s/%s", subnetNode.Metadata.Name, computeNode.Metadata.Name) - coreNode := fluxgraph.MakeCore(jgf.CoreType, subpath) + coreNode := fluxgraph.MakeCore(jgf.CoreType, subpath, int64(index)) fluxgraph.MakeBidirectionalEdge(computeNode.Id, coreNode.Id) } @@ -225,7 +227,7 @@ func CreateInClusterJGF(filename string, skipLabel string) error { fractionMem := availMem >> 30 for i := 0; i < int(fractionMem); i++ { subpath := fmt.Sprintf("%s/%s", subnetNode.Metadata.Name, computeNode.Metadata.Name) - memoryNode := fluxgraph.MakeMemory(jgf.MemoryType, subpath, 1<<10) + memoryNode := fluxgraph.MakeMemory(jgf.MemoryType, subpath, 1<<10, int64(i)) fluxgraph.MakeBidirectionalEdge(computeNode.Id, memoryNode.Id) } }