Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ORM): add orm podResources server #453

Merged
merged 5 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/katalyst-agent/app/agent/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

const (
ORMAgent = "katalyst-agent-orm"
ORMAgent = orm.ORMAgentName
)

func InitORM(agentCtx *GenericContext, conf *config.Configuration, _ interface{}, _ string) (bool, Component, error) {
Expand Down
35 changes: 25 additions & 10 deletions cmd/katalyst-agent/app/options/orm/orm_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,26 @@ import (
)

type GenericORMPluginOptions struct {
ORMRconcilePeriod time.Duration
ORMResourceNamesMap map[string]string
ORMPodNotifyChanLen int
TopologyPolicyName string
NumericAlignResources []string
ORMRconcilePeriod time.Duration
ORMResourceNamesMap map[string]string
ORMPodNotifyChanLen int
TopologyPolicyName string
NumericAlignResources []string
ORMPodResourcesSocket string
ORMDevicesProvider string
ORMKubeletPodResourcesEndpoints []string
}

func NewGenericORMPluginOptions() *GenericORMPluginOptions {
return &GenericORMPluginOptions{
ORMRconcilePeriod: time.Second * 5,
ORMResourceNamesMap: map[string]string{},
ORMPodNotifyChanLen: 10,
TopologyPolicyName: "none",
NumericAlignResources: []string{"cpu", "memory"},
ORMRconcilePeriod: time.Second * 5,
ORMResourceNamesMap: map[string]string{},
ORMPodNotifyChanLen: 10,
TopologyPolicyName: "",
NumericAlignResources: []string{"cpu", "memory"},
ORMPodResourcesSocket: "unix:/var/lib/katalyst/pod-resources/kubelet.sock",
ORMDevicesProvider: "",
ORMKubeletPodResourcesEndpoints: []string{"/var/lib/kubelet/pod-resources/kubelet.sock"},
}
}

Expand All @@ -58,6 +64,12 @@ func (o *GenericORMPluginOptions) AddFlags(fss *cliflag.NamedFlagSets) {
o.TopologyPolicyName, "topology merge policy name used by ORM")
fs.StringSliceVar(&o.NumericAlignResources, "numeric-align-resources", o.NumericAlignResources,
"resources which should be aligned in numeric topology policy")
fs.StringVar(&o.ORMPodResourcesSocket, "orm-pod-resources-socket", o.ORMPodResourcesSocket,
"socket of ORM pod resource api, default 'unix:/var/lib/katalyst/pod-resources/kubelet.sock'")
fs.StringVar(&o.ORMDevicesProvider, "orm-devices-provider", o.ORMDevicesProvider,
"devices provider provides devices resources and allocatable for ORM podResources api")
fs.StringSliceVar(&o.ORMKubeletPodResourcesEndpoints, "orm-kubelet-pod-resources-endpoints", o.ORMKubeletPodResourcesEndpoints,
"kubelet podResources endpoints for ORM kubelet devices provider")
}

func (o *GenericORMPluginOptions) ApplyTo(conf *ormconfig.GenericORMConfiguration) error {
Expand All @@ -66,6 +78,9 @@ func (o *GenericORMPluginOptions) ApplyTo(conf *ormconfig.GenericORMConfiguratio
conf.ORMPodNotifyChanLen = o.ORMPodNotifyChanLen
conf.TopologyPolicyName = o.TopologyPolicyName
conf.NumericAlignResources = o.NumericAlignResources
conf.ORMPodResourcesSocket = o.ORMPodResourcesSocket
conf.ORMDevicesProvider = o.ORMDevicesProvider
conf.ORMKubeletPodResourcesEndpoints = o.ORMKubeletPodResourcesEndpoints

return nil
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/evanphx/json-patch v5.6.0+incompatible
github.com/fsnotify/fsnotify v1.5.4
github.com/gogo/protobuf v1.3.2
github.com/golang/mock v1.6.0
github.com/golang/protobuf v1.5.2
github.com/google/cadvisor v0.44.2
github.com/google/uuid v1.3.0
Expand Down
115 changes: 115 additions & 0 deletions pkg/agent/orm/deviceprovider/kubelet/provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
Copyright 2022 The Katalyst Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kubelet

import (
"context"
"encoding/json"
"time"

"google.golang.org/grpc"
"k8s.io/klog/v2"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"

podresourcesserver "github.com/kubewharf/katalyst-core/pkg/agent/orm/server/podresources"
"github.com/kubewharf/katalyst-core/pkg/util/general"
"github.com/kubewharf/katalyst-core/pkg/util/kubelet/podresources"
)

const (
podResourcesClientTimeout = 10 * time.Second
podResourcesClientMaxMsgSize = 1024 * 1024 * 16
)

// Provider provide devices resource by kubelet v1 podResources api
type Provider struct {
client podresourcesapi.PodResourcesListerClient
endpoints []string
conn *grpc.ClientConn
}

func NewProvider(endpoints []string, getClientFunc podresources.GetClientFunc) (podresourcesserver.DevicesProvider, error) {
klog.V(5).Infof("new kubelet devices provider, endpoints: %v", endpoints)

p := &Provider{
endpoints: endpoints,
}

var err error

p.client, p.conn, err = getClientFunc(
general.GetOneExistPath(endpoints),
podResourcesClientTimeout,
podResourcesClientMaxMsgSize)
if err != nil {
klog.Error(err)
return nil, err
}

return p, nil
}

func (p *Provider) GetDevices() []*podresourcesapi.PodResources {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()

response, err := p.client.List(ctx, &podresourcesapi.ListPodResourcesRequest{})
if err != nil {
klog.Errorf("list resources from kubelet fail: %v", err)
return nil
}
if response == nil {
klog.Error("list resources from kubelet, get nil response without err")
return nil
}
if response.GetPodResources() == nil {
klog.Error("list resources from kubelet, get nil podResources without err")
return nil
}

if klog.V(6).Enabled() {
str, _ := json.Marshal(response)
klog.Infof("GetDevices: %v", string(str))
}

return response.PodResources
}

func (p *Provider) GetAllocatableDevices() []*podresourcesapi.ContainerDevices {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()

response, err := p.client.GetAllocatableResources(ctx, &podresourcesapi.AllocatableResourcesRequest{})
if err != nil {
klog.Errorf("GetAllocatableResources from kubelet fail: %v", err)
return nil
}
if response == nil {
klog.Error("GetAllocatableResources from kubelet, get nil response without err")
return nil
}
if response.GetDevices() == nil {
klog.Error("GetAllocatableResources from kubelet, get nil response without err")
return nil
}

if klog.V(6).Enabled() {
str, _ := json.Marshal(response)
klog.Infof("GetAllocatableDevices: %v", str)
}
return response.GetDevices()
}
136 changes: 136 additions & 0 deletions pkg/agent/orm/deviceprovider/kubelet/provider_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
Copyright 2022 The Katalyst Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kubelet

import (
"context"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
v1 "k8s.io/kubelet/pkg/apis/podresources/v1"
)

func TestGetDevice(t *testing.T) {
t.Parallel()

mockProvider := &Provider{
client: &mockPodResourcesListerClient{
ListPodResourcesResponse: &v1.ListPodResourcesResponse{
PodResources: []*v1.PodResources{},
},
},
}

resp := mockProvider.GetDevices()
assert.NotNil(t, resp)

nilPodResourcesProvider := &Provider{
client: &mockPodResourcesListerClient{
ListPodResourcesResponse: &v1.ListPodResourcesResponse{},
},
}
resp = nilPodResourcesProvider.GetDevices()
assert.Nil(t, resp)

nilRespProvider := &Provider{
client: &mockPodResourcesListerClient{
ListPodResourcesResponse: nil,
},
}
resp = nilRespProvider.GetDevices()
assert.Nil(t, resp)

errPodResourceProvider := &Provider{
client: &errPodResourceListerClient{},
}
resp = errPodResourceProvider.GetDevices()
assert.Nil(t, resp)
}

func TestGetAllocatableDevices(t *testing.T) {
t.Parallel()

mockProvider := &Provider{
client: &mockPodResourcesListerClient{
AllocatableResourcesResponse: &v1.AllocatableResourcesResponse{
Devices: []*v1.ContainerDevices{},
},
},
}
resp := mockProvider.GetAllocatableDevices()
assert.NotNil(t, resp)

nilRespProvider := &Provider{
client: &mockPodResourcesListerClient{
AllocatableResourcesResponse: nil,
},
}
resp = nilRespProvider.GetAllocatableDevices()
assert.Nil(t, resp)

nilDeviceProvider := &Provider{
client: &mockPodResourcesListerClient{
AllocatableResourcesResponse: &v1.AllocatableResourcesResponse{},
},
}
resp = nilDeviceProvider.GetAllocatableDevices()
assert.Nil(t, resp)

errProvider := &Provider{
client: &errPodResourceListerClient{},
}
resp = errProvider.GetAllocatableDevices()
assert.Nil(t, resp)
}

func TestNewProvider(t *testing.T) {
t.Parallel()

p, err := NewProvider([]string{}, getMockClientFunc)
assert.NoError(t, err)
assert.NotNil(t, p)
}

type mockPodResourcesListerClient struct {
*v1.ListPodResourcesResponse
*v1.AllocatableResourcesResponse
}

func (f *mockPodResourcesListerClient) List(ctx context.Context, in *v1.ListPodResourcesRequest, opts ...grpc.CallOption) (*v1.ListPodResourcesResponse, error) {
return f.ListPodResourcesResponse, nil
}

func (f *mockPodResourcesListerClient) GetAllocatableResources(ctx context.Context, in *v1.AllocatableResourcesRequest, opts ...grpc.CallOption) (*v1.AllocatableResourcesResponse, error) {
return f.AllocatableResourcesResponse, nil
}

type errPodResourceListerClient struct{}

func (e *errPodResourceListerClient) List(ctx context.Context, in *v1.ListPodResourcesRequest, opts ...grpc.CallOption) (*v1.ListPodResourcesResponse, error) {
return nil, fmt.Errorf("err")
}

func (e *errPodResourceListerClient) GetAllocatableResources(ctx context.Context, in *v1.AllocatableResourcesRequest, opts ...grpc.CallOption) (*v1.AllocatableResourcesResponse, error) {
return nil, fmt.Errorf("err")
}

func getMockClientFunc(socket string, connectionTimeout time.Duration, maxMsgSize int) (v1.PodResourcesListerClient, *grpc.ClientConn, error) {
return &mockPodResourcesListerClient{}, nil, nil
}
20 changes: 20 additions & 0 deletions pkg/agent/orm/endpoint/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type Endpoint interface {
IsStopped() bool
StopGracePeriodExpired() bool
GetResourcePluginOptions(ctx context.Context, in *pluginapi.Empty, opts ...grpc.CallOption) (*pluginapi.ResourcePluginOptions, error)
GetTopologyAwareAllocatableResources(c context.Context, request *pluginapi.GetTopologyAwareAllocatableResourcesRequest) (*pluginapi.GetTopologyAwareAllocatableResourcesResponse, error)
GetTopologyAwareResources(c context.Context, request *pluginapi.GetTopologyAwareResourcesRequest) (*pluginapi.GetTopologyAwareResourcesResponse, error)
}

type EndpointInfo struct {
Expand Down Expand Up @@ -161,6 +163,24 @@ func (e *EndpointImpl) GetTopologyHints(c context.Context, resourceRequest *plug
return e.client.GetTopologyHints(ctx, resourceRequest)
}

func (e *EndpointImpl) GetTopologyAwareAllocatableResources(c context.Context, request *pluginapi.GetTopologyAwareAllocatableResourcesRequest) (*pluginapi.GetTopologyAwareAllocatableResourcesResponse, error) {
if e.IsStopped() {
return nil, fmt.Errorf(errEndpointStopped, e)
}
ctx, cancel := context.WithTimeout(c, pluginapi.KubeletResourcePluginGetTopologyAwareAllocatableResourcesRPCTimeoutInSecs*time.Second)
defer cancel()
return e.client.GetTopologyAwareAllocatableResources(ctx, request)
}

func (e *EndpointImpl) GetTopologyAwareResources(c context.Context, request *pluginapi.GetTopologyAwareResourcesRequest) (*pluginapi.GetTopologyAwareResourcesResponse, error) {
if e.IsStopped() {
return nil, fmt.Errorf(errEndpointStopped, e)
}
ctx, cancel := context.WithTimeout(c, pluginapi.KubeletResourcePluginGetTopologyAwareResourcesRPCTimeoutInSecs*time.Second)
defer cancel()
return e.client.GetTopologyAwareResources(ctx, request)
}

// dial establishes the gRPC communication with the registered resource plugin. https://godoc.org/google.golang.org/grpc#Dial
func dial(unixSocketPath string) (pluginapi.ResourcePluginClient, *grpc.ClientConn, error) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
Expand Down
Loading
Loading