From 33234ae68e86fa77844ca08da4e12eb0a08c911f Mon Sep 17 00:00:00 2001 From: Katherine Xu Date: Wed, 29 Jan 2025 21:58:58 +0000 Subject: [PATCH] Revert "Make nodepool concurrent ops scale better (#12488)" This reverts commit 1dbed42bb9ea904991e432c09a28fffca929452e. --- .../services/container/node_config.go.tmpl | 215 ++++++++++-------- .../resource_container_node_pool.go.tmpl | 182 +++++++-------- 2 files changed, 213 insertions(+), 184 deletions(-) diff --git a/mmv1/third_party/terraform/services/container/node_config.go.tmpl b/mmv1/third_party/terraform/services/container/node_config.go.tmpl index 31600735f615..c690e99b96e8 100644 --- a/mmv1/third_party/terraform/services/container/node_config.go.tmpl +++ b/mmv1/third_party/terraform/services/container/node_config.go.tmpl @@ -2053,14 +2053,10 @@ func flattenHostMaintenancePolicy(c *container.HostMaintenancePolicy) []map[stri // node pool updates in `resource_container_cluster` func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Config, nodePoolInfo *NodePoolInformation, prefix, name string, timeout time.Duration) error { - // Cluster write-lock will be acquired when createOpF is called to make operation creations sequential within - // the cluster, and read-lock will be acquired when waitOpF is called to allow concurrent operation. - // This is to resolve the bottleneck of large number of operations being created at the same time. - clusterLockKey := nodePoolInfo.clusterLockKey() - // Nodepool write-lock will be acquired when calling creaetOpF and waitOpF. + // Nodepool write-lock will be acquired when update function is called. npLockKey := nodePoolInfo.nodePoolLockKey(name) - userAgent, err := tpgresource.GenerateUserAgentString(d, config.UserAgent) + userAgent, err := tpgresource.GenerateUserAgentString(d, config.UserAgent) if err != nil { return err } @@ -2079,15 +2075,17 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf }, } - createOpF := func() (*container.Operation, error) { + updateF := func() error { clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - return clusterNodePoolsUpdateCall.Do() - } + op, err := clusterNodePoolsUpdateCall.Do() + if err != nil { + return err + } - waitOpF := func(op *container.Operation) error { + // Wait until it's updated return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, @@ -2095,8 +2093,8 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + return err } log.Printf("[INFO] Updated logging_variant for node pool %s", name) @@ -2106,22 +2104,24 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf if d.HasChange(prefix + "node_config.0.containerd_config") { if _, ok := d.GetOk(prefix + "node_config.0.containerd_config"); ok { req := &container.UpdateNodePoolRequest{ - Name: name, + Name: name, ContainerdConfig: expandContainerdConfig(d.Get(prefix + "node_config.0.containerd_config")), } if req.ContainerdConfig == nil { req.ContainerdConfig = &container.ContainerdConfig{} req.ForceSendFields = 
[]string{"ContainerdConfig"} } - createOpF := func() (*container.Operation, error) { + updateF := func() error { clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - return clusterNodePoolsUpdateCall.Do() - } + op, err := clusterNodePoolsUpdateCall.Do() + if err != nil { + return err + } - waitOpF := func(op *container.Operation) error { + // Wait until it's updated return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, @@ -2129,8 +2129,8 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + return err } log.Printf("[INFO] Updated containerd_config for node pool %s", name) @@ -2158,15 +2158,17 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf req.StoragePools = storagePools } - createOpF := func() (*container.Operation, error) { + updateF := func() error { clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - return clusterNodePoolsUpdateCall.Do() - } + op, err := clusterNodePoolsUpdateCall.Do() + if err != nil { + return err + } - waitOpF := func(op *container.Operation) error { + // Wait until it's updated return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, @@ -2174,7 +2176,7 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { + if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { return err } log.Printf("[INFO] Updated disk disk_size_gb/disk_type/machine_type/storage_pools for Node Pool %s", d.Id()) @@ -2212,15 +2214,17 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf req.Taints = ntaints } - createOpF := func() (*container.Operation, error) { + updateF := func() error { clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - return clusterNodePoolsUpdateCall.Do() - } + op, err := clusterNodePoolsUpdateCall.Do() + if err != nil { + return err + } - waitOpF := func(op *container.Operation) error { + // Wait until it's updated return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, @@ -2228,8 +2232,8 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + return err } log.Printf("[INFO] Updated taints for Node Pool %s", d.Id()) } @@ -2262,15 +2266,17 @@ func 
nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf req.Tags = ntags } - createOpF := func() (*container.Operation, error) { + updateF := func() error { clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - return clusterNodePoolsUpdateCall.Do() - } + op, err := clusterNodePoolsUpdateCall.Do() + if err != nil { + return err + } - waitOpF := func(op *container.Operation) error { + // Wait until it's updated return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, @@ -2278,8 +2284,8 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + return err } log.Printf("[INFO] Updated tags for node pool %s", name) } @@ -2302,15 +2308,17 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf req.ResourceManagerTags = rmTags } - createOpF := func() (*container.Operation, error) { + updateF := func() error { clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - return clusterNodePoolsUpdateCall.Do() - } + op, err := clusterNodePoolsUpdateCall.Do() + if err != nil { + return err + } - waitOpF := func(op *container.Operation) error { + // Wait until it's updated return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, @@ -2318,7 +2326,7 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { + if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { return err } log.Printf("[INFO] Updated resource manager tags for node pool %s", name) @@ -2336,15 +2344,17 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf } } - createOpF := func() (*container.Operation, error) { + updateF := func() error { clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - return clusterNodePoolsUpdateCall.Do() - } + op, err := clusterNodePoolsUpdateCall.Do() + if err != nil { + return err + } - waitOpF := func(op *container.Operation) error { + // Wait until it's updated return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, @@ -2353,8 +2363,8 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf } // Call update serially. 
- if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + return err } log.Printf("[INFO] Updated resource labels for node pool %s", name) @@ -2372,15 +2382,17 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf } } - createOpF := func() (*container.Operation, error) { + updateF := func() error { clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - return clusterNodePoolsUpdateCall.Do() - } + op, err := clusterNodePoolsUpdateCall.Do() + if err != nil { + return err + } - waitOpF := func(op *container.Operation) error { + // Wait until it's updated return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, @@ -2389,8 +2401,8 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf } // Call update serially. - if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + return err } log.Printf("[INFO] Updated labels for node pool %s", name) @@ -2404,23 +2416,25 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf }, } - createOpF := func() (*container.Operation, error) { + updateF := func() error { clusterUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.Update(nodePoolInfo.parent(), req) if config.UserProjectOverride { clusterUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - return clusterUpdateCall.Do() - } + op, err := clusterUpdateCall.Do() + if err != nil { + return err + } - waitOpF := func(op *container.Operation) error { + // Wait until it's updated return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, "updating GKE node pool", userAgent, timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + return err } log.Printf("[INFO] Updated image type in Node Pool %s", d.Id()) } @@ -2435,15 +2449,18 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf req.WorkloadMetadataConfig = &container.WorkloadMetadataConfig{} req.ForceSendFields = []string{"WorkloadMetadataConfig"} } - createOpF := func() (*container.Operation, error) { - clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) + updateF := func() error { + clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name),req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - return clusterNodePoolsUpdateCall.Do() - } + op, err := clusterNodePoolsUpdateCall.Do() - waitOpF := func(op *container.Operation) error { + if err != nil { + return err + } + + // Wait until it's updated return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, @@ 
-2451,8 +2468,8 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + return err } log.Printf("[INFO] Updated workload_metadata_config for node pool %s", name) } @@ -2465,15 +2482,17 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf Enabled: gcfsEnabled, }, } - createOpF := func() (*container.Operation, error) { - clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) + updateF := func() error { + clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name),req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - return clusterNodePoolsUpdateCall.Do() - } + op, err := clusterNodePoolsUpdateCall.Do() + if err != nil { + return err + } - waitOpF := func(op *container.Operation) error { + // Wait until it's updated return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, @@ -2481,8 +2500,8 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + return err } log.Printf("[INFO] Updated gcfs_config for node pool %s", name) @@ -2498,15 +2517,17 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf req.KubeletConfig = &container.NodeKubeletConfig{} req.ForceSendFields = []string{"KubeletConfig"} } - createOpF := func() (*container.Operation, error) { - clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) + updateF := func() error { + clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name),req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - return clusterNodePoolsUpdateCall.Do() - } + op, err := clusterNodePoolsUpdateCall.Do() + if err != nil { + return err + } - waitOpF := func(op *container.Operation) error { + // Wait until it's updated return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, @@ -2514,8 +2535,8 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + return err } log.Printf("[INFO] Updated kubelet_config for node pool %s", name) @@ -2530,15 +2551,17 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf req.LinuxNodeConfig = &container.LinuxNodeConfig{} req.ForceSendFields = []string{"LinuxNodeConfig"} } - createOpF := func() (*container.Operation, error) { - clusterNodePoolsUpdateCall := 
config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) + updateF := func() error { + clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name),req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - return clusterNodePoolsUpdateCall.Do() - } + op, err := clusterNodePoolsUpdateCall.Do() + if err != nil { + return err + } - waitOpF := func(op *container.Operation) error { + // Wait until it's updated return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, @@ -2546,8 +2569,8 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + return err } log.Printf("[INFO] Updated linux_node_config for node pool %s", name) @@ -2559,19 +2582,21 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf } if v, ok := d.GetOk(prefix + "node_config.0.fast_socket"); ok { fastSocket := v.([]interface{})[0].(map[string]interface{}) - req.FastSocket = &container.FastSocket{ + req.FastSocket = &container.FastSocket{ Enabled: fastSocket["enabled"].(bool), } } - createOpF := func() (*container.Operation, error) { - clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) + updateF := func() error { + clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name),req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - return clusterNodePoolsUpdateCall.Do() - } + op, err := clusterNodePoolsUpdateCall.Do() + if err != nil { + return err + } - waitOpF := func(op *container.Operation) error { + // Wait until it's updated return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, @@ -2579,8 +2604,8 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + return err } log.Printf("[INFO] Updated fast_socket for node pool %s", name) diff --git a/mmv1/third_party/terraform/services/container/resource_container_node_pool.go.tmpl b/mmv1/third_party/terraform/services/container/resource_container_node_pool.go.tmpl index dd2cd3a64db1..ccda8717798e 100644 --- a/mmv1/third_party/terraform/services/container/resource_container_node_pool.go.tmpl +++ b/mmv1/third_party/terraform/services/container/resource_container_node_pool.go.tmpl @@ -568,6 +568,11 @@ func resourceContainerNodePoolCreate(d *schema.ResourceData, meta interface{}) e return err } + // Acquire read-lock on cluster. + clusterLockKey := nodePoolInfo.clusterLockKey() + transport_tpg.MutexStore.RLock(clusterLockKey) + defer transport_tpg.MutexStore.RUnlock(clusterLockKey) + // Acquire write-lock on nodepool. 
npLockKey := nodePoolInfo.nodePoolLockKey(nodePool.Name) transport_tpg.MutexStore.Lock(npLockKey) @@ -580,9 +585,6 @@ func resourceContainerNodePoolCreate(d *schema.ResourceData, meta interface{}) e timeout := d.Timeout(schema.TimeoutCreate) startTime := time.Now() - clusterLockKey := nodePoolInfo.clusterLockKey() - transport_tpg.MutexStore.RLock(clusterLockKey) - // we attempt to prefetch the node pool to make sure it doesn't exist before creation var id = fmt.Sprintf("projects/%s/locations/%s/clusters/%s/nodePools/%s", nodePoolInfo.project, nodePoolInfo.location, nodePoolInfo.cluster, nodePool.Name) name := getNodePoolName(id) @@ -597,16 +599,11 @@ func resourceContainerNodePoolCreate(d *schema.ResourceData, meta interface{}) e // refreshed on the next call to apply. d.SetId(id) } else if err == nil { - transport_tpg.MutexStore.RUnlock(clusterLockKey) return fmt.Errorf("resource - %s - already exists", id) } - transport_tpg.MutexStore.RUnlock(clusterLockKey) var operation *container.Operation err = retry.Retry(timeout, func() *retry.RetryError { - transport_tpg.MutexStore.Lock(clusterLockKey) - defer transport_tpg.MutexStore.Unlock(clusterLockKey) - clusterNodePoolsCreateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Create(nodePoolInfo.parent(), req) if config.UserProjectOverride { clusterNodePoolsCreateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) @@ -625,8 +622,6 @@ func resourceContainerNodePoolCreate(d *schema.ResourceData, meta interface{}) e } return nil }) - transport_tpg.MutexStore.RLock(clusterLockKey) - defer transport_tpg.MutexStore.RUnlock(clusterLockKey) if err != nil { return fmt.Errorf("error creating NodePool: %s", err) } @@ -801,7 +796,10 @@ func resourceContainerNodePoolDelete(d *schema.ResourceData, meta interface{}) e } } + // Acquire read-lock on cluster. clusterLockKey := nodePoolInfo.clusterLockKey() + transport_tpg.MutexStore.RLock(clusterLockKey) + defer transport_tpg.MutexStore.RUnlock(clusterLockKey) // Acquire write-lock on nodepool. 
npLockKey := nodePoolInfo.nodePoolLockKey(name) @@ -813,8 +811,6 @@ func resourceContainerNodePoolDelete(d *schema.ResourceData, meta interface{}) e var operation *container.Operation err = retry.Retry(timeout, func() *retry.RetryError { - transport_tpg.MutexStore.Lock(clusterLockKey) - defer transport_tpg.MutexStore.Unlock(clusterLockKey) clusterNodePoolsDeleteCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Delete(nodePoolInfo.fullyQualifiedName(name)) if config.UserProjectOverride { clusterNodePoolsDeleteCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) @@ -834,8 +830,6 @@ func resourceContainerNodePoolDelete(d *schema.ResourceData, meta interface{}) e return nil }) - transport_tpg.MutexStore.RLock(clusterLockKey) - defer transport_tpg.MutexStore.RUnlock(clusterLockKey) if err != nil { return fmt.Errorf("Error deleting NodePool: %s", err) @@ -1352,20 +1346,22 @@ func expandNodeNetworkConfig(v interface{}) *container.NodeNetworkConfig { return nnc } + func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *NodePoolInformation, prefix string, timeout time.Duration) error { config := meta.(*transport_tpg.Config) name := d.Get(prefix + "name").(string) - userAgent, err := tpgresource.GenerateUserAgentString(d, config.UserAgent) + userAgent, err := tpgresource.GenerateUserAgentString(d, config.UserAgent) if err != nil { return err } - // Cluster write-lock will be acquired when createOpF is called, and read-lock will be acquired when waitOpF is - // called. + // Acquire read-lock on cluster. clusterLockKey := nodePoolInfo.clusterLockKey() + transport_tpg.MutexStore.RLock(clusterLockKey) + defer transport_tpg.MutexStore.RUnlock(clusterLockKey) - // Nodepool write-lock will be acquired when calling createOpF and waitOpF. + // Nodepool write-lock will be acquired when update function is called. 
npLockKey := nodePoolInfo.nodePoolLockKey(name) if d.HasChange(prefix + "autoscaling") { @@ -1393,23 +1389,25 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node Update: update, } - createOpF := func() (*container.Operation, error) { + updateF := func() error { clusterUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.Update(nodePoolInfo.parent(), req) if config.UserProjectOverride { clusterUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - return clusterUpdateCall.Do() - } + op, err := clusterUpdateCall.Do() + if err != nil { + return err + } - waitOpF := func(op *container.Operation) error { + // Wait until it's updated return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, "updating GKE node pool", userAgent, timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + return err } log.Printf("[INFO] Updated autoscaling in Node Pool %s", d.Id()) } @@ -1423,22 +1421,25 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node req := &container.SetNodePoolSizeRequest{ NodeCount: newSize, } - createOpF := func() (*container.Operation, error) { - clusterNodePoolsSetSizeCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.SetSize(nodePoolInfo.fullyQualifiedName(name), req) + updateF := func() error { + clusterNodePoolsSetSizeCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.SetSize(nodePoolInfo.fullyQualifiedName(name),req) if config.UserProjectOverride { clusterNodePoolsSetSizeCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - return clusterNodePoolsSetSizeCall.Do() - } + op, err := clusterNodePoolsSetSizeCall.Do() - waitOpF := func(op *container.Operation) error { + if err != nil { + return err + } + + // Wait until it's updated return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, "updating GKE node pool size", userAgent, timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + return err } log.Printf("[INFO] GKE node pool %s size has been updated to %d", name, newSize) } @@ -1455,22 +1456,25 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node Management: management, } - createOpF := func() (*container.Operation, error) { - clusterNodePoolsSetManagementCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.SetManagement(nodePoolInfo.fullyQualifiedName(name), req) + updateF := func() error { + clusterNodePoolsSetManagementCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.SetManagement(nodePoolInfo.fullyQualifiedName(name),req) if config.UserProjectOverride { clusterNodePoolsSetManagementCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - return clusterNodePoolsSetManagementCall.Do() - } + op, err := clusterNodePoolsSetManagementCall.Do() - waitOpF := func(op *container.Operation) error { + if err != nil { + return err + } + + // Wait until it's updated return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, "updating GKE node pool management", userAgent, timeout) } - if err := 
retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + return err } log.Printf("[INFO] Updated management in Node Pool %s", name) } @@ -1480,21 +1484,24 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node NodePoolId: name, NodeVersion: d.Get(prefix + "version").(string), } - createOpF := func() (*container.Operation, error) { - clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) + updateF := func() error { + clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name),req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - return clusterNodePoolsUpdateCall.Do() - } + op, err := clusterNodePoolsUpdateCall.Do() + + if err != nil { + return err + } - waitOpF := func(op *container.Operation) error { + // Wait until it's updated return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, "updating GKE node pool version", userAgent, timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + return err } log.Printf("[INFO] Updated version in Node Pool %s", name) } @@ -1503,20 +1510,23 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node req := &container.UpdateNodePoolRequest{ Locations: tpgresource.ConvertStringSet(d.Get(prefix + "node_locations").(*schema.Set)), } - createOpF := func() (*container.Operation, error) { - clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) + updateF := func() error { + clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name),req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - return clusterNodePoolsUpdateCall.Do() - } + op, err := clusterNodePoolsUpdateCall.Do() - waitOpF := func(op *container.Operation) error { + if err != nil { + return err + } + + // Wait until it's updated return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, "updating GKE node pool node locations", userAgent, timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + return err } log.Printf("[INFO] Updated node locations in Node Pool %s", name) } @@ -1564,7 +1574,7 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node if v, ok := blueGreenSettingsConfig["standard_rollout_policy"]; ok && len(v.([]interface{})) > 0 { standardRolloutPolicy := &container.StandardRolloutPolicy{} - if standardRolloutPolicyConfig, ok := v.([]interface{})[0].(map[string]interface{}); ok { + if standardRolloutPolicyConfig, ok := v.([]interface{})[0].(map[string]interface{}); ok { standardRolloutPolicy.BatchSoakDuration = 
standardRolloutPolicyConfig["batch_soak_duration"].(string) if v, ok := standardRolloutPolicyConfig["batch_node_count"]; ok { standardRolloutPolicy.BatchNodeCount = int64(v.(int)) @@ -1581,38 +1591,44 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node req := &container.UpdateNodePoolRequest{ UpgradeSettings: upgradeSettings, } - createOpF := func() (*container.Operation, error) { - clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) + updateF := func() error { + clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name),req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - return clusterNodePoolsUpdateCall.Do() - } + op, err := clusterNodePoolsUpdateCall.Do() + + if err != nil { + return err + } - waitOpF := func(op *container.Operation) error { + // Wait until it's updated return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, "updating GKE node pool upgrade settings", userAgent, timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + return err } log.Printf("[INFO] Updated upgrade settings in Node Pool %s", name) } if d.HasChange(prefix + "network_config") { - if d.HasChange(prefix+"network_config.0.enable_private_nodes") || d.HasChange(prefix+"network_config.0.network_performance_config") { + if d.HasChange(prefix + "network_config.0.enable_private_nodes") || d.HasChange(prefix + "network_config.0.network_performance_config") { req := &container.UpdateNodePoolRequest{ - NodePoolId: name, + NodePoolId: name, NodeNetworkConfig: expandNodeNetworkConfig(d.Get(prefix + "network_config")), } - createOpF := func() (*container.Operation, error) { - clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) + updateF := func() error { + clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name),req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - return clusterNodePoolsUpdateCall.Do() - } + op, err := clusterNodePoolsUpdateCall.Do() - waitOpF := func(op *container.Operation) error { + if err != nil { + return err + } + + // Wait until it's updated return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, @@ -1620,8 +1636,8 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + return err } log.Printf("[INFO] Updated network_config for node pool %s", name) @@ -1672,27 +1688,15 @@ func containerNodePoolAwaitRestingState(config *transport_tpg.Config, name, proj return state, err } -// Retries an operation while the canonical error code is FAILED_PRECONDTION or RESOURCE_EXHAUSTED which indicates -// there is an incompatible operation already running on the cluster or 
there are the number of allowed concurrent -// operations running on the cluster. These errors can be safely retried until the incompatible operation completes, -// and the newly requested operation can begin. -// The npLockKey is held throughout createOpFunc and waitOpFunc. -// The clusterLockKey write-lock is held during createOpFunc to make operation creations sequential within the cluster, -// and clusterLockKey read-lock is held during waitOpFunc to allow concurrency on a cluster. -func retryWhileIncompatibleOperation(timeout time.Duration, npLockKey string, clusterLockKey string, createOpFunc func() (*container.Operation, error), waitOpFunc func(*container.Operation) error) error { - f := func() error { - transport_tpg.MutexStore.Lock(clusterLockKey) - op, err := createOpFunc() - transport_tpg.MutexStore.Unlock(clusterLockKey) - if err != nil { - return err - } - transport_tpg.MutexStore.RLock(clusterLockKey) - defer transport_tpg.MutexStore.RUnlock(clusterLockKey) - return waitOpFunc(op) - } +// Retries an operation while the canonical error code is FAILED_PRECONDTION +// or RESOURCE_EXHAUSTED which indicates there is an incompatible operation +// already running on the cluster or there are the number of allowed +// concurrent operations running on the cluster. These errors can be safely +// retried until the incompatible operation completes, and the newly +// requested operation can begin. +func retryWhileIncompatibleOperation(timeout time.Duration, lockKey string, f func() error) error { return retry.Retry(timeout, func() *retry.RetryError { - if err := transport_tpg.LockedCall(npLockKey, f); err != nil { + if err := transport_tpg.LockedCall(lockKey, f); err != nil { if tpgresource.IsFailedPreconditionError(err) || tpgresource.IsQuotaError(err) { return retry.RetryableError(err) }
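
For readers following the diff above, here is a minimal, stdlib-only sketch of the retry discipline this revert restores in retryWhileIncompatibleOperation: each attempt runs under the per-node-pool lock, and attempts whose errors look like FAILED_PRECONDITION (an incompatible operation is still running) or RESOURCE_EXHAUSTED (the cluster's concurrent-operation quota is hit) are retried until the timeout. The names (keyedMutex, retryWhileIncompatible, isRetryable), the string matching on error text, and the fixed two-second backoff are illustrative stand-ins, not the provider's real transport_tpg.MutexStore, tpgresource error checks, or retry.Retry helper.

// lockedretry_sketch.go — illustrative sketch only, not provider code.
package main

import (
	"errors"
	"fmt"
	"strings"
	"sync"
	"time"
)

// keyedMutex hands out one mutex per string key, loosely mirroring the role
// of transport_tpg.MutexStore in the provider.
type keyedMutex struct {
	mu    sync.Mutex
	locks map[string]*sync.Mutex
}

func (k *keyedMutex) get(key string) *sync.Mutex {
	k.mu.Lock()
	defer k.mu.Unlock()
	if k.locks == nil {
		k.locks = map[string]*sync.Mutex{}
	}
	if _, ok := k.locks[key]; !ok {
		k.locks[key] = &sync.Mutex{}
	}
	return k.locks[key]
}

// isRetryable treats FAILED_PRECONDITION / RESOURCE_EXHAUSTED style errors as
// transient: another operation is still running on the cluster, or the number
// of allowed concurrent operations has been reached. (The real provider checks
// canonical googleapi error codes; string matching keeps the sketch self-contained.)
func isRetryable(err error) bool {
	msg := err.Error()
	return strings.Contains(msg, "FAILED_PRECONDITION") || strings.Contains(msg, "RESOURCE_EXHAUSTED")
}

// retryWhileIncompatible keeps calling f under the key's lock until it
// succeeds, fails with a non-retryable error, or the timeout elapses.
func retryWhileIncompatible(store *keyedMutex, key string, timeout time.Duration, f func() error) error {
	deadline := time.Now().Add(timeout)
	for {
		lock := store.get(key)
		lock.Lock()
		err := f()
		lock.Unlock()
		if err == nil {
			return nil
		}
		if !isRetryable(err) || time.Now().After(deadline) {
			return err
		}
		time.Sleep(2 * time.Second) // simple fixed backoff for the sketch
	}
}

func main() {
	store := &keyedMutex{}
	attempts := 0
	// Fake "update node pool" call: the first attempt collides with another
	// in-flight cluster operation, the second succeeds.
	updateF := func() error {
		attempts++
		if attempts == 1 {
			return errors.New("googleapi: Error 400: cluster is running incompatible operation, FAILED_PRECONDITION")
		}
		return nil
	}
	if err := retryWhileIncompatible(store, "projects/p/locations/l/clusters/c/nodePools/np", 5*time.Minute, updateF); err != nil {
		fmt.Println("update failed:", err)
		return
	}
	fmt.Printf("update succeeded after %d attempts\n", attempts)
}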
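
The other half of the revert is the lock ordering. The sketch below — again with illustrative names, and a single sync.RWMutex / sync.Mutex pair standing in for the keyed mutex store — contrasts the restored scheme (cluster read-lock held across the whole resource function, node-pool write-lock wrapping the create-and-wait pair) with the scheme being reverted (node-pool lock held throughout, cluster write-lock only while the operation is created, cluster read-lock while it is awaited so waits on different node pools can overlap). It is a sketch of the two orderings under those assumptions, not the provider's implementation.

// locking_sketch.go — illustrative sketch only, not provider code.
package main

import (
	"fmt"
	"sync"
	"time"
)

// Stand-ins for the provider's per-cluster and per-node-pool locks.
var (
	clusterLock  sync.RWMutex
	nodePoolLock sync.Mutex
)

// createOp pretends to create a long-running GKE operation.
func createOp(name string) string {
	fmt.Println("created operation for", name)
	return "operation-" + name
}

// waitOp pretends to poll the operation until it is done.
func waitOp(op string) {
	time.Sleep(10 * time.Millisecond)
	fmt.Println(op, "finished")
}

// updateRestored mirrors the behaviour this revert goes back to: the cluster
// read-lock is held for the whole update, and the node-pool write-lock wraps
// the create-and-wait pair as a single unit.
func updateRestored(name string) {
	clusterLock.RLock()
	defer clusterLock.RUnlock()

	nodePoolLock.Lock()
	defer nodePoolLock.Unlock()

	op := createOp(name)
	waitOp(op)
}

// updateReverted mirrors the scheme the reverted commit introduced: the
// node-pool lock is held throughout, the cluster write-lock serializes only
// the creation of the operation, and a read-lock is held while waiting so
// multiple node pools can wait in parallel.
func updateReverted(name string) {
	nodePoolLock.Lock()
	defer nodePoolLock.Unlock()

	clusterLock.Lock()
	op := createOp(name)
	clusterLock.Unlock()

	clusterLock.RLock()
	defer clusterLock.RUnlock()
	waitOp(op)
}

func main() {
	updateRestored("np-a")
	updateReverted("np-b")
}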