diff --git a/dataproc/create_cluster_and_submit_job.py b/dataproc/create_cluster_and_submit_job.py
index 0cf58ad74c53..377baa6452c0 100644
--- a/dataproc/create_cluster_and_submit_job.py
+++ b/dataproc/create_cluster_and_submit_job.py
@@ -119,6 +119,35 @@ def get_cluster_id_by_name(cluster_list, cluster_name):
     cluster = [c for c in cluster_list if c['clusterName'] == cluster_name][0]
     return cluster['clusterUuid'], cluster['config']['configBucket']
 
+# [START update_cluster]
+def update_cluster(dataproc, project, cluster_name, zone, num_workers):
+    """Updates the specified cluster to the given number of workers.
+    Use 'config.worker_config.num_instances' and 'workerConfig' for workers.
+    Use 'config.secondary_worker_config.num_instances' and
+    'secondaryWorkerConfig' for preemptible workers."""
+    print('Updating cluster to %d workers.' % num_workers)
+    zone_uri = \
+        'https://www.googleapis.com/compute/v1/projects/{}/zones/{}'.format(
+            project, zone)
+    cluster_data = {
+        'projectId': project,
+        'clusterName': cluster_name,
+        'config': {
+            'gceClusterConfig': {
+                'zoneUri': zone_uri
+            },
+            'workerConfig': {
+                'numInstances': num_workers
+            }
+        }
+    }
+    result = dataproc.projects().regions().clusters().patch(
+        projectId=project,
+        region=REGION,
+        clusterName=cluster_name,
+        updateMask='config.worker_config.num_instances',
+        body=cluster_data).execute()
+# [END update_cluster]
 
 # [START submit_pyspark_job]
 def submit_pyspark_job(dataproc, project, cluster_name, bucket_name, filename):
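
The docstring in the added function notes that resizing preemptible (secondary) workers uses a different update mask and body field. As a point of comparison, here is a minimal sketch of that variant, not part of the change above; it assumes the same discovery-based dataproc client and module-level REGION constant used in this file, and the name update_preemptible_workers is hypothetical.

# Sketch only, not part of the diff: resize the preemptible (secondary)
# worker group by swapping the update mask and the config field, as the
# docstring of update_cluster describes. Assumes REGION is defined at
# module level, as elsewhere in this sample.
def update_preemptible_workers(dataproc, project, cluster_name, num_preemptible):
    """Updates the cluster to the given number of preemptible workers."""
    cluster_data = {
        'projectId': project,
        'clusterName': cluster_name,
        'config': {
            'secondaryWorkerConfig': {
                'numInstances': num_preemptible
            }
        }
    }
    return dataproc.projects().regions().clusters().patch(
        projectId=project,
        region=REGION,
        clusterName=cluster_name,
        updateMask='config.secondary_worker_config.num_instances',
        body=cluster_data).execute()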