diff --git a/scripts/variantstore/wdl/GvsExtractAvroFilesForHail.wdl b/scripts/variantstore/wdl/GvsExtractAvroFilesForHail.wdl
index 16afeb1ea5d..29a4c11e63e 100644
--- a/scripts/variantstore/wdl/GvsExtractAvroFilesForHail.wdl
+++ b/scripts/variantstore/wdl/GvsExtractAvroFilesForHail.wdl
@@ -9,6 +9,7 @@ workflow GvsExtractAvroFilesForHail {
         String project_id
         String dataset_name
         String filter_set_name
+        String call_set_identifier
 
         Int scatter_width = 10
     }
@@ -28,7 +29,8 @@ workflow GvsExtractAvroFilesForHail {
             project_id = project_id,
             dataset_name = dataset_name,
             filter_set_name = filter_set_name,
-            avro_sibling = OutputPath.out
+            avro_sibling = OutputPath.out,
+            call_set_identifier = call_set_identifier
     }
 
     call Utils.CountSuperpartitions {
@@ -43,6 +45,7 @@ workflow GvsExtractAvroFilesForHail {
                 project_id = project_id,
                 dataset_name = dataset_name,
                 filter_set_name = filter_set_name,
+                call_set_identifier = call_set_identifier,
                 avro_sibling = OutputPath.out,
                 num_superpartitions = CountSuperpartitions.num_superpartitions,
                 shard_index = i,
@@ -99,18 +102,18 @@ task ExtractFromNonSuperpartitionedTables {
         String dataset_name
         String filter_set_name
         String avro_sibling
+        String call_set_identifier
     }
 
     parameter_meta {
         avro_sibling: "Cloud path to a file that will be the sibling to the 'avro' 'directory' under which output Avro files will be written."
     }
     command <<<
         set -o errexit -o nounset -o xtrace -o pipefail
-        echo "project_id = ~{project_id}" > ~/.bigqueryrc
         avro_prefix="$(dirname ~{avro_sibling})/avro"
         echo $avro_prefix > "avro_prefix.out"
 
-        bq query --nouse_legacy_sql --project_id=~{project_id} "
+        python3 /app/run_avro_query.py --sql "
             EXPORT DATA OPTIONS(
             uri='${avro_prefix}/sample_mapping/sample_mapping_*.avro', format='AVRO', compression='SNAPPY') AS
             SELECT sample_id, sample_name, '40',
@@ -119,33 +122,33 @@ task ExtractFromNonSuperpartitionedTables {
             WHERE withdrawn IS NULL AND is_control = false
             ORDER BY sample_id
-        "
+        " --call_set_identifier ~{call_set_identifier} --dataset_name ~{dataset_name} --table_name sample_info --project_id ~{project_id}
 
-        bq query --nouse_legacy_sql --project_id=~{project_id} "
+        python3 /app/run_avro_query.py --sql "
            EXPORT DATA OPTIONS(
            uri='${avro_prefix}/vqsr_filtering_data/vqsr_filtering_data_*.avro', format='AVRO', compression='SNAPPY') AS
            SELECT location, type as model, ref, alt, vqslod, yng_status
            FROM \`~{project_id}.~{dataset_name}.filter_set_info\`
            WHERE filter_set_name = '~{filter_set_name}'
            ORDER BY location
-        "
+        " --call_set_identifier ~{call_set_identifier} --dataset_name ~{dataset_name} --table_name filter_set_info --project_id ~{project_id}
 
-        bq query --nouse_legacy_sql --project_id=~{project_id} "
+        python3 /app/run_avro_query.py --sql "
            EXPORT DATA OPTIONS(
            uri='${avro_prefix}/site_filtering_data/site_filtering_data_*.avro', format='AVRO', compression='SNAPPY') AS
            SELECT location, filters
            FROM \`~{project_id}.~{dataset_name}.filter_set_sites\`
            WHERE filter_set_name = '~{filter_set_name}'
            ORDER BY location
-        "
+        " --call_set_identifier ~{call_set_identifier} --dataset_name ~{dataset_name} --table_name filter_set_sites --project_id ~{project_id}
 
-        bq query --nouse_legacy_sql --project_id=~{project_id} "
+        python3 /app/run_avro_query.py --sql "
            EXPORT DATA OPTIONS(
            uri='${avro_prefix}/vqsr_tranche_data/vqsr_tranche_data_*.avro', format='AVRO', compression='SNAPPY') AS
            SELECT model, truth_sensitivity, min_vqslod, filter_name
            FROM \`~{project_id}.~{dataset_name}.filter_set_tranches\`
            WHERE filter_set_name = '~{filter_set_name}'
-        "
+        " --call_set_identifier ~{call_set_identifier} --dataset_name ~{dataset_name} --table_name filter_set_tranches --project_id ~{project_id}
 
     >>>
 
     output {
@@ -154,7 +157,7 @@ task ExtractFromNonSuperpartitionedTables {
     }
 
     runtime {
-        docker: "gcr.io/google.com/cloudsdktool/cloud-sdk:409.0.0-alpine"
+        docker: "us.gcr.io/broad-dsde-methods/variantstore:2023-03-07-alpine-v2"
        disks: "local-disk 500 HDD"
     }
 }
@@ -171,6 +174,7 @@ task ExtractFromSuperpartitionedTables {
         String dataset_name
         String filter_set_name
         String avro_sibling
+        String call_set_identifier
         Int num_superpartitions
         Int shard_index
         Int num_shards
@@ -184,8 +188,6 @@
     command <<<
         set -o errexit -o nounset -o xtrace -o pipefail
 
-        echo "project_id = ~{project_id}" > ~/.bigqueryrc
-
         avro_prefix="$(dirname ~{avro_sibling})/avro"
 
         for superpartition in $(seq ~{shard_index + 1} ~{num_shards} ~{num_superpartitions})
@@ -194,7 +196,7 @@ task ExtractFromSuperpartitionedTables {
 
             # These bq exports error out if there are any objects at the sibling level to where output files would be written
            # so an extra layer of `vet_${str_table_index}` is inserted here.
-            bq query --nouse_legacy_sql --project_id=~{project_id} "
+            python3 /app/run_avro_query.py --sql "
                EXPORT DATA OPTIONS(
                uri='${avro_prefix}/vets/vet_${str_table_index}/vet_${str_table_index}_*.avro', format='AVRO', compression='SNAPPY') AS
                SELECT location, v.sample_id, ref, REPLACE(alt,',','') alt, call_GT as GT, call_AD as AD, call_GQ as GQ, cast(SPLIT(call_pl,',')[OFFSET(0)] as int64) as RGQ
@@ -203,9 +205,9 @@ task ExtractFromSuperpartitionedTables {
                WHERE withdrawn IS NULL AND is_control = false
                ORDER BY location
-            "
+            " --call_set_identifier ~{call_set_identifier} --dataset_name ~{dataset_name} --table_name vet_${str_table_index} --project_id ~{project_id}
 
-            bq query --nouse_legacy_sql --project_id=~{project_id} "
+            python3 /app/run_avro_query.py --sql "
                EXPORT DATA OPTIONS(
                uri='${avro_prefix}/refs/ref_ranges_${str_table_index}/ref_ranges_${str_table_index}_*.avro', format='AVRO', compression='SNAPPY') AS
                SELECT location, r.sample_id, length, state
@@ -214,7 +216,7 @@ task ExtractFromSuperpartitionedTables {
                WHERE withdrawn IS NULL AND is_control = false
                ORDER BY location
-            "
+            " --call_set_identifier ~{call_set_identifier} --dataset_name ~{dataset_name} --table_name ref_ranges_${str_table_index} --project_id ~{project_id}
         done
     >>>
 
@@ -223,7 +225,7 @@ task ExtractFromSuperpartitionedTables {
     }
 
     runtime {
-        docker: "gcr.io/google.com/cloudsdktool/cloud-sdk:409.0.0-alpine"
+        docker: "us.gcr.io/broad-dsde-methods/variantstore:2023-03-07-alpine-v2"
        disks: "local-disk 500 HDD"
     }
 }
@@ -291,7 +293,7 @@ task GenerateHailScripts {
         File hail_create_vat_inputs_script = 'hail_create_vat_inputs.py'
     }
     runtime {
-        docker: "us.gcr.io/broad-dsde-methods/variantstore:2023-01-23-alpine"
+        docker: "us.gcr.io/broad-dsde-methods/variantstore:2023-03-07-alpine-v2"
        disks: "local-disk 500 HDD"
     }
 }
diff --git a/scripts/variantstore/wdl/GvsQuickstartHailIntegration.wdl b/scripts/variantstore/wdl/GvsQuickstartHailIntegration.wdl
index 71e1d4a658f..4bc06f39c20 100644
--- a/scripts/variantstore/wdl/GvsQuickstartHailIntegration.wdl
+++ b/scripts/variantstore/wdl/GvsQuickstartHailIntegration.wdl
@@ -27,6 +27,7 @@ workflow GvsQuickstartHailIntegration {
             dataset_name = GvsQuickstartVcfIntegration.dataset_name,
             filter_set_name = GvsQuickstartVcfIntegration.filter_set_name,
             scatter_width = 10,
+            call_set_identifier = branch_name
     }
 
     call CreateAndTieOutVds {
diff --git a/scripts/variantstore/wdl/extract/run_avro_query.py b/scripts/variantstore/wdl/extract/run_avro_query.py
new file mode 100644
index 00000000000..be680a3e1df
--- /dev/null
+++ b/scripts/variantstore/wdl/extract/run_avro_query.py
@@ -0,0 +1,36 @@
+import argparse
+
+from google.cloud import bigquery
+from google.cloud.bigquery.job import QueryJobConfig
+import utils
+
+
+def run_avro_query(call_set_identifier, dataset_name, table_name, project_id, sql):
+    # add labels for DSP Cloud Cost Control Labeling and Reporting
+    query_labels_map = {'service': 'gvs', 'team': 'variants', 'managedby': 'gvs_extract_avro_files_for_hail'}
+
+    default_config = QueryJobConfig(labels=query_labels_map, priority="INTERACTIVE", use_query_cache=True)
+    client = bigquery.Client(project=project_id,
+                             default_query_job_config=default_config)
+    query_return = utils.execute_with_retry(client, table_name, sql)
+    utils.write_job_stats([{'job': query_return['job'], 'label': query_return['label']}], client,
+                          f"{project_id}.{dataset_name}", call_set_identifier, 'GvsExtractAvroFilesForHail',
+                          'GenerateAvroFiles', table_name, True)
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser(allow_abbrev=False, description='Extract avro files from GVS BigQuery dataset')
+    parser.add_argument('--call_set_identifier', type=str, required=True,
+                        help='callset identifier used to track costs in cost_observability table')
+    parser.add_argument('--dataset_name', type=str, help='BigQuery dataset name', required=True)
+    parser.add_argument('--table_name', type=str, help='BigQuery table name', required=True)
+    parser.add_argument('--project_id', type=str, help='Google project for the GVS dataset', required=True)
+    parser.add_argument('--sql', type=str, help='SQL to run to extract Avro data', required=True)
+
+    args = parser.parse_args()
+
+    run_avro_query(args.call_set_identifier,
+                   args.dataset_name,
+                   args.table_name,
+                   args.project_id,
+                   args.sql)
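
Reviewer note: run_avro_query.py imports a sibling utils module that is not part of this diff. The sketch below is a hypothetical stand-in, not the real scripts/variantstore/wdl/extract/utils.py; its names and argument order are inferred only from the calls above, and it exists to make the assumed interface explicit: execute_with_retry returns a dict with 'job' and 'label' keys, and write_job_stats receives that list plus the identifiers presumably used for cost_observability bookkeeping.

# Hypothetical interface sketch only -- NOT the real utils.py. Signatures are
# inferred from the calls in run_avro_query.py above; the actual helpers
# presumably add retry handling and persist stats to the cost_observability table.
from typing import Any, Dict, List

from google.cloud import bigquery


def execute_with_retry(client: bigquery.Client, label: str, sql: str) -> Dict[str, Any]:
    # Run the SQL (an EXPORT DATA statement here) and return the finished job
    # together with the label used later when reporting job statistics.
    job = client.query(sql)  # inherits labels/priority from the client's default_query_job_config
    job.result()             # block until the export completes
    return {'job': job, 'label': label}


def write_job_stats(jobs: List[Dict[str, Any]], client: bigquery.Client, fq_dataset: str,
                    call_set_identifier: str, step: str, call: str, shard_identifier: str,
                    flag: bool) -> None:
    # Sketch only: report per-job statistics (e.g. bytes processed). The final boolean
    # positional argument is passed as True by run_avro_query.py; its meaning is not
    # shown in this diff.
    for entry in jobs:
        job = entry['job']
        print(f"{call_set_identifier} {step}/{call} [{shard_identifier}] {entry['label']}: "
              f"{job.total_bytes_processed or 0} bytes processed for {fq_dataset}")

As wired up in the WDL tasks above, each EXPORT DATA statement is submitted with the GVS cost-control labels, and its statistics are intended to be attributed to the given --call_set_identifier and --table_name.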