Skip to content

Commit

Permalink
karmada-metrics-adapter: reduce memory usage
Browse files Browse the repository at this point in the history
When there is a large amount of pod usage in the member cluster, metrics-adapter will consume a lot of memory. The reason is that it caches all the information of all pods in the cluster. However, we don't need all this information, so we trim some of the information to reduce memory usage.

Signed-off-by: chaunceyjiang <[email protected]>
  • Loading branch information
chaunceyjiang committed Apr 3, 2024
1 parent 57c1989 commit 23bdd83
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 61 deletions.
2 changes: 1 addition & 1 deletion pkg/metricsadapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type MetricsAdapter struct {
func NewMetricsAdapter(controller *MetricsController, customMetricsAdapterServerOptions *options.CustomMetricsAdapterServerOptions) *MetricsAdapter {
adapter := &MetricsAdapter{}
adapter.CustomMetricsAdapterServerOptions = customMetricsAdapterServerOptions
adapter.ResourceMetricsProvider = provider.NewResourceMetricsProvider(controller.ClusterLister, controller.InformerManager)
adapter.ResourceMetricsProvider = provider.NewResourceMetricsProvider(controller.ClusterLister, controller.TypedInformerManager, controller.InformerManager)
customProvider := provider.MakeCustomMetricsProvider(controller.ClusterLister, controller.MultiClusterDiscovery)
externalProvider := provider.MakeExternalMetricsProvider()
adapter.WithCustomMetrics(customProvider)
Expand Down
23 changes: 19 additions & 4 deletions pkg/metricsadapter/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/karmada-io/karmada/pkg/metricsadapter/provider"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
"github.com/karmada-io/karmada/pkg/util/fedinformer/typedmanager"
"github.com/karmada-io/karmada/pkg/util/gclient"
)

Expand All @@ -50,6 +51,7 @@ type MetricsController struct {
InformerFactory informerfactory.SharedInformerFactory
ClusterLister clusterlister.ClusterLister
InformerManager genericmanager.MultiClusterInformerManager
TypedInformerManager typedmanager.MultiClusterInformerManager
MultiClusterDiscovery multiclient.MultiClusterDiscoveryInterface
queue workqueue.RateLimitingInterface
restConfig *rest.Config
Expand All @@ -63,6 +65,7 @@ func NewMetricsController(restConfig *rest.Config, factory informerfactory.Share
ClusterLister: clusterLister,
MultiClusterDiscovery: multiclient.NewMultiClusterDiscoveryClient(clusterLister, kubeFactory),
InformerManager: genericmanager.GetInstance(),
TypedInformerManager: typedmanager.GetInstance(),
restConfig: restConfig,
queue: workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{
Name: "metrics-adapter",
Expand Down Expand Up @@ -147,6 +150,7 @@ func (m *MetricsController) handleClusters() bool {
if err != nil {
if apierrors.IsNotFound(err) {
klog.Infof("try to stop cluster informer %s", clusterName)
m.TypedInformerManager.Stop(clusterName)
m.InformerManager.Stop(clusterName)
m.MultiClusterDiscovery.Remove(clusterName)
return true
Expand All @@ -156,21 +160,27 @@ func (m *MetricsController) handleClusters() bool {

if !cls.DeletionTimestamp.IsZero() {
klog.Infof("try to stop cluster informer %s", clusterName)
m.TypedInformerManager.Stop(clusterName)
m.InformerManager.Stop(clusterName)
m.MultiClusterDiscovery.Remove(clusterName)
return true
}

if !util.IsClusterReady(&cls.Status) {
klog.Warningf("cluster %s is notReady try to stop this cluster informer", clusterName)
m.TypedInformerManager.Stop(clusterName)
m.InformerManager.Stop(clusterName)
m.MultiClusterDiscovery.Remove(clusterName)
return false
}

if !m.InformerManager.IsManagerExist(clusterName) {
if !m.TypedInformerManager.IsManagerExist(clusterName) {
klog.Info("Try to build informer manager for cluster ", clusterName)
controlPlaneClient := gclient.NewForConfigOrDie(m.restConfig)
clusterClient, err := util.NewClusterClientSet(clusterName, controlPlaneClient, nil)
if err != nil {
return false
}
clusterDynamicClient, err := util.NewClusterDynamicClientSet(clusterName, controlPlaneClient)
if err != nil {
return false
Expand All @@ -181,18 +191,23 @@ func (m *MetricsController) handleClusters() bool {
klog.Warningf("unable to access cluster %s, Error: %+v", clusterName, err)
return true
}
_ = m.TypedInformerManager.ForCluster(clusterName, clusterClient.KubeClient, 0)
_ = m.InformerManager.ForCluster(clusterName, clusterDynamicClient.DynamicClientSet, 0)
}
err = m.MultiClusterDiscovery.Set(clusterName)
if err != nil {
klog.Warningf("failed to build discoveryClient for cluster(%s), Error: %+v", clusterName, err)
return true
}
sci := m.InformerManager.GetSingleClusterManager(clusterName)

typedSci := m.TypedInformerManager.GetSingleClusterManager(clusterName)
// Just trigger the informer to work
_ = sci.Lister(provider.PodsGVR)
_ = sci.Lister(provider.NodesGVR)
_, _ = typedSci.Lister(provider.PodsGVR)
_, _ = typedSci.Lister(provider.NodesGVR)

typedSci.Start()
_ = typedSci.WaitForCacheSync()
sci := m.InformerManager.GetSingleClusterManager(clusterName)
sci.Start()
_ = sci.WaitForCacheSync()

Expand Down
132 changes: 76 additions & 56 deletions pkg/metricsadapter/provider/resourcemetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/metrics/pkg/apis/metrics"
Expand All @@ -36,6 +37,7 @@ import (
clusterlister "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
"github.com/karmada-io/karmada/pkg/util/fedinformer/typedmanager"
"github.com/karmada-io/karmada/pkg/util/helper"
)

Expand All @@ -59,25 +61,27 @@ var (
NodesGVR = corev1.SchemeGroupVersion.WithResource("nodes")
)

type queryResourceFromClustersFunc func(sci genericmanager.SingleClusterInformerManager, clusterName string) error
type queryResourceFromClustersFunc func(sci typedmanager.SingleClusterInformerManager, clusterName string) error
type queryMetricsFromClustersFunc func(sci genericmanager.SingleClusterInformerManager, clusterName string) (interface{}, error)

// ResourceMetricsProvider is a resource metrics provider, to provide cpu/memory metrics
type ResourceMetricsProvider struct {
PodLister *PodLister
NodeLister *NodeLister

clusterLister clusterlister.ClusterLister
informerManager genericmanager.MultiClusterInformerManager
clusterLister clusterlister.ClusterLister
informerManager genericmanager.MultiClusterInformerManager
typedInformerManager typedmanager.MultiClusterInformerManager
}

// NewResourceMetricsProvider creates a new resource metrics provider
func NewResourceMetricsProvider(clusterLister clusterlister.ClusterLister, informerManager genericmanager.MultiClusterInformerManager) *ResourceMetricsProvider {
func NewResourceMetricsProvider(clusterLister clusterlister.ClusterLister, typedInformerManager typedmanager.MultiClusterInformerManager, informerManager genericmanager.MultiClusterInformerManager) *ResourceMetricsProvider {
return &ResourceMetricsProvider{
clusterLister: clusterLister,
informerManager: informerManager,
PodLister: NewPodLister(clusterLister, informerManager),
NodeLister: NewNodeLister(clusterLister, informerManager),
clusterLister: clusterLister,
informerManager: informerManager,
typedInformerManager: typedInformerManager,
PodLister: NewPodLister(clusterLister, typedInformerManager),
NodeLister: NewNodeLister(clusterLister, typedInformerManager),
}
}

Expand All @@ -93,7 +97,7 @@ func (r *ResourceMetricsProvider) getMetricsParallel(resourceFunc queryResourceF
// step 1. Find out the target clusters in lister cache
var targetClusters []string
for _, cluster := range clusters {
sci := r.informerManager.GetSingleClusterManager(cluster.Name)
sci := r.typedInformerManager.GetSingleClusterManager(cluster.Name)
if sci == nil {
klog.Errorf("Failed to get cluster(%s) manager", cluster.Name)
continue
Expand Down Expand Up @@ -161,8 +165,13 @@ func (r *ResourceMetricsProvider) getMetricsParallel(resourceFunc queryResourceF

// queryPodMetricsByName queries metrics by pod name from target clusters
func (r *ResourceMetricsProvider) queryPodMetricsByName(name, namespace string) ([]metrics.PodMetrics, error) {
resourceQueryFunc := func(sci genericmanager.SingleClusterInformerManager, _ string) error {
_, err := sci.Lister(PodsGVR).ByNamespace(namespace).Get(name)
resourceQueryFunc := func(sci typedmanager.SingleClusterInformerManager, _ string) error {
podInterface, err := sci.Lister(PodsGVR)
if err != nil {
return err
}
lister := podInterface.(v1.PodLister)
_, err = lister.Pods(namespace).Get(name)
return err
}
metricsQueryFunc := func(sci genericmanager.SingleClusterInformerManager, clusterName string) (interface{}, error) {
Expand Down Expand Up @@ -199,9 +208,13 @@ func (r *ResourceMetricsProvider) queryPodMetricsBySelector(selector, namespace
klog.Errorf("Failed to parse label selector: %v", err)
return nil, err
}

resourceQueryFunc := func(sci genericmanager.SingleClusterInformerManager, clusterName string) error {
pods, err := sci.Lister(PodsGVR).ByNamespace(namespace).List(labelSelector)
resourceQueryFunc := func(sci typedmanager.SingleClusterInformerManager, clusterName string) error {
podInterface, err := sci.Lister(PodsGVR)
if err != nil {
return err
}
lister := podInterface.(v1.PodLister)
pods, err := lister.Pods(namespace).List(labelSelector)
if err != nil {
klog.Errorf("Failed to list pods in cluster(%s): %v", clusterName, err)
return err
Expand Down Expand Up @@ -245,8 +258,13 @@ func (r *ResourceMetricsProvider) queryPodMetricsBySelector(selector, namespace

// queryNodeMetricsByName queries metrics by node name from target clusters
func (r *ResourceMetricsProvider) queryNodeMetricsByName(name string) ([]metrics.NodeMetrics, error) {
resourceQueryFunc := func(sci genericmanager.SingleClusterInformerManager, _ string) error {
_, err := sci.Lister(NodesGVR).Get(name)
resourceQueryFunc := func(sci typedmanager.SingleClusterInformerManager, _ string) error {
nodeInterface, err := sci.Lister(PodsGVR)
if err != nil {
return err
}
lister := nodeInterface.(v1.NodeLister)
_, err = lister.Get(name)
return err
}
metricsQueryFunc := func(sci genericmanager.SingleClusterInformerManager, clusterName string) (interface{}, error) {
Expand Down Expand Up @@ -282,9 +300,13 @@ func (r *ResourceMetricsProvider) queryNodeMetricsBySelector(selector string) ([
klog.Errorf("Failed to parse label selector: %v", err)
return nil, err
}

resourceQueryFunc := func(sci genericmanager.SingleClusterInformerManager, clusterName string) error {
nodes, err := sci.Lister(NodesGVR).List(labelSelector)
resourceQueryFunc := func(sci typedmanager.SingleClusterInformerManager, clusterName string) error {
nodeInterface, err := sci.Lister(NodesGVR)
if err != nil {
return err
}
lister := nodeInterface.(v1.NodeLister)
nodes, err := lister.List(labelSelector)
if err != nil {
klog.Errorf("Failed to list pods in cluster(%s): %v", clusterName, err)
return err
Expand Down Expand Up @@ -370,11 +392,11 @@ func (r *ResourceMetricsProvider) GetNodeMetrics(nodes ...*corev1.Node) ([]metri
type PodLister struct {
namespaceSpecified string
clusterLister clusterlister.ClusterLister
informerManager genericmanager.MultiClusterInformerManager
informerManager typedmanager.MultiClusterInformerManager
}

// NewPodLister creates an internal new PodLister
func NewPodLister(clusterLister clusterlister.ClusterLister, informerManager genericmanager.MultiClusterInformerManager) *PodLister {
func NewPodLister(clusterLister clusterlister.ClusterLister, informerManager typedmanager.MultiClusterInformerManager) *PodLister {
return &PodLister{
clusterLister: clusterLister,
informerManager: informerManager,
Expand All @@ -396,19 +418,19 @@ func (p *PodLister) List(selector labels.Selector) (ret []runtime.Object, err er
klog.Errorf("Failed to get SingleClusterInformerManager for cluster(%s)", cluster.Name)
continue
}
pods, err := sci.Lister(PodsGVR).ByNamespace(p.namespaceSpecified).List(selector)
lister, err := sci.Lister(PodsGVR)
if err != nil {
klog.Errorf("Failed to get lister for cluster(%s): %v", cluster.Name, err)
continue
}
podLister := lister.(v1.PodLister)
pods, err := podLister.Pods(p.namespaceSpecified).List(selector)
if err != nil {
klog.Errorf("Failed to list pods from cluster(%s) in namespace(%s): %v", cluster.Name, p.namespaceSpecified, err)
return nil, err
}
for _, pod := range pods {
podTyped := &corev1.Pod{}
err = helper.ConvertToTypedObject(pod, podTyped)
if err != nil {
klog.Errorf("Failed to convert to typed object: %v", err)
return nil, err
}
podPartial := p.convertToPodPartialData(podTyped, selector.String(), true)
for i := range pods {
podPartial := p.convertToPodPartialData(pods[i], selector.String(), true)
ret = append(ret, podPartial)
}
}
Expand Down Expand Up @@ -454,7 +476,13 @@ func (p *PodLister) Get(name string) (runtime.Object, error) {
klog.Errorf("Failed to get SingleClusterInformerManager for cluster(%s)", cluster.Name)
continue
}
pod, err := sci.Lister(PodsGVR).ByNamespace(p.namespaceSpecified).Get(name)
sciLister, err := sci.Lister(PodsGVR)
if err != nil {
klog.Errorf("Failed to get lister for cluster(%s): %v", cluster.Name, err)
continue
}
podLister := sciLister.(v1.PodLister)
pod, err := podLister.Pods(p.namespaceSpecified).Get(name)
if err != nil {
if !errors.IsNotFound(err) {
klog.Errorf("Failed to get pod from clsuster(%s) in namespace(%s): %v", cluster.Name, p.namespaceSpecified, err)
Expand All @@ -466,13 +494,7 @@ func (p *PodLister) Get(name string) (runtime.Object, error) {
err := fmt.Errorf("the pod(%s) found in more than one clusters", name)
return nil, errors.NewConflict(PodsGVR.GroupResource(), name, err)
}
podTyped := &corev1.Pod{}
err = helper.ConvertToTypedObject(pod, podTyped)
if err != nil {
klog.Errorf("Failed to convert to typed object: %v", err)
return nil, err
}
podPartial = p.convertToPodPartialData(podTyped, "", false)
podPartial = p.convertToPodPartialData(pod, "", false)
}

if podPartial != nil {
Expand All @@ -497,11 +519,11 @@ func (p *PodLister) ByNamespace(namespace string) cache.GenericNamespaceLister {
// NodeLister is an internal lister for nodes
type NodeLister struct {
clusterLister clusterlister.ClusterLister
informerManager genericmanager.MultiClusterInformerManager
informerManager typedmanager.MultiClusterInformerManager
}

// NewNodeLister creates an internal new NodeLister
func NewNodeLister(clusterLister clusterlister.ClusterLister, informerManager genericmanager.MultiClusterInformerManager) *NodeLister {
func NewNodeLister(clusterLister clusterlister.ClusterLister, informerManager typedmanager.MultiClusterInformerManager) *NodeLister {
return &NodeLister{
clusterLister: clusterLister,
informerManager: informerManager,
Expand All @@ -523,22 +545,21 @@ func (n *NodeLister) List(selector labels.Selector) (ret []*corev1.Node, err err
klog.Errorf("Failed to get SingleClusterInformerManager for cluster(%s)", cluster.Name)
continue
}
nodes, err := sci.Lister(NodesGVR).List(selector)
nodeInterface, err := sci.Lister(NodesGVR)
if err != nil {
klog.Errorf("Failed to get lister for cluster(%s): %v", cluster.Name, err)
continue
}
nodes, err := nodeInterface.(v1.NodeLister).List(selector)
if err != nil {
klog.Errorf("Failed to list nodes from cluster(%s): %v", cluster.Name, err)
return nil, err
}
for index := range nodes {
nodeTyped := &corev1.Node{}
err = helper.ConvertToTypedObject(nodes[index], nodeTyped)
if err != nil {
klog.Errorf("Failed to convert to typed object: %v", err)
return nil, err
}
nodeTyped := nodes[index]
if nodeTyped.Annotations == nil {
nodeTyped.Annotations = map[string]string{}
}

// If user sets this annotation, we need to reset it.
nodeTyped.Annotations[labelSelectorAnnotationInternal] = selector.String()
ret = append(ret, nodeTyped)
Expand All @@ -564,7 +585,12 @@ func (n *NodeLister) Get(name string) (*corev1.Node, error) {
klog.Errorf("Failed to get SingleClusterInformerManager for cluster(%s)", cluster.Name)
continue
}
node, err := sci.Lister(NodesGVR).Get(name)
sciLister, err := sci.Lister(NodesGVR)
if err != nil {
klog.Errorf("Failed to get lister for cluster(%s): %v", cluster.Name, err)
continue
}
node, err := sciLister.(v1.NodeLister).Get(name)
if err != nil {
if !errors.IsNotFound(err) {
klog.Errorf("Failed to get node from cluster(%s):%v", cluster.Name, err)
Expand All @@ -577,16 +603,10 @@ func (n *NodeLister) Get(name string) (*corev1.Node, error) {
return nil, errors.NewConflict(NodesGVR.GroupResource(), name, err)
}

nodeTyped = &corev1.Node{}
err = helper.ConvertToTypedObject(node, nodeTyped)
if err != nil {
klog.Errorf("Failed to convert to typed object: %v", err)
return nil, err
}
nodeTyped = node
if nodeTyped.Annotations == nil {
nodeTyped.Annotations = map[string]string{}
}

// If user sets this annotation, we need to remove it to avoid parsing wrong next.
delete(nodeTyped.Annotations, labelSelectorAnnotationInternal)
}
Expand Down

0 comments on commit 23bdd83

Please sign in to comment.