WDLize GvsPrepareCallset (briefly known as CreateCohortTable) #7200

Merged: 30 commits, merged Apr 14, 2021

Commits (the file changes shown further below are from 1 commit of the 30)
a0e4874  first pass at CreateCohortTable.wdl, rename python script (mmorgantaylor, Apr 12, 2021)
e3ad814  update wdl defaults, add this branch to dockstore (mmorgantaylor, Apr 12, 2021)
bd3469e  add inputs file (mmorgantaylor, Apr 12, 2021)
e01c16f  fix indentation on dockstore y(f)ml (mmorgantaylor, Apr 12, 2021)
be53b43  pull script from github branch directly (mmorgantaylor, Apr 13, 2021)
a5dd483  change string reference character (mmorgantaylor, Apr 13, 2021)
8293d0f  revert previous; add missing input (mmorgantaylor, Apr 13, 2021)
9c6d19a  fix attempt for defaults - use new variable names (mmorgantaylor, Apr 13, 2021)
5fffcab  try alternate string reference character again ~ (mmorgantaylor, Apr 13, 2021)
2dfefff  add bigquery installation (mmorgantaylor, Apr 13, 2021)
3d02841  use custom docker, add docker build script (mmorgantaylor, Apr 13, 2021)
4ba7b65  make build_docker script easier to use (mmorgantaylor, Apr 13, 2021)
9c0a154  add entrypoint to dockerfile, run script in app dir (mmorgantaylor, Apr 13, 2021)
11d5c79  try something else (mmorgantaylor, Apr 13, 2021)
433fdfa  remove ls from wdl (mmorgantaylor, Apr 13, 2021)
6d7e113  add SA key file as input, use google base image for gcloud auth support (mmorgantaylor, Apr 13, 2021)
a705592  cleanup, fix docker input (mmorgantaylor, Apr 13, 2021)
bb916f3  functional SA authentication for CreateCohortTable (mmorgantaylor, Apr 13, 2021)
f2122e0  don't copy SA file, localization happens anyway (mmorgantaylor, Apr 14, 2021)
b475a9d  define defaults better (mmorgantaylor, Apr 14, 2021)
0d4bb2f  finish defaults for wdl inputs (mmorgantaylor, Apr 14, 2021)
049909f  use default_dataset, further clean up unneeded inputs (mmorgantaylor, Apr 14, 2021)
0ecb319  update example inputs json (mmorgantaylor, Apr 14, 2021)
018fae3  update inputs with new docker image (mmorgantaylor, Apr 14, 2021)
a21626c  rename create_cohort_data_table.py (mmorgantaylor, Apr 14, 2021)
438b89e  remove redundant google sdk installation from Dockerfile (mmorgantaylor, Apr 14, 2021)
9ffe9ae  update shell script with new python script name (mmorgantaylor, Apr 14, 2021)
2024451  update to support latest docker tag; remove branch from dockstore yml (mmorgantaylor, Apr 14, 2021)
8ea51c8  refactor duplicated config setup (mmorgantaylor, Apr 14, 2021)
2c5d25e  rename wdl GvsPrepareCallset (mmorgantaylor, Apr 14, 2021)
9 changes: 7 additions & 2 deletions scripts/variantstore/wdl/CreateCohortTable.wdl
@@ -58,7 +58,11 @@ task CreateCohortTableTask {
set -e

if [ ~{has_service_account_file} = 'true' ]; then
gcloud auth activate-service-account --key-file='~{service_account_json}'
SA_FILENAME="sa_key.json"
gsutil cp "~{service_account_json}" $SA_FILENAME
Contributor (review comment):
If this is declared as a File above, won't it already be copied down to the VM now, and won't the path be a local path?

SA_ARGS="--sa_key_path ${SA_FILENAME}"
else
SA_ARGS=""
fi

python3 /app/create_cohort_data_table.py \
@@ -68,7 +72,8 @@ task CreateCohortTableTask {
--destination_table ~{destination_cohort_table_name_final} \
--fq_cohort_sample_names ~{fq_cohort_sample_table_final} \
--query_project ~{query_project_final} \
--fq_sample_mapping_table ~{fq_sample_mapping_table_final}
--fq_sample_mapping_table ~{fq_sample_mapping_table_final} \
$SA_ARGS
Contributor (review comment):
nice

>>>

runtime {
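The review question above touches on how Cromwell handles File inputs: a declared File is localized onto the VM before the command block runs, so the extra gsutil cp is redundant and the interpolated path is already local. A later commit in this PR (f2122e0, "don't copy SA file, localization happens anyway") drops the copy accordingly. On the Python side, all the script needs is a readable local key path. Below is a minimal sketch, not part of this PR, of how such a path can be turned into explicit credentials; the load_sa_credentials helper name is illustrative.

import os
from google.oauth2 import service_account

# Illustrative sketch (not from this PR): turn an optional local key path into
# explicit credentials, or return None to fall back to application-default auth.
def load_sa_credentials(sa_key_path):
    if not sa_key_path:
        return None
    if not os.path.isfile(sa_key_path):
        raise FileNotFoundError(f"service account key not found: {sa_key_path}")
    return service_account.Credentials.from_service_account_file(
        sa_key_path,
        scopes=["https://www.googleapis.com/auth/cloud-platform"],
    )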
47 changes: 31 additions & 16 deletions scripts/variantstore/wdl/extract/create_cohort_data_table.py
@@ -6,6 +6,8 @@
from concurrent.futures import ThreadPoolExecutor, as_completed
from google.cloud import bigquery
from google.cloud.bigquery.job import QueryJobConfig
from google.oauth2 import service_account


import argparse

@@ -56,7 +58,7 @@ def execute_with_retry(label, sql):
while len(retry_delay) > 0:
try:
query = client.query(sql)

print(f"STARTING - {label}")
JOB_IDS.add((label, query.job_id))
results = query.result()
@@ -86,8 +88,8 @@ def split_lists(samples, n):

def get_all_samples(fq_cohort_sample_names, fq_sample_mapping_table):
sql = f"select m.sample_id from `{fq_cohort_sample_names}` c JOIN `{fq_sample_mapping_table}` m ON (m.sample_name = c.sample_name)"
results = execute_with_retry("read cohort table", sql)

results = execute_with_retry("read cohort table", sql)
cohort = [row.sample_id for row in list(results)]
cohort.sort()
return cohort
@@ -104,7 +106,7 @@ def get_subselect(fq_vet_table, samples, id):
sample_stanza = ','.join([str(s) for s in samples])
sql = f" q_{id} AS (SELECT location, sample_id, ref, alt, call_GT, call_GQ, call_pl, QUALapprox, AS_QUALapprox from `{fq_vet_table}` WHERE sample_id IN ({sample_stanza})), "
return sql

subs = {}
for i in range(1, PET_VET_TABLE_COUNT+1):
partition_samples = get_samples_for_partition(cohort, i)
@@ -124,17 +126,17 @@ def get_subselect(fq_vet_table, samples, id):
"q_all AS (" + (" union all ".join([ f"(SELECT * FROM q_{id})" for id in subs.keys()])) + ")\n" + \
f" (SELECT * FROM q_all)"

print(sql)
print(f"VET Query is {utf8len(sql)/(1024*1024)} MB in length")
results = execute_with_retry("insert vet new table", sql)
print(sql)
print(f"VET Query is {utf8len(sql)/(1024*1024)} MB in length")
results = execute_with_retry("insert vet new table", sql)
return results



def create_position_table(fq_temp_table_dataset, min_variant_samples):
dest = f"{fq_temp_table_dataset}.{VET_DISTINCT_POS_TABLE}"
# only create this clause if min_variant_samples > 0, becuase if

# only create this clause if min_variant_samples > 0, becuase if
# it is == 0 then we don't need to touch the sample_id column (which doubles the cost of this query)
min_sample_clause = ""
if min_variant_samples > 0:
@@ -179,9 +181,9 @@ def get_pet_subselect(fq_pet_table, samples, id):
"q_all AS (" + (" union all ".join([ f"(SELECT * FROM q_{id})" for id in subs.keys()])) + ")\n" + \
f" (SELECT * FROM q_all)"

#print(sql)
#print(sql)
print(f"PET Query is {utf8len(sql)/(1024*1024)} MB in length")
results = execute_with_retry("insert pet new table", sql)
results = execute_with_retry("insert pet new table", sql)
return results


@@ -216,7 +218,7 @@ def populate_final_extract_table(fq_temp_table_dataset, fq_destination_dataset,
cohort_extract_final_query_job = client.query(sql)

cohort_extract_final_query_job.result()
JOB_IDS.add((f"insert final cohort table {dest}", cohort_extract_final_query_job.job_id))
JOB_IDS.add((f"insert final cohort table {dest}", cohort_extract_final_query_job.job_id))
return

def do_extract(fq_pet_vet_dataset,
@@ -227,11 +229,22 @@ def do_extract(fq_pet_vet_dataset,
fq_destination_dataset,
destination_table,
min_variant_samples,
fq_sample_mapping_table
fq_sample_mapping_table,
sa_key_path
):
try:
global client
client = bigquery.Client(project=query_project,

if sa_key_path:
credentials = service_account.Credentials.from_service_account_file(
sa_key_path, scopes=["https://www.googleapis.com/auth/cloud-platform"],
)

client = bigquery.Client(credentials=credentials,
project=query_project,
default_query_job_config=QueryJobConfig(labels={ "id" : f"test_cohort_export_{output_table_prefix}"}, priority="INTERACTIVE", use_query_cache=False ))
else:
client = bigquery.Client(project=query_project,
default_query_job_config=QueryJobConfig(labels={ "id" : f"test_cohort_export_{output_table_prefix}"}, priority="INTERACTIVE", use_query_cache=False ))

## TODO -- provide a cmdline arg to override this (so we can simulat smaller datasets)
@@ -254,7 +267,7 @@ def do_extract(fq_pet_vet_dataset,

if __name__ == '__main__':
parser = argparse.ArgumentParser(allow_abbrev=False, description='Extract a cohort from BigQuery Variant Store ')

parser.add_argument('--fq_petvet_dataset',type=str, help='project.dataset location of pet/vet data', required=True)
parser.add_argument('--fq_temp_table_dataset',type=str, help='project.dataset location where results should be stored', required=True)
parser.add_argument('--fq_destination_dataset',type=str, help='project.dataset location where results should be stored', required=True)
@@ -263,6 +276,7 @@ def do_extract(fq_pet_vet_dataset,
parser.add_argument('--query_project',type=str, help='Google project where query should be executed', required=True)
parser.add_argument('--min_variant_samples',type=int, help='Minimum variant samples at a site required to be emitted', required=False, default=0)
parser.add_argument('--fq_sample_mapping_table',type=str, help='Mapping table from sample_id to sample_name', required=True)
parser.add_argument('--sa_key_path',type=str, help='Path to json key file for SA', required=False)

parser.add_argument('--max_tables',type=int, help='Maximum number of PET/VET tables to consider', required=False, default=250)

@@ -278,4 +292,5 @@ def do_extract(fq_pet_vet_dataset,
args.fq_destination_dataset,
args.destination_table,
args.min_variant_samples,
args.fq_sample_mapping_table)
args.fq_sample_mapping_table,
args.sa_key_path)
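
The do_extract change above constructs the BigQuery client in two nearly identical branches that differ only in whether explicit credentials are passed; commit 8ea51c8 ("refactor duplicated config setup") later in this PR cleans that duplication up. A minimal sketch of one way to collapse it, assuming the same job-config defaults as in the diff; the make_bigquery_client helper name is illustrative, not the PR's actual refactor.

from google.cloud import bigquery
from google.cloud.bigquery.job import QueryJobConfig
from google.oauth2 import service_account

def make_bigquery_client(query_project, output_table_prefix, sa_key_path=None):
    # Build explicit credentials only when an SA key path was supplied; otherwise
    # pass None so the client falls back to application-default credentials.
    credentials = None
    if sa_key_path:
        credentials = service_account.Credentials.from_service_account_file(
            sa_key_path,
            scopes=["https://www.googleapis.com/auth/cloud-platform"],
        )
    job_config = QueryJobConfig(
        labels={"id": f"test_cohort_export_{output_table_prefix}"},
        priority="INTERACTIVE",
        use_query_cache=False,
    )
    return bigquery.Client(
        credentials=credentials,
        project=query_project,
        default_query_job_config=job_config,
    )

Because credentials=None simply defers to application-default auth, a single constructor call covers both the SA-key path and the default path.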