Track avro export costs [VS-769] #8236

Merged: 7 commits, Mar 8, 2023
40 changes: 21 additions & 19 deletions scripts/variantstore/wdl/GvsExtractAvroFilesForHail.wdl
@@ -9,6 +9,7 @@ workflow GvsExtractAvroFilesForHail {
String project_id
String dataset_name
String filter_set_name
+ String call_set_identifier
Int scatter_width = 10
}

@@ -28,7 +29,8 @@ workflow GvsExtractAvroFilesForHail {
project_id = project_id,
dataset_name = dataset_name,
filter_set_name = filter_set_name,
- avro_sibling = OutputPath.out
+ avro_sibling = OutputPath.out,
+ call_set_identifier = call_set_identifier
}

call Utils.CountSuperpartitions {
@@ -43,6 +45,7 @@ workflow GvsExtractAvroFilesForHail {
project_id = project_id,
dataset_name = dataset_name,
filter_set_name = filter_set_name,
+ call_set_identifier = call_set_identifier,
avro_sibling = OutputPath.out,
num_superpartitions = CountSuperpartitions.num_superpartitions,
shard_index = i,
@@ -99,18 +102,18 @@ task ExtractFromNonSuperpartitionedTables {
String dataset_name
String filter_set_name
String avro_sibling
+ String call_set_identifier
}
parameter_meta {
avro_sibling: "Cloud path to a file that will be the sibling to the 'avro' 'directory' under which output Avro files will be written."
}
command <<<
set -o errexit -o nounset -o xtrace -o pipefail
echo "project_id = ~{project_id}" > ~/.bigqueryrc

avro_prefix="$(dirname ~{avro_sibling})/avro"
echo $avro_prefix > "avro_prefix.out"

- bq query --nouse_legacy_sql --project_id=~{project_id} "
+ python3 /app/run_avro_query.py --sql "
EXPORT DATA OPTIONS(
uri='${avro_prefix}/sample_mapping/sample_mapping_*.avro', format='AVRO', compression='SNAPPY') AS
SELECT sample_id, sample_name, '40',
@@ -119,33 +122,33 @@ task ExtractFromNonSuperpartitionedTables {
WHERE withdrawn IS NULL AND
is_control = false
ORDER BY sample_id
"
" --call_set_identifier ~{call_set_identifier} --dataset_name ~{dataset_name} --table_name sample_info --project_id ~{project_id}

- bq query --nouse_legacy_sql --project_id=~{project_id} "
+ python3 /app/run_avro_query.py --sql "
EXPORT DATA OPTIONS(
uri='${avro_prefix}/vqsr_filtering_data/vqsr_filtering_data_*.avro', format='AVRO', compression='SNAPPY') AS
SELECT location, type as model, ref, alt, vqslod, yng_status
FROM \`~{project_id}.~{dataset_name}.filter_set_info\`
WHERE filter_set_name = '~{filter_set_name}'
ORDER BY location
"
" --call_set_identifier ~{call_set_identifier} --dataset_name ~{dataset_name} --table_name filter_set_info --project_id ~{project_id}

- bq query --nouse_legacy_sql --project_id=~{project_id} "
+ python3 /app/run_avro_query.py --sql "
EXPORT DATA OPTIONS(
uri='${avro_prefix}/site_filtering_data/site_filtering_data_*.avro', format='AVRO', compression='SNAPPY') AS
SELECT location, filters
FROM \`~{project_id}.~{dataset_name}.filter_set_sites\`
WHERE filter_set_name = '~{filter_set_name}'
ORDER BY location
"
" --call_set_identifier ~{call_set_identifier} --dataset_name ~{dataset_name} --table_name filter_set_sites --project_id ~{project_id}

- bq query --nouse_legacy_sql --project_id=~{project_id} "
+ python3 /app/run_avro_query.py --sql "
EXPORT DATA OPTIONS(
uri='${avro_prefix}/vqsr_tranche_data/vqsr_tranche_data_*.avro', format='AVRO', compression='SNAPPY') AS
SELECT model, truth_sensitivity, min_vqslod, filter_name
FROM \`~{project_id}.~{dataset_name}.filter_set_tranches\`
WHERE filter_set_name = '~{filter_set_name}'
"
" --call_set_identifier ~{call_set_identifier} --dataset_name ~{dataset_name} --table_name filter_set_tranches --project_id ~{project_id}
>>>

output {
@@ -154,7 +157,7 @@ task ExtractFromNonSuperpartitionedTables {
}

runtime {
docker: "gcr.io/google.com/cloudsdktool/cloud-sdk:409.0.0-alpine"
docker: "us.gcr.io/broad-dsde-methods/variantstore:2023-03-07-alpine-v2"
disks: "local-disk 500 HDD"
}
}
@@ -171,6 +174,7 @@ task ExtractFromSuperpartitionedTables {
String dataset_name
String filter_set_name
String avro_sibling
+ String call_set_identifier
Int num_superpartitions
Int shard_index
Int num_shards
@@ -184,8 +188,6 @@ task ExtractFromSuperpartitionedTables {

command <<<
set -o errexit -o nounset -o xtrace -o pipefail
echo "project_id = ~{project_id}" > ~/.bigqueryrc

avro_prefix="$(dirname ~{avro_sibling})/avro"

for superpartition in $(seq ~{shard_index + 1} ~{num_shards} ~{num_superpartitions})
@@ -194,7 +196,7 @@

# These bq exports error out if there are any objects at the sibling level to where output files would be written
# so an extra layer of `vet_${str_table_index}` is inserted here.
- bq query --nouse_legacy_sql --project_id=~{project_id} "
+ python3 /app/run_avro_query.py --sql "
EXPORT DATA OPTIONS(
uri='${avro_prefix}/vets/vet_${str_table_index}/vet_${str_table_index}_*.avro', format='AVRO', compression='SNAPPY') AS
SELECT location, v.sample_id, ref, REPLACE(alt,',<NON_REF>','') alt, call_GT as GT, call_AD as AD, call_GQ as GQ, cast(SPLIT(call_pl,',')[OFFSET(0)] as int64) as RGQ
@@ -203,9 +205,9 @@ task ExtractFromSuperpartitionedTables {
WHERE withdrawn IS NULL AND
is_control = false
ORDER BY location
"
" --call_set_identifier ~{call_set_identifier} --dataset_name ~{dataset_name} --table_name vet_${str_table_index} --project_id ~{project_id}

- bq query --nouse_legacy_sql --project_id=~{project_id} "
+ python3 /app/run_avro_query.py --sql "
EXPORT DATA OPTIONS(
uri='${avro_prefix}/refs/ref_ranges_${str_table_index}/ref_ranges_${str_table_index}_*.avro', format='AVRO', compression='SNAPPY') AS
SELECT location, r.sample_id, length, state
@@ -214,7 +216,7 @@ task ExtractFromSuperpartitionedTables {
WHERE withdrawn IS NULL AND
is_control = false
ORDER BY location
"
" --call_set_identifier ~{call_set_identifier} --dataset_name ~{dataset_name} --table_name ref_ranges_${str_table_index} --project_id ~{project_id}
done
>>>

@@ -223,7 +225,7 @@ task ExtractFromSuperpartitionedTables {
}

runtime {
docker: "gcr.io/google.com/cloudsdktool/cloud-sdk:409.0.0-alpine"
docker: "us.gcr.io/broad-dsde-methods/variantstore:2023-03-07-alpine-v2"
disks: "local-disk 500 HDD"
}
}
@@ -291,7 +293,7 @@ task GenerateHailScripts {
File hail_create_vat_inputs_script = 'hail_create_vat_inputs.py'
}
runtime {
docker: "us.gcr.io/broad-dsde-methods/variantstore:2023-01-23-alpine"
docker: "us.gcr.io/broad-dsde-methods/variantstore:2023-03-07-alpine-v2"
disks: "local-disk 500 HDD"
}
}
1 change: 1 addition & 0 deletions scripts/variantstore/wdl/GvsQuickstartHailIntegration.wdl
@@ -27,6 +27,7 @@ workflow GvsQuickstartHailIntegration {
dataset_name = GvsQuickstartVcfIntegration.dataset_name,
filter_set_name = GvsQuickstartVcfIntegration.filter_set_name,
scatter_width = 10,
+ call_set_identifier = branch_name
}

call CreateAndTieOutVds {
36 changes: 36 additions & 0 deletions scripts/variantstore/wdl/extract/run_avro_query.py
@@ -0,0 +1,36 @@
import argparse

from google.cloud import bigquery
from google.cloud.bigquery.job import QueryJobConfig
import utils


def run_avro_query(call_set_identifier, dataset_name, table_name, project_id, sql):
    # add labels for DSP Cloud Cost Control Labeling and Reporting
    query_labels_map = {'service': 'gvs', 'team': 'variants', 'managedby': 'gvs_extract_avro_files_for_hail'}

    default_config = QueryJobConfig(labels=query_labels_map, priority="INTERACTIVE", use_query_cache=True)
    client = bigquery.Client(project=project_id,
                             default_query_job_config=default_config)
    query_return = utils.execute_with_retry(client, table_name, sql)
    utils.write_job_stats([{'job': query_return['job'], 'label': query_return['label']}], client,
                          f"{project_id}.{dataset_name}", call_set_identifier, 'GvsExtractAvroFilesForHail',
                          'GenerateAvroFiles', table_name, True)


if __name__ == '__main__':
    parser = argparse.ArgumentParser(allow_abbrev=False, description='Extract avro files from GVS BigQuery dataset')
    parser.add_argument('--call_set_identifier', type=str,
                        help='callset identifier used to track costs in cost_observability table', default='false')
    parser.add_argument('--dataset_name', type=str, help='BigQuery dataset name', required=True)
    parser.add_argument('--table_name', type=str, help='BigQuery table name', required=True)
    parser.add_argument('--project_id', type=str, help='Google project for the GVS dataset', required=True)
    parser.add_argument('--sql', type=str, help='SQL to run to extract Avro data', required=True)

    args = parser.parse_args()

    run_avro_query(args.call_set_identifier,
                   args.dataset_name,
                   args.table_name,
                   args.project_id,
                   args.sql)
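
For reference, a minimal sketch of driving the new wrapper directly from Python, mirroring the python3 /app/run_avro_query.py invocations in the WDL command blocks above. The project, dataset, bucket, and filter set values below are placeholders for illustration only, not values from this PR; the cost labels and job-stats bookkeeping happen inside run_avro_query exactly as in the script above.

# Minimal sketch: call run_avro_query directly with placeholder values.
from run_avro_query import run_avro_query

run_avro_query(
    call_set_identifier="my_callset",   # placeholder; recorded in the cost_observability table
    dataset_name="my_dataset",          # placeholder BigQuery dataset
    table_name="filter_set_sites",      # table whose export job is labeled and reported
    project_id="my-gcp-project",        # placeholder Google project
    sql="""
    EXPORT DATA OPTIONS(
        uri='gs://my-bucket/avro/site_filtering_data/site_filtering_data_*.avro',
        format='AVRO', compression='SNAPPY') AS
    SELECT location, filters
    FROM `my-gcp-project.my_dataset.filter_set_sites`
    WHERE filter_set_name = 'my_filter_set'
    ORDER BY location
    """)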