Skip to content

Commit

Permalink
chore: backport latest VMs pool and HasInstance() implementations and…
Browse files Browse the repository at this point in the history
… resolve inconsistencies in config and unit tests (#7202)
  • Loading branch information
comtalyst authored Aug 26, 2024
1 parent 60a4ee5 commit aa12b49
Show file tree
Hide file tree
Showing 13 changed files with 436 additions and 124 deletions.
52 changes: 38 additions & 14 deletions cluster-autoscaler/cloudprovider/azure/azure_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,26 +207,32 @@ func (m *azureCache) regenerate() error {
return nil
}

// fetchAzureResources retrieves and updates the cached Azure resources.
//
// This function performs the following:
// - Fetches and updates the list of Virtual Machine Scale Sets (VMSS) in the specified resource group.
// - Fetches and updates the list of Virtual Machines (VMs) and identifies the node pools they belong to.
// - Maintains a set of VMs pools and VMSS resources which helps the Cluster Autoscaler (CAS) operate on mixed node pools.
//
// Returns an error if any of the Azure API calls fail.
func (m *azureCache) fetchAzureResources() error {
m.mutex.Lock()
defer m.mutex.Unlock()

// fetch all the resources since CAS may be operating on mixed nodepools
// including both VMSS and VMs pools
// NOTE: this lists virtual machine scale sets, not virtual machine
// scale set instances
vmssResult, err := m.fetchScaleSets()
if err == nil {
m.scaleSets = vmssResult
} else {
if err != nil {
return err
}

m.scaleSets = vmssResult
vmResult, vmsPoolSet, err := m.fetchVirtualMachines()
if err == nil {
m.virtualMachines = vmResult
m.vmsPoolSet = vmsPoolSet
} else {
if err != nil {
return err
}
// we fetch both sets of resources since CAS may operate on mixed nodepools
m.virtualMachines = vmResult
m.vmsPoolSet = vmsPoolSet

return nil
}
Expand Down Expand Up @@ -263,7 +269,6 @@ func (m *azureCache) fetchVirtualMachines() (map[string][]compute.VirtualMachine
if vmPoolName == nil {
vmPoolName = tags[legacyAgentpoolNameTag]
}

if vmPoolName == nil {
continue
}
Expand All @@ -276,8 +281,8 @@ func (m *azureCache) fetchVirtualMachines() (map[string][]compute.VirtualMachine
}

// nodes from vms pool will have tag "aks-managed-agentpool-type" set to "VirtualMachines"
if agnetpoolType := tags[agentpoolTypeTag]; agnetpoolType != nil {
if strings.EqualFold(to.String(agnetpoolType), vmsPoolType) {
if agentpoolType := tags[agentpoolTypeTag]; agentpoolType != nil {
if strings.EqualFold(to.String(agentpoolType), vmsPoolType) {
vmsPoolSet[to.String(vmPoolName)] = struct{}{}
}
}
Expand Down Expand Up @@ -314,7 +319,6 @@ func (m *azureCache) Register(nodeGroup cloudprovider.NodeGroup) bool {
// Node group is already registered and min/max size haven't changed, no action required.
return false
}

m.registeredNodeGroups[i] = nodeGroup
klog.V(4).Infof("Node group %q updated", nodeGroup.Id())
m.invalidateUnownedInstanceCache()
Expand All @@ -323,6 +327,7 @@ func (m *azureCache) Register(nodeGroup cloudprovider.NodeGroup) bool {
}

klog.V(4).Infof("Registering Node Group %q", nodeGroup.Id())

m.registeredNodeGroups = append(m.registeredNodeGroups, nodeGroup)
m.invalidateUnownedInstanceCache()
return true
Expand Down Expand Up @@ -391,6 +396,25 @@ func (m *azureCache) getAutoscalingOptions(ref azureRef) map[string]string {
return m.autoscalingOptions[ref]
}

// HasInstance returns if a given instance exists in the azure cache
func (m *azureCache) HasInstance(providerID string) (bool, error) {
m.mutex.Lock()
defer m.mutex.Unlock()
resourceID, err := convertResourceGroupNameToLower(providerID)
if err != nil {
// Most likely an invalid resource id, we should return an error
// most of these shouldn't make it here do to higher level
// validation in the HasInstance azure.cloudprovider function
return false, err
}

if m.getInstanceFromCache(resourceID) != nil {
return true, nil
}
// couldn't find instance in the cache, assume it's deleted
return false, cloudprovider.ErrNotImplemented
}

// FindForInstance returns node group of the given Instance
func (m *azureCache) FindForInstance(instance *azureRef, vmType string) (cloudprovider.NodeGroup, error) {
vmsPoolSet := m.getVMsPoolSet()
Expand Down
2 changes: 1 addition & 1 deletion cluster-autoscaler/cloudprovider/azure/azure_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func newAgentpoolClientWithConfig(subscriptionID string, cred azcore.TokenCreden
return nil, fmt.Errorf("failed to init cluster agent pools client: %w", err)
}

klog.V(10).Infof("Successfully created agent pool client with ARMBaseURLForAPClient")
klog.V(10).Infof("Successfully created agent pool client with ARMBaseURL")
return agentPoolsClient, nil
}

Expand Down
32 changes: 29 additions & 3 deletions cluster-autoscaler/cloudprovider/azure/azure_cloud_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ limitations under the License.
package azure

import (
"fmt"
"io"
"os"
"strings"

apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -106,6 +108,12 @@ func (azure *AzureCloudProvider) NodeGroupForNode(node *apiv1.Node) (cloudprovid
klog.V(6).Infof("Skipping the search for node group for the node '%s' because it has no spec.ProviderID", node.ObjectMeta.Name)
return nil, nil
}

if !strings.HasPrefix(node.Spec.ProviderID, "azure://") {
klog.V(6).Infof("Wrong azure ProviderID for node %v, skipped", node.Name)
return nil, nil
}

klog.V(6).Infof("Searching for node group for the node: %s\n", node.Spec.ProviderID)
ref := &azureRef{
Name: node.Spec.ProviderID,
Expand All @@ -115,9 +123,27 @@ func (azure *AzureCloudProvider) NodeGroupForNode(node *apiv1.Node) (cloudprovid
return azure.azureManager.GetNodeGroupForInstance(ref)
}

// HasInstance returns whether a given node has a corresponding instance in this cloud provider
func (azure *AzureCloudProvider) HasInstance(*apiv1.Node) (bool, error) {
return true, cloudprovider.ErrNotImplemented
// HasInstance returns whether a given node has a corresponding instance in this cloud provider.
//
// Used to prevent undercount of existing VMs (taint-based overcount of deleted VMs),
// and so should not return false, nil (no instance) if uncertain; return error instead.
// (Think "has instance for sure, else error".) Returning an error causes fallback to taint-based
// determination; use ErrNotImplemented for silent fallback, any other error will be logged.
//
// Expected behavior (should work for VMSS Uniform/Flex, and VMs):
// - exists : return true, nil
// - !exists : return *, ErrNotImplemented (could use custom error for autoscaled nodes)
// - unimplemented case : return *, ErrNotImplemented
// - any other error : return *, error
func (azure *AzureCloudProvider) HasInstance(node *apiv1.Node) (bool, error) {
if node.Spec.ProviderID == "" {
return false, fmt.Errorf("ProviderID for node: %s is empty, skipped", node.Name)
}

if !strings.HasPrefix(node.Spec.ProviderID, "azure://") {
return false, fmt.Errorf("invalid azure ProviderID prefix for node: %s, skipped", node.Name)
}
return azure.azureManager.azureCache.HasInstance(node.Spec.ProviderID)
}

// Pricing returns pricing model for this cloud provider or error if not available.
Expand Down
Loading

0 comments on commit aa12b49

Please sign in to comment.