Skip to content

Commit

Permalink
feat: add topology manager for ORM
Browse files Browse the repository at this point in the history
  • Loading branch information
WangZzzhe committed Jan 8, 2024
1 parent d182b2f commit 44ba1c2
Show file tree
Hide file tree
Showing 25 changed files with 5,718 additions and 38 deletions.
22 changes: 16 additions & 6 deletions cmd/katalyst-agent/app/options/orm/orm_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,20 @@ import (
)

type GenericORMPluginOptions struct {
ORMRconcilePeriod time.Duration
ORMResourceNamesMap map[string]string
ORMPodNotifyChanLen int
ORMRconcilePeriod time.Duration
ORMResourceNamesMap map[string]string
ORMPodNotifyChanLen int
TopologyPolicyName string
NumericAlignResources []string
}

func NewGenericORMPluginOptions() *GenericORMPluginOptions {
return &GenericORMPluginOptions{
ORMRconcilePeriod: time.Second * 5,
ORMResourceNamesMap: map[string]string{},
ORMPodNotifyChanLen: 10,
ORMRconcilePeriod: time.Second * 5,
ORMResourceNamesMap: map[string]string{},
ORMPodNotifyChanLen: 10,
TopologyPolicyName: "none",
NumericAlignResources: []string{"cpu", "memory"},
}
}

Expand All @@ -50,12 +54,18 @@ func (o *GenericORMPluginOptions) AddFlags(fss *cliflag.NamedFlagSets) {
"will also be allocated by [cpu] and [memory] QRM plugins")
fs.IntVar(&o.ORMPodNotifyChanLen, "orm-pod-notify-chan-len",
o.ORMPodNotifyChanLen, "length of pod addition and movement notifying channel")
fs.StringVar(&o.TopologyPolicyName, "topology-policy-name",
o.TopologyPolicyName, "topology merge policy name used by ORM")
fs.StringSliceVar(&o.NumericAlignResources, "numeric-align-resources", o.NumericAlignResources,
"resources which should be aligned in numeric topology policy")
}

// ApplyTo transfers every ORM option held by o onto conf.
//
// The map and slice options are assigned directly (not copied), so conf and o
// refer to the same underlying data afterwards. The returned error is always nil.
func (o *GenericORMPluginOptions) ApplyTo(conf *ormconfig.GenericORMConfiguration) error {
	// Topology alignment settings.
	conf.TopologyPolicyName, conf.NumericAlignResources = o.TopologyPolicyName, o.NumericAlignResources

	// Reconcile loop, resource name mapping and pod notification settings.
	conf.ORMPodNotifyChanLen = o.ORMPodNotifyChanLen
	conf.ORMResourceNamesMap = o.ORMResourceNamesMap
	conf.ORMRconcilePeriod = o.ORMRconcilePeriod

	return nil
}
8 changes: 4 additions & 4 deletions pkg/agent/resourcemanager/outofband/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,21 @@ import (
func TestCheckpoint(t *testing.T) {
t.Parallel()

checkpointManager, err := checkpointmanager.NewCheckpointManager("/tmp")
checkpointManager, err := checkpointmanager.NewCheckpointManager("/tmp/checkpoint/")
assert.NoError(t, err)

m := &ManagerImpl{
socketdir: "/tmp",
socketdir: "/tmp/checkpoint",
endpoints: make(map[string]endpoint.EndpointInfo),
podResources: newPodResourcesChk(),
checkpointManager: checkpointManager,
}
defer func() {
_ = os.Remove("/tmp/kubelet_qrm_checkpoint")
_ = os.Remove("/tmp/checkpoint/kubelet_qrm_checkpoint")
}()

file := m.checkpointFile()
assert.Equal(t, file, "/tmp/kubelet_qrm_checkpoint")
assert.Equal(t, file, "/tmp/checkpoint/kubelet_qrm_checkpoint")

allocationInfo := generateResourceAllocationInfo()
m.podResources.insert("testPod", "testContainer", "cpu", allocationInfo)
Expand Down
13 changes: 12 additions & 1 deletion pkg/agent/resourcemanager/outofband/endpoint/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
type Endpoint interface {
Stop()
Allocate(c context.Context, resourceRequest *pluginapi.ResourceRequest) (*pluginapi.ResourceAllocationResponse, error)
GetTopologyHints(c context.Context, resourceRequest *pluginapi.ResourceRequest) (*pluginapi.ResourceHintsResponse, error)
GetResourceAllocation(c context.Context, request *pluginapi.GetResourcesAllocationRequest) (*pluginapi.GetResourcesAllocationResponse, error)
RemovePod(c context.Context, removePodRequest *pluginapi.RemovePodRequest) (*pluginapi.RemovePodResponse, error)
IsStopped() bool
Expand Down Expand Up @@ -64,7 +65,7 @@ type EndpointImpl struct {
func NewEndpointImpl(socketPath, resourceName string) (*EndpointImpl, error) {
client, c, err := dial(socketPath)
if err != nil {
klog.Errorf("[qosresourcemanager] Can't create new endpoint with path %s err %v", socketPath, err)
klog.Errorf("[ORM] Can't create new endpoint with path %s err %v", socketPath, err)
return nil, err
}

Expand Down Expand Up @@ -150,6 +151,16 @@ func (e *EndpointImpl) GetResourcePluginOptions(ctx context.Context, in *plugina
return e.client.GetResourcePluginOptions(ctx, in, opts...)
}

// GetTopologyHints forwards resourceRequest to the plugin's GetTopologyHints RPC.
//
// The call runs under a context derived from c with a timeout of
// pluginapi.KubeletResourcePluginGetTopologyHintsRPCTimeoutInSecs seconds.
// When the endpoint is already stopped, an error is returned without
// issuing the RPC.
func (e *EndpointImpl) GetTopologyHints(c context.Context, resourceRequest *pluginapi.ResourceRequest) (*pluginapi.ResourceHintsResponse, error) {
	if stopped := e.IsStopped(); stopped {
		return nil, fmt.Errorf(errEndpointStopped, e)
	}

	timeout := pluginapi.KubeletResourcePluginGetTopologyHintsRPCTimeoutInSecs * time.Second
	rpcCtx, release := context.WithTimeout(c, timeout)
	// Release the timer as soon as the RPC returns.
	defer release()

	resp, err := e.client.GetTopologyHints(rpcCtx, resourceRequest)
	return resp, err
}

// dial establishes the gRPC communication with the registered resource plugin. https://godoc.org/google.golang.org/grpc#Dial
func dial(unixSocketPath string) (pluginapi.ResourcePluginClient, *grpc.ClientConn, error) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
Expand Down
209 changes: 191 additions & 18 deletions pkg/agent/resourcemanager/outofband/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package outofband
import (
"context"
"fmt"
"math"
"net"
"os"
"path/filepath"
Expand All @@ -38,10 +39,12 @@ import (
"github.com/kubewharf/katalyst-core/pkg/agent/resourcemanager/outofband/endpoint"
"github.com/kubewharf/katalyst-core/pkg/agent/resourcemanager/outofband/executor"
"github.com/kubewharf/katalyst-core/pkg/agent/resourcemanager/outofband/metamanager"
"github.com/kubewharf/katalyst-core/pkg/agent/resourcemanager/outofband/topology"
"github.com/kubewharf/katalyst-core/pkg/config"
"github.com/kubewharf/katalyst-core/pkg/config/generic"
"github.com/kubewharf/katalyst-core/pkg/metaserver"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/bitmask"
cgroupmgr "github.com/kubewharf/katalyst-core/pkg/util/cgroup/manager"
"github.com/kubewharf/katalyst-core/pkg/util/native"
)
Expand All @@ -59,6 +62,8 @@ type ManagerImpl struct {

metaManager *metamanager.Manager

topologyManager topology.Manager

server *grpc.Server
wg sync.WaitGroup

Expand Down Expand Up @@ -110,6 +115,14 @@ func NewManager(socketPath string, emitter metrics.MetricEmitter, metaServer *me
metaManager := metamanager.NewManager(emitter, m.podResources.pods, metaServer)
m.metaManager = metaManager

topologyManager, err := topology.NewManager(metaServer.Topology, config.TopologyPolicyName, config.NumericAlignResources)
if err != nil {
klog.Error(err)
return nil, err
}
topologyManager.AddHintProvider(m)
m.topologyManager = topologyManager

if err := m.removeContents(m.socketdir); err != nil {
err = fmt.Errorf("[ORM] Fail to clean up stale contents under %s: %v", m.socketdir, err)
klog.Error(err)
Expand Down Expand Up @@ -179,6 +192,105 @@ func (m *ManagerImpl) GetHandlerType() string {
return pluginregistration.ResourcePlugin
}

// GetTopologyHints collects, for every non-zero resource request of container,
// the topology hints reported by the resource plugin registered for it.
//
// The result is keyed by the mapped resource name:
//   - a resource that is already allocated to the container with non-empty
//     stored hints reuses those hints when it is scalar and the allocation
//     covers the request;
//   - a resource whose plugin is not registered, has no options, or does not
//     declare WithTopologyAlignment gets no entry;
//   - a resource whose plugin call fails (or yields a nil response) gets an
//     empty, non-nil hint list.
//
// nil is returned when pod or container is nil, when the container type and
// index cannot be resolved, or when a requested resource name cannot be mapped.
func (m *ManagerImpl) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topology.TopologyHint {
	if pod == nil || container == nil {
		klog.Errorf("[ORM] GetTopologyHints got nil pod: %v or container: %v", pod, container)
		return nil
	}

	podUID := string(pod.UID)
	contName := container.Name
	containerType, containerIndex, err := GetContainerTypeAndIndex(pod, container)
	if err != nil {
		// Log the cause: a bare nil result gives the caller no way to tell
		// why no hints were produced.
		klog.Errorf("[ORM] GetTopologyHints get container type and index for pod: %s/%s, container: %s failed with error: %v",
			pod.GetNamespace(), pod.GetName(), contName, err)
		return nil
	}

	resourceHints := make(map[string][]topology.TopologyHint)
	for resourceObj, requestedObj := range container.Resources.Requests {
		requested := int(requestedObj.Value())
		resource, err := m.getMappedResourceName(string(resourceObj), container.Resources.Requests)
		if err != nil {
			klog.Errorf("resource %s getMappedResourceName fail: %v", string(resourceObj), err)
			return nil
		}

		if requestedObj.IsZero() {
			continue
		}

		allocationInfo := m.podResources.containerResource(podUID, contName, resource)
		if allocationInfo != nil && allocationInfo.ResourceHints != nil && len(allocationInfo.ResourceHints.Hints) > 0 {
			allocated := int(math.Ceil(allocationInfo.AllocatedQuantity))

			if allocationInfo.IsScalarResource && allocated >= requested {
				// The stored allocation already satisfies the request: reuse
				// its hints instead of asking the plugin again.
				resourceHints[resource] = ParseListOfTopologyHints(allocationInfo.ResourceHints)
				klog.Warningf("[ORM] resource %s already allocated to (pod %s/%s, container %v) with larger number than request: requested: %d, allocated: %d; not to getTopologyHints",
					resource, pod.GetNamespace(), pod.GetName(), container.Name, requested, allocated)
				continue
			}

			klog.Warningf("[ORM] resource %s already allocated to (pod %s/%s, container %v) with smaller number than request: requested: %d, allocated: %d; continue to getTopologyHints",
				resource, pod.GetNamespace(), pod.GetName(), container.Name, requested, allocated)
		}

		// Only the map lookup is guarded; the lock is not held across the RPC.
		m.mutex.Lock()
		e, ok := m.endpoints[resource]
		m.mutex.Unlock()
		if !ok || e.Opts == nil || !e.Opts.WithTopologyAlignment {
			klog.V(5).Infof("[ORM] GetTopologyHints resource %s not supported", resource)
			continue
		}

		resourceReq := &pluginapi.ResourceRequest{
			PodUid:         podUID,
			PodNamespace:   pod.GetNamespace(),
			PodName:        pod.GetName(),
			ContainerName:  container.Name,
			ContainerType:  containerType,
			ContainerIndex: containerIndex,
			PodRole:        pod.Labels[pluginapi.PodRoleLabelKey],
			PodType:        pod.Annotations[pluginapi.PodTypeAnnotationKey],
			Labels:         maputil.CopySS(pod.Labels),
			Annotations:    maputil.CopySS(pod.Annotations),
			// use the mapped resource name in "ResourceName" to indicate which endpoint to request
			ResourceName: resource,
			// use the originally requested resource name in "ResourceRequests" so that the plugin can identify the resource that was really requested
			ResourceRequests: map[string]float64{string(resourceObj): requestedObj.AsApproximateFloat64()},
		}

		resp, err := e.E.GetTopologyHints(context.Background(), resourceReq)
		if err == nil && resp == nil {
			// Guard against an endpoint that returns neither a response nor
			// an error; dereferencing resp below would panic otherwise.
			err = fmt.Errorf("nil response")
		}
		if err != nil {
			klog.Errorf("[ORM] call GetTopologyHints of %s resource plugin for pod: %s/%s, container: %s failed with error: %v",
				resource, pod.GetNamespace(), pod.GetName(), contName, err)

			resourceHints[resource] = []topology.TopologyHint{}
			continue
		}

		resourceHints[resource] = ParseListOfTopologyHints(resp.ResourceHints[resource])

		klog.Infof("[ORM] GetTopologyHints for resource: %s, pod: %s/%s, container: %s, result: %+v",
			resource, pod.Namespace, pod.Name, contName, resourceHints[resource])
	}

	return resourceHints
}

// Allocate runs addContainer for the given container of pod and, when that
// succeeds, syncContainer.
//
// An error is returned when pod or container is nil, or when either step
// fails; syncContainer is not attempted after an addContainer failure.
func (m *ManagerImpl) Allocate(pod *v1.Pod, container *v1.Container) error {
	if pod == nil || container == nil {
		return fmt.Errorf("Allocate got nil pod: %v or container: %v", pod, container)
	}

	if addErr := m.addContainer(pod, container); addErr != nil {
		return addErr
	}

	return m.syncContainer(pod, container)
}

func (m *ManagerImpl) onPodAdd(podUID string) {
klog.V(5).Infof("[ORM] onPodAdd: %v", podUID)

Expand Down Expand Up @@ -240,17 +352,7 @@ func (m *ManagerImpl) processAddPod(podUID string) error {
return err
}

for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
err = m.addContainer(pod, &container)
if err != nil {
klog.Errorf("[ORM] add container fail, pod: %v, container: %v, err: %v", pod.Name, container.Name, err)
return err
}

_ = m.syncContainer(pod, &container)
}

return nil
return m.topologyManager.Admit(pod)
}

func (m *ManagerImpl) processDeletePod(podUID string) error {
Expand All @@ -271,6 +373,7 @@ func (m *ManagerImpl) processDeletePod(podUID string) error {

if allSuccess {
m.podResources.deletePod(podUID)
m.topologyManager.RemovePod(podUID)
}

return m.writeCheckpoint()
Expand All @@ -291,13 +394,33 @@ func (m *ManagerImpl) addContainer(pod *v1.Pod, container *v1.Container) error {
return nil
}

containerType, containerIndex, err := GetContainerTypeAndIndex(pod, container)
if err != nil {
return err
}

for k, v := range container.Resources.Requests {
needed := int(v.Value())
resource, err := m.getMappedResourceName(string(k), container.Resources.Requests)
if err != nil {
klog.Errorf("resource %s getMappedResourceName fail: %v", string(k), err)
return err
}

allocationInfo := m.podResources.containerResource(string(pod.UID), container.Name, resource)
if allocationInfo != nil {
allocated := int(math.Ceil(allocationInfo.AllocatedQuantity))

if allocationInfo.IsScalarResource && allocated >= needed {
klog.Infof("[ORM] resource %s already allocated to (pod %s/%s, container %v) with larger number than request: requested: %d, allocated: %d; not to allocate",
resource, pod.GetNamespace(), pod.GetName(), container.Name, needed, allocated)
continue
} else {
klog.Warningf("[ORM] resource %s already allocated to (pod %s/%s, container %v) with smaller number than request: requested: %d, allocated: %d; continue to allocate",
resource, pod.GetNamespace(), pod.GetName(), container.Name, needed, allocated)
}
}

m.mutex.Lock()
e, ok := m.endpoints[resource]
m.mutex.Unlock()
Expand All @@ -306,11 +429,6 @@ func (m *ManagerImpl) addContainer(pod *v1.Pod, container *v1.Container) error {
continue
}

containerType, containerIndex, err := GetContainerTypeAndIndex(pod, container)
if err != nil {
return err
}

resourceReq := &pluginapi.ResourceRequest{
PodUid: string(pod.UID),
PodNamespace: pod.GetNamespace(),
Expand All @@ -327,8 +445,20 @@ func (m *ManagerImpl) addContainer(pod *v1.Pod, container *v1.Container) error {
ResourceRequests: map[string]float64{resource: v.AsApproximateFloat64()},
Labels: maputil.CopySS(pod.Labels),
Annotations: maputil.CopySS(pod.Annotations),
// hint is not used in ORM but it can not be nil
Hint: &pluginapi.TopologyHint{},
}

if e.Opts != nil && e.Opts.WithTopologyAlignment {
hint := m.topologyManager.GetAffinity(string(pod.UID), container.Name, resource)

if hint.NUMANodeAffinity == nil {
klog.Warningf("[ORM] pod: %s/%s; container: %s allocate resource: %s without numa nodes affinity",
pod.Namespace, pod.Name, container.Name, resource)
} else {
klog.Warningf("[ORM] pod: %s/%s; container: %s allocate resource: %s get hint: %v from store",
pod.Namespace, pod.Name, container.Name, resource, hint)
}

resourceReq.Hint = ParseTopologyManagerHint(hint)
}

response, err := e.E.Allocate(m.ctx, resourceReq)
Expand Down Expand Up @@ -578,3 +708,46 @@ func isPodKatalystQoSLevelSystemCores(qosConfig *generic.QoSConfiguration, pod *

return qosLevel == pluginapi.KatalystQoSLevelSystemCores, nil
}

// ParseListOfTopologyHints converts the hint list returned by a resource
// plugin into topology manager hints.
//
// A nil hintsList yields nil. nil entries inside the list are skipped; every
// other entry becomes one TopologyHint whose NUMANodeAffinity is built from
// hint.Nodes and whose Preferred flag is copied unchanged.
func ParseListOfTopologyHints(hintsList *pluginapi.ListOfTopologyHints) []topology.TopologyHint {
	if hintsList == nil {
		return nil
	}

	resultHints := make([]topology.TopologyHint, 0, len(hintsList.Hints))

	for _, hint := range hintsList.Hints {
		if hint == nil {
			continue
		}

		mask := bitmask.NewEmptyBitMask()

		for _, node := range hint.Nodes {
			// Report a node id the mask refuses instead of discarding the
			// failure silently; the hint is still emitted with the nodes
			// that were accepted, exactly as before.
			if err := mask.Add(int(node)); err != nil {
				klog.Warningf("[ORM] ParseListOfTopologyHints skip numa node %d: %v", node, err)
			}
		}

		resultHints = append(resultHints, topology.TopologyHint{
			NUMANodeAffinity: mask,
			Preferred:        hint.Preferred,
		})
	}

	return resultHints
}

// ParseTopologyManagerHint converts a topology manager hint into the plugin
// API representation.
//
// Preferred is copied unchanged. Nodes lists the bits reported by
// hint.NUMANodeAffinity.GetBits(), and stays nil when the affinity is nil or
// reports no bits.
func ParseTopologyManagerHint(hint topology.TopologyHint) *pluginapi.TopologyHint {
	converted := &pluginapi.TopologyHint{
		Preferred: hint.Preferred,
	}

	affinity := hint.NUMANodeAffinity
	if affinity == nil {
		return converted
	}

	for _, bit := range affinity.GetBits() {
		converted.Nodes = append(converted.Nodes, uint64(bit))
	}

	return converted
}
Loading

0 comments on commit 44ba1c2

Please sign in to comment.