diff --git a/.changelog/12488.txt b/.changelog/12488.txt new file mode 100644 index 00000000000..558c6325550 --- /dev/null +++ b/.changelog/12488.txt @@ -0,0 +1,3 @@ +```release-note:enhancement +container: make nodepool concurrent operations scale better +``` \ No newline at end of file diff --git a/google/services/container/node_config.go b/google/services/container/node_config.go index 90d955873fd..5f8008537bb 100644 --- a/google/services/container/node_config.go +++ b/google/services/container/node_config.go @@ -1848,7 +1848,11 @@ func flattenFastSocket(c *container.FastSocket) []map[string]interface{} { // node pool updates in `resource_container_cluster` func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Config, nodePoolInfo *NodePoolInformation, prefix, name string, timeout time.Duration) error { - // Nodepool write-lock will be acquired when update function is called. + // Cluster write-lock will be acquired when createOpF is called to make operation creations sequential within + // the cluster, and read-lock will be acquired when waitOpF is called to allow concurrent operations. + // This is to resolve the bottleneck of a large number of operations being created at the same time. + clusterLockKey := nodePoolInfo.clusterLockKey() + // Nodepool write-lock will be acquired when calling createOpF and waitOpF. npLockKey := nodePoolInfo.nodePoolLockKey(name) userAgent, err := tpgresource.GenerateUserAgentString(d, config.UserAgent) @@ -1870,17 +1874,15 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf }, } - updateF := func() error { + createOpF := func() (*container.Operation, error) { clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - op, err := clusterNodePoolsUpdateCall.Do() - if err != nil { - return err - } + return clusterNodePoolsUpdateCall.Do() + } - // Wait until it's updated + waitOpF := func(op *container.Operation) error { return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, @@ -1888,7 +1890,7 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { return err } @@ -1906,17 +1908,15 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf req.ContainerdConfig = &container.ContainerdConfig{} req.ForceSendFields = []string{"ContainerdConfig"} } - updateF := func() error { + createOpF := func() (*container.Operation, error) { clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - op, err := clusterNodePoolsUpdateCall.Do() - if err != nil { - return err - } + return clusterNodePoolsUpdateCall.Do() + } - // Wait until it's updated + waitOpF := func(op *container.Operation) error { return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, @@ -1924,7 +1924,7 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf timeout) } - if err :=
retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { return err } @@ -1953,17 +1953,15 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf req.StoragePools = storagePools } - updateF := func() error { + createOpF := func() (*container.Operation, error) { clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - op, err := clusterNodePoolsUpdateCall.Do() - if err != nil { - return err - } + return clusterNodePoolsUpdateCall.Do() + } - // Wait until it's updated + waitOpF := func(op *container.Operation) error { return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, @@ -1971,7 +1969,7 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { return err } log.Printf("[INFO] Updated disk disk_size_gb/disk_type/machine_type/storage_pools for Node Pool %s", d.Id()) @@ -2009,17 +2007,15 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf req.Taints = ntaints } - updateF := func() error { + createOpF := func() (*container.Operation, error) { clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - op, err := clusterNodePoolsUpdateCall.Do() - if err != nil { - return err - } + return clusterNodePoolsUpdateCall.Do() + } - // Wait until it's updated + waitOpF := func(op *container.Operation) error { return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, @@ -2027,7 +2023,7 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { return err } log.Printf("[INFO] Updated taints for Node Pool %s", d.Id()) @@ -2061,17 +2057,15 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf req.Tags = ntags } - updateF := func() error { + createOpF := func() (*container.Operation, error) { clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - op, err := clusterNodePoolsUpdateCall.Do() - if err != nil { - return err - } + return clusterNodePoolsUpdateCall.Do() + } - // Wait until it's updated + waitOpF := func(op *container.Operation) error { return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, @@ -2079,7 +2073,7 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); 
err != nil { + if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { return err } log.Printf("[INFO] Updated tags for node pool %s", name) @@ -2103,17 +2097,15 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf req.ResourceManagerTags = rmTags } - updateF := func() error { + createOpF := func() (*container.Operation, error) { clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - op, err := clusterNodePoolsUpdateCall.Do() - if err != nil { - return err - } + return clusterNodePoolsUpdateCall.Do() + } - // Wait until it's updated + waitOpF := func(op *container.Operation) error { return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, @@ -2121,7 +2113,7 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { return err } log.Printf("[INFO] Updated resource manager tags for node pool %s", name) @@ -2139,17 +2131,15 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf } } - updateF := func() error { + createOpF := func() (*container.Operation, error) { clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - op, err := clusterNodePoolsUpdateCall.Do() - if err != nil { - return err - } + return clusterNodePoolsUpdateCall.Do() + } - // Wait until it's updated + waitOpF := func(op *container.Operation) error { return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, @@ -2158,7 +2148,7 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf } // Call update serially. - if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { return err } @@ -2177,17 +2167,15 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf } } - updateF := func() error { + createOpF := func() (*container.Operation, error) { clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - op, err := clusterNodePoolsUpdateCall.Do() - if err != nil { - return err - } + return clusterNodePoolsUpdateCall.Do() + } - // Wait until it's updated + waitOpF := func(op *container.Operation) error { return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, @@ -2196,7 +2184,7 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf } // Call update serially. 
- if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { return err } @@ -2211,24 +2199,22 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf }, } - updateF := func() error { + createOpF := func() (*container.Operation, error) { clusterUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.Update(nodePoolInfo.parent(), req) if config.UserProjectOverride { clusterUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - op, err := clusterUpdateCall.Do() - if err != nil { - return err - } + return clusterUpdateCall.Do() + } - // Wait until it's updated + waitOpF := func(op *container.Operation) error { return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, "updating GKE node pool", userAgent, timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { return err } log.Printf("[INFO] Updated image type in Node Pool %s", d.Id()) @@ -2244,18 +2230,15 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf req.WorkloadMetadataConfig = &container.WorkloadMetadataConfig{} req.ForceSendFields = []string{"WorkloadMetadataConfig"} } - updateF := func() error { + createOpF := func() (*container.Operation, error) { clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - op, err := clusterNodePoolsUpdateCall.Do() - - if err != nil { - return err - } + return clusterNodePoolsUpdateCall.Do() + } - // Wait until it's updated + waitOpF := func(op *container.Operation) error { return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, @@ -2263,7 +2246,7 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { return err } log.Printf("[INFO] Updated workload_metadata_config for node pool %s", name) @@ -2277,17 +2260,15 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf Enabled: gcfsEnabled, }, } - updateF := func() error { + createOpF := func() (*container.Operation, error) { clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - op, err := clusterNodePoolsUpdateCall.Do() - if err != nil { - return err - } + return clusterNodePoolsUpdateCall.Do() + } - // Wait until it's updated + waitOpF := func(op *container.Operation) error { return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, @@ -2295,7 +2276,7 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + if err := 
retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { return err } @@ -2312,17 +2293,15 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf req.KubeletConfig = &container.NodeKubeletConfig{} req.ForceSendFields = []string{"KubeletConfig"} } - updateF := func() error { + createOpF := func() (*container.Operation, error) { clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - op, err := clusterNodePoolsUpdateCall.Do() - if err != nil { - return err - } + return clusterNodePoolsUpdateCall.Do() + } - // Wait until it's updated + waitOpF := func(op *container.Operation) error { return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, @@ -2330,7 +2309,7 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { return err } @@ -2346,17 +2325,15 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf req.LinuxNodeConfig = &container.LinuxNodeConfig{} req.ForceSendFields = []string{"LinuxNodeConfig"} } - updateF := func() error { + createOpF := func() (*container.Operation, error) { clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - op, err := clusterNodePoolsUpdateCall.Do() - if err != nil { - return err - } + return clusterNodePoolsUpdateCall.Do() + } - // Wait until it's updated + waitOpF := func(op *container.Operation) error { return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, @@ -2364,7 +2341,7 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { return err } @@ -2381,17 +2358,15 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf Enabled: fastSocket["enabled"].(bool), } } - updateF := func() error { + createOpF := func() (*container.Operation, error) { clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - op, err := clusterNodePoolsUpdateCall.Do() - if err != nil { - return err - } + return clusterNodePoolsUpdateCall.Do() + } - // Wait until it's updated + waitOpF := func(op *container.Operation) error { return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, @@ -2399,7 +2374,7 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + if err := retryWhileIncompatibleOperation(timeout, npLockKey, 
clusterLockKey, createOpF, waitOpF); err != nil { return err } diff --git a/google/services/container/resource_container_node_pool.go b/google/services/container/resource_container_node_pool.go index 6777ce7dddd..5b0b1fde213 100644 --- a/google/services/container/resource_container_node_pool.go +++ b/google/services/container/resource_container_node_pool.go @@ -565,11 +565,6 @@ func resourceContainerNodePoolCreate(d *schema.ResourceData, meta interface{}) e return err } - // Acquire read-lock on cluster. - clusterLockKey := nodePoolInfo.clusterLockKey() - transport_tpg.MutexStore.RLock(clusterLockKey) - defer transport_tpg.MutexStore.RUnlock(clusterLockKey) - // Acquire write-lock on nodepool. npLockKey := nodePoolInfo.nodePoolLockKey(nodePool.Name) transport_tpg.MutexStore.Lock(npLockKey) @@ -582,6 +577,9 @@ func resourceContainerNodePoolCreate(d *schema.ResourceData, meta interface{}) e timeout := d.Timeout(schema.TimeoutCreate) startTime := time.Now() + clusterLockKey := nodePoolInfo.clusterLockKey() + transport_tpg.MutexStore.RLock(clusterLockKey) + // we attempt to prefetch the node pool to make sure it doesn't exist before creation var id = fmt.Sprintf("projects/%s/locations/%s/clusters/%s/nodePools/%s", nodePoolInfo.project, nodePoolInfo.location, nodePoolInfo.cluster, nodePool.Name) name := getNodePoolName(id) @@ -596,11 +594,16 @@ func resourceContainerNodePoolCreate(d *schema.ResourceData, meta interface{}) e // refreshed on the next call to apply. d.SetId(id) } else if err == nil { + transport_tpg.MutexStore.RUnlock(clusterLockKey) return fmt.Errorf("resource - %s - already exists", id) } + transport_tpg.MutexStore.RUnlock(clusterLockKey) var operation *container.Operation err = retry.Retry(timeout, func() *retry.RetryError { + transport_tpg.MutexStore.Lock(clusterLockKey) + defer transport_tpg.MutexStore.Unlock(clusterLockKey) + clusterNodePoolsCreateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Create(nodePoolInfo.parent(), req) if config.UserProjectOverride { clusterNodePoolsCreateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) @@ -619,6 +622,8 @@ func resourceContainerNodePoolCreate(d *schema.ResourceData, meta interface{}) e } return nil }) + transport_tpg.MutexStore.RLock(clusterLockKey) + defer transport_tpg.MutexStore.RUnlock(clusterLockKey) if err != nil { return fmt.Errorf("error creating NodePool: %s", err) } @@ -793,10 +798,7 @@ func resourceContainerNodePoolDelete(d *schema.ResourceData, meta interface{}) e } } - // Acquire read-lock on cluster. clusterLockKey := nodePoolInfo.clusterLockKey() - transport_tpg.MutexStore.RLock(clusterLockKey) - defer transport_tpg.MutexStore.RUnlock(clusterLockKey) // Acquire write-lock on nodepool. 
npLockKey := nodePoolInfo.nodePoolLockKey(name) @@ -808,6 +810,8 @@ func resourceContainerNodePoolDelete(d *schema.ResourceData, meta interface{}) e var operation *container.Operation err = retry.Retry(timeout, func() *retry.RetryError { + transport_tpg.MutexStore.Lock(clusterLockKey) + defer transport_tpg.MutexStore.Unlock(clusterLockKey) clusterNodePoolsDeleteCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Delete(nodePoolInfo.fullyQualifiedName(name)) if config.UserProjectOverride { clusterNodePoolsDeleteCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) @@ -827,6 +831,8 @@ func resourceContainerNodePoolDelete(d *schema.ResourceData, meta interface{}) e return nil }) + transport_tpg.MutexStore.RLock(clusterLockKey) + defer transport_tpg.MutexStore.RUnlock(clusterLockKey) if err != nil { return fmt.Errorf("Error deleting NodePool: %s", err) @@ -1352,12 +1358,11 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node return err } - // Acquire read-lock on cluster. + // Cluster write-lock will be acquired when createOpF is called, and read-lock will be acquired when waitOpF is + // called. clusterLockKey := nodePoolInfo.clusterLockKey() - transport_tpg.MutexStore.RLock(clusterLockKey) - defer transport_tpg.MutexStore.RUnlock(clusterLockKey) - // Nodepool write-lock will be acquired when update function is called. + // Nodepool write-lock will be acquired when calling createOpF and waitOpF. npLockKey := nodePoolInfo.nodePoolLockKey(name) if d.HasChange(prefix + "autoscaling") { @@ -1385,24 +1390,22 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node Update: update, } - updateF := func() error { + createOpF := func() (*container.Operation, error) { clusterUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.Update(nodePoolInfo.parent(), req) if config.UserProjectOverride { clusterUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - op, err := clusterUpdateCall.Do() - if err != nil { - return err - } + return clusterUpdateCall.Do() + } - // Wait until it's updated + waitOpF := func(op *container.Operation) error { return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, "updating GKE node pool", userAgent, timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { return err } log.Printf("[INFO] Updated autoscaling in Node Pool %s", d.Id()) @@ -1417,24 +1420,21 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node req := &container.SetNodePoolSizeRequest{ NodeCount: newSize, } - updateF := func() error { + createOpF := func() (*container.Operation, error) { clusterNodePoolsSetSizeCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.SetSize(nodePoolInfo.fullyQualifiedName(name), req) if config.UserProjectOverride { clusterNodePoolsSetSizeCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - op, err := clusterNodePoolsSetSizeCall.Do() - - if err != nil { - return err - } + return clusterNodePoolsSetSizeCall.Do() + } - // Wait until it's updated + waitOpF := func(op *container.Operation) error { return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, "updating GKE node pool size", userAgent, timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, 
updateF); err != nil { + if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { return err } log.Printf("[INFO] GKE node pool %s size has been updated to %d", name, newSize) @@ -1452,24 +1452,21 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node Management: management, } - updateF := func() error { + createOpF := func() (*container.Operation, error) { clusterNodePoolsSetManagementCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.SetManagement(nodePoolInfo.fullyQualifiedName(name), req) if config.UserProjectOverride { clusterNodePoolsSetManagementCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - op, err := clusterNodePoolsSetManagementCall.Do() - - if err != nil { - return err - } + return clusterNodePoolsSetManagementCall.Do() + } - // Wait until it's updated + waitOpF := func(op *container.Operation) error { return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, "updating GKE node pool management", userAgent, timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { return err } log.Printf("[INFO] Updated management in Node Pool %s", name) @@ -1480,23 +1477,20 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node NodePoolId: name, NodeVersion: d.Get(prefix + "version").(string), } - updateF := func() error { + createOpF := func() (*container.Operation, error) { clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - op, err := clusterNodePoolsUpdateCall.Do() - - if err != nil { - return err - } + return clusterNodePoolsUpdateCall.Do() + } - // Wait until it's updated + waitOpF := func(op *container.Operation) error { return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, "updating GKE node pool version", userAgent, timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { return err } log.Printf("[INFO] Updated version in Node Pool %s", name) @@ -1506,22 +1500,19 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node req := &container.UpdateNodePoolRequest{ Locations: tpgresource.ConvertStringSet(d.Get(prefix + "node_locations").(*schema.Set)), } - updateF := func() error { + createOpF := func() (*container.Operation, error) { clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - op, err := clusterNodePoolsUpdateCall.Do() - - if err != nil { - return err - } + return clusterNodePoolsUpdateCall.Do() + } - // Wait until it's updated + waitOpF := func(op *container.Operation) error { return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, "updating GKE node pool node locations", userAgent, timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, 
updateF); err != nil { + if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { return err } log.Printf("[INFO] Updated node locations in Node Pool %s", name) @@ -1587,21 -1578,18 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node req := &container.UpdateNodePoolRequest{ UpgradeSettings: upgradeSettings, } - updateF := func() error { + createOpF := func() (*container.Operation, error) { clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - op, err := clusterNodePoolsUpdateCall.Do() - - if err != nil { - return err - } + return clusterNodePoolsUpdateCall.Do() + } - // Wait until it's updated + waitOpF := func(op *container.Operation) error { return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, "updating GKE node pool upgrade settings", userAgent, timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { return err } log.Printf("[INFO] Updated upgrade settings in Node Pool %s", name) @@ -1613,18 +1601,15 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node NodePoolId: name, NodeNetworkConfig: expandNodeNetworkConfig(d.Get(prefix + "network_config")), } - updateF := func() error { + createOpF := func() (*container.Operation, error) { clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - op, err := clusterNodePoolsUpdateCall.Do() - - if err != nil { - return err - } + return clusterNodePoolsUpdateCall.Do() + } - // Wait until it's updated + waitOpF := func(op *container.Operation) error { return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, @@ -1632,7 +1617,7 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { return err } @@ -1684,15 +1669,27 @@ func containerNodePoolAwaitRestingState(config *transport_tpg.Config, name, proj return state, err } -// Retries an operation while the canonical error code is FAILED_PRECONDTION -// or RESOURCE_EXHAUSTED which indicates there is an incompatible operation -// already running on the cluster or there are the number of allowed -// concurrent operations running on the cluster. These errors can be safely -// retried until the incompatible operation completes, and the newly -// requested operation can begin. -func retryWhileIncompatibleOperation(timeout time.Duration, lockKey string, f func() error) error { +// Retries an operation while the canonical error code is FAILED_PRECONDITION or RESOURCE_EXHAUSTED, which indicates +// that there is an incompatible operation already running on the cluster or that the maximum number of allowed concurrent +// operations is already running on the cluster.
These errors can be safely retried until the incompatible operation completes, + and the newly requested operation can begin. + // The npLockKey is held throughout createOpFunc and waitOpFunc. + // The clusterLockKey write-lock is held during createOpFunc to make operation creations sequential within the cluster, + // and clusterLockKey read-lock is held during waitOpFunc to allow concurrent operations on the cluster. + func retryWhileIncompatibleOperation(timeout time.Duration, npLockKey string, clusterLockKey string, createOpFunc func() (*container.Operation, error), waitOpFunc func(*container.Operation) error) error { + f := func() error { + transport_tpg.MutexStore.Lock(clusterLockKey) + op, err := createOpFunc() + transport_tpg.MutexStore.Unlock(clusterLockKey) + if err != nil { + return err + } + transport_tpg.MutexStore.RLock(clusterLockKey) + defer transport_tpg.MutexStore.RUnlock(clusterLockKey) + return waitOpFunc(op) + } return retry.Retry(timeout, func() *retry.RetryError { - if err := transport_tpg.LockedCall(lockKey, f); err != nil { + if err := transport_tpg.LockedCall(npLockKey, f); err != nil { if tpgresource.IsFailedPreconditionError(err) || tpgresource.IsQuotaError(err) { return retry.RetryableError(err) }
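For readers skimming the hunks above, the following standalone Go sketch illustrates the locking pattern this diff converges on: a per-node-pool write lock held across the whole create-and-wait, a per-cluster write lock held only while the operation is created, and a per-cluster read lock held while the operation is awaited. It is a simplified illustration, not the provider's code: the keyedRWLocks type, runOp, and the demo key names are stand-ins for transport_tpg.MutexStore and retryWhileIncompatibleOperation, and the FAILED_PRECONDITION/RESOURCE_EXHAUSTED retry loop is omitted.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// keyedRWLocks is an illustrative stand-in for the provider's keyed mutex store.
type keyedRWLocks struct {
	mu    sync.Mutex
	locks map[string]*sync.RWMutex
}

// get returns (lazily creating) the RWMutex for a key.
func (k *keyedRWLocks) get(key string) *sync.RWMutex {
	k.mu.Lock()
	defer k.mu.Unlock()
	if k.locks == nil {
		k.locks = map[string]*sync.RWMutex{}
	}
	if _, ok := k.locks[key]; !ok {
		k.locks[key] = &sync.RWMutex{}
	}
	return k.locks[key]
}

// runOp mirrors the shape of the new retryWhileIncompatibleOperation body:
// the node-pool lock is held for the whole create+wait, the cluster lock is
// held in write mode only while the operation is created, and in read mode
// while the operation is awaited, so waits from different node pools overlap.
func runOp(store *keyedRWLocks, npKey, clusterKey string,
	createOp func() (string, error), waitOp func(string) error) error {

	np := store.get(npKey)
	np.Lock()
	defer np.Unlock()

	cl := store.get(clusterKey)

	cl.Lock() // serialize operation creation within the cluster
	opName, err := createOp()
	cl.Unlock()
	if err != nil {
		return err
	}

	cl.RLock() // allow concurrent waits on the same cluster
	defer cl.RUnlock()
	return waitOp(opName)
}

func main() {
	store := &keyedRWLocks{}
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		i := i
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = runOp(store,
				fmt.Sprintf("cluster/demo/np/%d", i), "cluster/demo",
				func() (string, error) { return fmt.Sprintf("op-%d", i), nil },
				func(op string) error { time.Sleep(10 * time.Millisecond); return nil },
			)
		}()
	}
	wg.Wait()
	fmt.Println("all simulated node pool operations finished")
}
```

Under this scheme, operation creation stays sequential per cluster, which avoids a burst of simultaneous operation creations hitting the API's concurrency limits, while the long poll for completion no longer serializes unrelated node pool updates on the same cluster.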