Create workflow to create and populate alt_allele table [VS-51] #7426

Merged: 24 commits merged into ah_var_store from rsa_alt_allele_script on Sep 9, 2021

Commits (changes shown are from 19 of the 24 commits)
a81d5f1
messy first stab
rsasch Aug 9, 2021
4ea4904
Merge branch 'ah_var_store' into rsa_alt_allele_script
rsasch Aug 13, 2021
6a88c43
Merge branch 'ah_var_store' into rsa_alt_allele_script
rsasch Aug 13, 2021
0758d1e
Merge branch 'ah_var_store' into rsa_alt_allele_script
rsasch Aug 18, 2021
f562d35
added WDL to parallelize loading alt_allele from vet tables
rsasch Aug 18, 2021
a59f5ec
fixed bugs, added new docker image
rsasch Aug 18, 2021
73811ff
add dockstore integration
rsasch Aug 18, 2021
96dc0dc
typo
rsasch Aug 18, 2021
5be2ee5
revenge of the typo
rsasch Aug 18, 2021
ce19869
fun with docker_build and scattering
rsasch Aug 18, 2021
bd42100
fun with docker_build and scattering
rsasch Aug 18, 2021
c75d64b
ugh running things in docker
rsasch Aug 18, 2021
b1e92f4
futzing
rsasch Aug 18, 2021
dd6e735
updated QUICKSTART readme and made input parameter names consistent
rsasch Aug 19, 2021
b50d525
fixed up Docker paths
rsasch Aug 19, 2021
1f33c5a
more futzing
rsasch Aug 19, 2021
58c9be6
Merge branch 'ah_var_store' into rsa_alt_allele_script
rsasch Aug 19, 2021
19848aa
Merge branch 'ah_var_store' into rsa_alt_allele_script
rsasch Aug 19, 2021
2ea1830
Merge branch 'ah_var_store' into rsa_alt_allele_script
rsasch Sep 1, 2021
92d4de8
Merge branch 'ah_var_store' into rsa_alt_allele_script
rsasch Sep 3, 2021
cc1a645
broke out create table into its own task so scatter is only populating
rsasch Sep 3, 2021
402e735
made GetVetTableNames volatile
rsasch Sep 8, 2021
4af6b8c
fix create table query with schema
rsasch Sep 8, 2021
1d3e3ed
removed branch and added master to .dockstore.yml
rsasch Sep 9, 2021
7 changes: 7 additions & 0 deletions .dockstore.yml
@@ -65,6 +65,13 @@ workflows:
       branches:
         - master
         - ah_var_store
+  - name: GvsCreateAltAllele
+    subclass: WDL
+    primaryDescriptorPath: /scripts/variantstore/wdl/GvsCreateAltAllele.wdl
+    filters:
+      branches:
+        - ah_var_store
+        - rsa_alt_allele_script
   - name: GvsExtractCallset
     subclass: WDL
     primaryDescriptorPath: /scripts/variantstore/wdl/GvsExtractCallset.wdl
13 changes: 7 additions & 6 deletions scripts/variantstore/TERRA_QUICKSTART.md
@@ -57,15 +57,16 @@ These are the required parameters which must be supplied to the workflow:
 **NOTE**: if your workflow fails, you will need to manually remove a lockfile from the output directory. It is called LOCKFILE, and can be removed with `gsutil rm`

 ## 2. Create Alt Allele Table
-**NOTE:** This is a bit of a kludge until we gain more confidence that the data loaded into the ALT_ALLELE table for feature training are optimal and we can automate this process
+This step loads data into the ALT_ALLELE table from the `vet_*` tables.

-You'll need to run this from the BigQuery Console for your dataset.
+This is done by running the `GvsCreateAltAllele` workflow with the following parameters:

-Load the SQL script you can find here in the [GATK GitHub Repository](https://github.com/broadinstitute/gatk/blob/ah_var_store/scripts/variantstore/bq/alt_allele_creation.example.sql)
-
-There are three places where you need to replace the string `spec-ops-aou.gvs_tieout_acmg_v1` with your project and dataset name in the form `PROJECT.DATASET`
+| Parameter | Description |
+| ----------------- | ----------- |
+| data_project | The name of the google project containing the dataset |
+| default_dataset | The name of the dataset |

-Execute the script, it should take 30-60 seconds to run resulting in the creation of the `ALT_ALLELE` table in your dataset
+**Note:** This workflow does not use the Terra Entity model to run, so be sure to select `Run workflow with inputs defined by file paths`
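For reference, a minimal sketch of generating such an inputs file. This is an editorial illustration, not part of the PR: the fully qualified input names follow `GvsCreateAltAllele.wdl` as added below, and the project and dataset values are placeholders.

```python
# Sketch: build the inputs JSON for "Run workflow with inputs defined by
# file paths". Input names assume GvsCreateAltAllele.wdl from this PR;
# project/dataset values are placeholders.
import json

inputs = {
    "GvsCreateAltAllele.dataset_project": "my-gcp-project",
    "GvsCreateAltAllele.default_dataset": "my_gvs_dataset",
    # Optional: uncomment to run the BigQuery jobs under a service account.
    # "GvsCreateAltAllele.service_account_json_path": "gs://my-bucket/sa-key.json",
}

with open("GvsCreateAltAllele.inputs.json", "w") as f:
    json.dump(inputs, f, indent=2)
```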

## 3. Create Filter Set

121 changes: 121 additions & 0 deletions scripts/variantstore/wdl/GvsCreateAltAllele.wdl
@@ -0,0 +1,121 @@
version 1.0

workflow GvsCreateAltAllele {
input {
String dataset_project
String query_project_id = dataset_project
String default_dataset

String? service_account_json_path
Int? preemptible_tries
File? gatk_override
String? docker
}

String docker_final = select_first([docker, "us.gcr.io/broad-gatk/gatk:4.1.7.0"])

call GetVetTableNames {
input:
query_project_id = query_project_id,
dataset_project_id = dataset_project,
dataset_name = default_dataset,
service_account_json_path = service_account_json_path
}

scatter (idx in range(length(GetVetTableNames.vet_tables))) {
call PopulateAltAlleleTable {
input:
vet_table_name = GetVetTableNames.vet_tables[idx],
query_project_id = query_project_id,
dataset_project_id = dataset_project,
dataset_name = default_dataset,
service_account_json_path = service_account_json_path
}
}

output {
Array[String] vet_tables_loaded = PopulateAltAlleleTable.done
}
}

task GetVetTableNames {
input {
String query_project_id
String dataset_project_id
String dataset_name

String? service_account_json_path
}

String has_service_account_file = if (defined(service_account_json_path)) then 'true' else 'false'

command <<<
set -e

if [ ~{has_service_account_file} = 'true' ]; then
gsutil cp ~{service_account_json_path} local.service_account.json
gcloud auth activate-service-account --key-file=local.service_account.json
gcloud config set project ~{query_project_id}
fi

echo "project_id = ~{query_project_id}" > ~/.bigqueryrc
bq query --location=US --project_id=~{query_project_id} --format=csv --use_legacy_sql=false \
"SELECT table_name FROM ~{dataset_project_id}.~{dataset_name}.INFORMATION_SCHEMA.TABLES WHERE table_name LIKE 'vet_%' ORDER BY table_name" > vet_tables.csv

# remove the header row from the CSV file
sed -i 1d vet_tables.csv
>>>

output {
Array[String] vet_tables = read_lines("vet_tables.csv")
}

runtime {
docker: "gcr.io/google.com/cloudsdktool/cloud-sdk:305.0.0"
memory: "3 GB"
disks: "local-disk 10 HDD"
preemptible: 3
cpu: 1
}
}

task PopulateAltAlleleTable {
input {
String vet_table_name
String query_project_id
String dataset_project_id
String dataset_name

String? service_account_json_path
}

String has_service_account_file = if (defined(service_account_json_path)) then 'true' else 'false'

command <<<
set -e

if [ ~{has_service_account_file} = 'true' ]; then
gsutil cp ~{service_account_json_path} local.service_account.json
gcloud auth activate-service-account --key-file=local.service_account.json
SERVICE_ACCOUNT_STANZA="--sa_key_path local.service_account.json "
fi

python3 /app/populate_alt_allele_table.py \
--query_project ~{query_project_id} \
--vet_table_name ~{vet_table_name} \
--fq_dataset ~{dataset_project_id}.~{dataset_name} \
$SERVICE_ACCOUNT_STANZA
>>>

output {
String done = "~{vet_table_name}"
}

runtime {
docker: "us.gcr.io/broad-dsde-methods/variantstore:ah_var_store_20210819"
memory: "3 GB"
disks: "local-disk 10 HDD"
preemptible: 3
cpu: 1
}
}
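The workflow's pattern is: discover every `vet_%` table in the dataset, then scatter one `PopulateAltAlleleTable` shard per table (a later commit in this PR also marks `GetVetTableNames` volatile, so the table list is not call-cached). For orientation, the same discovery step can be sketched with the BigQuery Python client; this is an editorial illustration, not code from the PR:

```python
# Sketch of the table discovery GetVetTableNames performs with the bq CLI,
# rewritten against the google-cloud-bigquery client. Illustration only.
from google.cloud import bigquery

def list_vet_tables(query_project: str, dataset_project: str, dataset_name: str):
    client = bigquery.Client(project=query_project)
    sql = (
        f"SELECT table_name "
        f"FROM `{dataset_project}.{dataset_name}.INFORMATION_SCHEMA.TABLES` "
        f"WHERE table_name LIKE 'vet_%' ORDER BY table_name"
    )
    # The workflow creates one scattered PopulateAltAlleleTable call per name.
    return [row.table_name for row in client.query(sql).result()]
```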
13 changes: 8 additions & 5 deletions scripts/variantstore/wdl/extract/Dockerfile
@@ -1,14 +1,17 @@
 FROM gcr.io/google.com/cloudsdktool/cloud-sdk:349.0.0

 # Copy the application's requirements.txt and run pip to install
-ADD requirements.txt /app/requirements.txt
+COPY requirements.txt /app/requirements.txt
 RUN pip install -r /app/requirements.txt
 RUN apt-get update && apt-get -y upgrade && apt-get -y install bcftools

-# Add the application source code.
-ADD create_cohort_extract_data_table.py /app
-ADD create_variant_annotation_table.py /app
-ADD extract_subpop.py /app
+# Copy the application source code.
+COPY create_cohort_extract_data_table.py /app
+COPY create_variant_annotation_table.py /app
+COPY extract_subpop.py /app
+COPY populate_alt_allele_table.py /app
+COPY alt_allele_positions.sql /app
+COPY alt_allele_temp_function.sql /app

 WORKDIR /app
 ENTRYPOINT ["/bin/bash"]
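(Switching `ADD` to `COPY` follows standard Docker practice: `COPY` is preferred for local files because `ADD`'s archive-extraction and URL-fetching behaviors are not needed here.)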
72 changes: 72 additions & 0 deletions scripts/variantstore/wdl/extract/alt_allele_positions.sql
@@ -0,0 +1,72 @@
select location, sample_id,
SPLIT(minimize(ref, SPLIT(alt,',')[OFFSET(0)]))[OFFSET(0)] as ref,
SPLIT(minimize(ref, SPLIT(alt,',')[OFFSET(0)]))[OFFSET(1)] as allele,
1 as allele_pos, call_GT, call_GQ,
as_raw_mq,
cast(SPLIT(as_raw_mq,'|')[OFFSET(1)] as int64) raw_mq,
as_raw_mqranksum,
SAFE_cast(SAFE_cast(SPLIT(as_raw_mqranksum,',')[SAFE_OFFSET(0)] as float64) * 10.0 as int64) as raw_mqranksum_x_10,
as_qualapprox,
qualapprox,
cast(SPLIT(as_qualapprox,'|')[OFFSET(0)] as int64) as qual,
as_raw_readposranksum,
SAFE_cast(SAFE_cast(SPLIT(as_raw_readposranksum,',')[SAFE_OFFSET(0)] as float64) * 10.0 as int64) as raw_readposranksum_x_10,
as_sb_table,
cast(SPLIT(SPLIT(as_sb_table,'|')[OFFSET(0)],',')[OFFSET(0)] as int64) as sb_ref_plus,
cast(SPLIT(SPLIT(as_sb_table,'|')[OFFSET(0)],',')[OFFSET(1)] as int64) as sb_ref_minus,
cast(SPLIT(SPLIT(as_sb_table,'|')[OFFSET(1)],',')[OFFSET(0)] as int64) as sb_alt_plus,
cast(SPLIT(SPLIT(as_sb_table,'|')[OFFSET(1)],',')[OFFSET(1)] as int64) as sb_alt_minus,
call_AD,
cast(SPLIT(call_AD,',')[OFFSET(0)] as int64) as ref_ad,
cast(SPLIT(call_AD,',')[OFFSET(1)] as int64) as ad
from position1

union all

select location, sample_id,
SPLIT(minimize(ref, SPLIT(alt,',')[OFFSET(0)]))[OFFSET(0)] as ref,
SPLIT(minimize(ref, SPLIT(alt,',')[OFFSET(0)]))[OFFSET(1)] as allele,
1 as allele_pos, call_GT, call_GQ,
as_raw_mq,
cast(SPLIT(as_raw_mq,'|')[OFFSET(1)] as int64) raw_mq,
as_raw_mqranksum,
SAFE_cast(SAFE_cast(SPLIT(as_raw_mqranksum,',')[SAFE_OFFSET(0)] as float64) * 10.0 as int64) as raw_mqranksum_x_10,
as_qualapprox,
qualapprox,
cast(SPLIT(as_qualapprox,'|')[OFFSET(0)] as int64) as qual,
as_raw_readposranksum,
SAFE_cast(SAFE_cast(SPLIT(as_raw_readposranksum,',')[SAFE_OFFSET(0)] as float64) * 10.0 as int64) as raw_readposranksum_x_10,
as_sb_table,
cast(SPLIT(SPLIT(as_sb_table,'|')[OFFSET(0)],',')[OFFSET(0)] as int64) as sb_ref_plus,
cast(SPLIT(SPLIT(as_sb_table,'|')[OFFSET(0)],',')[OFFSET(1)] as int64) as sb_ref_minus,
cast(SPLIT(SPLIT(as_sb_table,'|')[OFFSET(1)],',')[OFFSET(0)] as int64) as sb_alt_plus,
cast(SPLIT(SPLIT(as_sb_table,'|')[OFFSET(1)],',')[OFFSET(1)] as int64) as sb_alt_minus,
call_AD,
cast(SPLIT(call_AD,',')[OFFSET(0)] as int64) as ref_ad,
cast(SPLIT(call_AD,',')[OFFSET(1)] as int64) as ad
from position2

union all

select location, sample_id,
SPLIT(minimize(ref, SPLIT(alt,',')[OFFSET(1)]))[OFFSET(0)] as ref,
SPLIT(minimize(ref, SPLIT(alt,',')[OFFSET(1)]))[OFFSET(1)] as allele,
2 as allele_pos, call_GT, call_GQ,
as_raw_mq,
cast(SPLIT(as_raw_mq,'|')[OFFSET(2)] as int64) raw_mq,
as_raw_mqranksum,
SAFE_cast(SAFE_cast(SPLIT(as_raw_mqranksum,',')[SAFE_OFFSET(1)] as float64) * 10.0 as int64) as raw_mqranksum_x_10,
as_qualapprox,
qualapprox,
cast(SPLIT(as_qualapprox,'|')[OFFSET(1)] as int64) as qual,
as_raw_readposranksum,
SAFE_cast(SAFE_cast(SPLIT(as_raw_readposranksum,',')[SAFE_OFFSET(1)] as float64) * 10.0 as int64) as raw_readposranksum_x_10,
as_sb_table,
cast(SPLIT(SPLIT(as_sb_table,'|')[OFFSET(0)],',')[OFFSET(0)] as int64) as sb_ref_plus,
cast(SPLIT(SPLIT(as_sb_table,'|')[OFFSET(0)],',')[OFFSET(1)] as int64) as sb_ref_minus,
cast(SPLIT(SPLIT(as_sb_table,'|')[OFFSET(2)],',')[OFFSET(0)] as int64) as sb_alt_plus,
cast(SPLIT(SPLIT(as_sb_table,'|')[OFFSET(2)],',')[OFFSET(1)] as int64) as sb_alt_minus,
call_AD,
cast(SPLIT(call_AD,',')[OFFSET(0)] as int64) as ref_ad,
cast(SPLIT(call_AD,',')[OFFSET(2)] as int64) as ad
from position2;
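Reading the file top to bottom: the three UNIONed SELECTs emit, in order, the single alt allele of `position1` rows, the first alt allele of `position2` rows, and the second alt allele of `position2` rows (`position1` and `position2` are WITH clauses built in `populate_alt_allele_table.py` below). The allele-specific annotation strings are split on `|` per allele and on `,` per value. A worked example of the `as_sb_table` parsing, with a hypothetical value; the delimiter layout is inferred from the SPLIT expressions above:

```python
# Worked example of the as_sb_table parsing: "ref|alt" separated by a pipe,
# "plus,minus" strand counts separated by commas (inferred from the SQL).
as_sb_table = "20,21|14,12"  # hypothetical value for a site with one alt allele

ref_part, alt_part = as_sb_table.split("|")          # SPLIT(as_sb_table,'|')
sb_ref_plus, sb_ref_minus = map(int, ref_part.split(","))
sb_alt_plus, sb_alt_minus = map(int, alt_part.split(","))

print(sb_ref_plus, sb_ref_minus, sb_alt_plus, sb_alt_minus)  # -> 20 21 14 12
```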
14 changes: 14 additions & 0 deletions scripts/variantstore/wdl/extract/alt_allele_temp_function.sql
@@ -0,0 +1,14 @@
CREATE TEMPORARY FUNCTION minimize(ref STRING, allele STRING)
RETURNS STRING
LANGUAGE js AS """
let done = false
while (!done && ref.length !== 1) {
if (ref.slice(-1) === allele.slice(-1)) {
ref = ref.slice(0, -1)
allele = allele.slice(0,-1)
} else {
done = true
}
}
return ref+','+allele
""";
86 changes: 86 additions & 0 deletions scripts/variantstore/wdl/extract/populate_alt_allele_table.py
@@ -0,0 +1,86 @@
import time
import os

from google.cloud import bigquery
from google.cloud.bigquery.job import QueryJobConfig
from google.oauth2 import service_account
from pathlib import Path

import argparse

client = None

def execute_with_retry(label, sql):
    retry_delay = [30, 60, 90]  # three retries with incremental backoff
    start = time.time()
    while True:
        try:
            query_label = label.replace(" ", "-").strip().lower()

            # carry over any labels from the client's default job config and
            # tag this job with a gvs_query_name label
            existing_labels = client._default_query_job_config.labels
            job_labels = existing_labels
            job_labels["gvs_query_name"] = query_label
            job_config = bigquery.QueryJobConfig(labels=job_labels)
            query = client.query(sql, job_config=job_config)

            print(f"STARTING - {label}")
            results = query.result()
            print(f"COMPLETED ({time.time() - start} s, {3 - len(retry_delay)} retries) - {label}")
            return results
        except Exception as err:
            # if there are no retries left, re-raise
            if not retry_delay:
                raise
            t = retry_delay.pop(0)
            print(f"Error {err} running query {label}, sleeping for {t}s")
            time.sleep(t)

def populate_alt_allele_table(query_project, vet_table_name, fq_dataset, sa_key_path):
global client
default_config = QueryJobConfig(priority="INTERACTIVE", use_query_cache=True)

if sa_key_path:
credentials = service_account.Credentials.from_service_account_file(
sa_key_path, scopes=["https://www.googleapis.com/auth/cloud-platform"],
)

client = bigquery.Client(credentials=credentials,
project=query_project,
default_query_job_config=default_config)
else:
client = bigquery.Client(project=query_project,
default_query_job_config=default_config)

os.chdir(os.path.dirname(__file__))
alt_allele_temp_function = Path('alt_allele_temp_function.sql').read_text()
alt_allele_positions = Path('alt_allele_positions.sql').read_text()
    first = (vet_table_name == "vet_001")

query_beginning = f"CREATE OR REPLACE TABLE {fq_dataset}.alt_allele PARTITION BY \
RANGE_BUCKET(location, GENERATE_ARRAY(0, 25000000000000, 1000000000000)) \
CLUSTER BY location, sample_id AS \n"
if not first:
query_beginning = f"INSERT INTO {fq_dataset}.alt_allele \n"
fq_vet_table = f"{fq_dataset}.{vet_table_name}"
query_with = f"""WITH
position1 as (select * from {fq_vet_table} WHERE call_GT IN ('0/1', '1/0', '1/1', '0|1', '1|0', '1|1', '0/2', '0|2','2/0', '2|0')),
position2 as (select * from {fq_vet_table} WHERE call_GT IN ('1/2', '1|2', '2/1', '2|1'))"""

sql = alt_allele_temp_function + query_beginning + query_with + alt_allele_positions
result = execute_with_retry(f"into alt allele from {vet_table_name}", sql)
return result

if __name__ == '__main__':
parser = argparse.ArgumentParser(allow_abbrev=False, description='Populate an alt_allele table for the BigQuery Variant Store')

    parser.add_argument('--query_project', type=str, help='Google project where query should be executed', required=True)
    parser.add_argument('--vet_table_name', type=str, help='vet table name to ingest', required=True)
    parser.add_argument('--fq_dataset', type=str, help='project and dataset for data', required=True)
    parser.add_argument('--sa_key_path', type=str, help='Path to json key file for SA', required=False)


    # Parse the command-line arguments
args = parser.parse_args()

populate_alt_allele_table(args.query_project, args.vet_table_name, args.fq_dataset, args.sa_key_path)
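For a single shard, the scattered WDL task reduces to a call like the following; the values are placeholders, and `vet_001` is the one table name that triggers `CREATE OR REPLACE` rather than `INSERT`:

```python
# Example direct invocation, mirroring one scattered PopulateAltAlleleTable
# shard; project/dataset names are placeholders.
populate_alt_allele_table(
    query_project="my-gcp-project",
    vet_table_name="vet_001",                    # creates alt_allele; other names INSERT
    fq_dataset="my-gcp-project.my_gvs_dataset",
    sa_key_path=None,                            # or a path to a service-account key file
)
```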