WIP for Feature Extract code #6958

Merged 4 commits on Nov 16, 2020
@@ -182,6 +182,7 @@ public enum ModeEnum {
    public enum OutputType {
        TSV,
        ORC,
        AVRO,
        PARQUET
    }
}
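The new AVRO constant slots in alongside the existing output formats. As a rough illustration of how ingest code might branch on it — a hedged sketch only; the helper and the file extensions below are illustrative assumptions, not anything defined in this PR:

    // Hypothetical helper, not GATK API: maps each output format to an extension.
    static String extensionFor(final OutputType type) {
        switch (type) {
            case TSV:     return ".tsv";
            case ORC:     return ".orc";
            case AVRO:    return ".avro";   // newly supported by this change
            case PARQUET: return ".parquet";
            default:      throw new IllegalArgumentException("Unknown output type: " + type);
        }
    }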
@@ -0,0 +1,127 @@
package org.broadinstitute.hellbender.tools.variantdb.nextgen;

import htsjdk.variant.variantcontext.VariantContext;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.broadinstitute.barclay.argparser.Argument;
import org.broadinstitute.barclay.argparser.CommandLineProgramProperties;
import org.broadinstitute.hellbender.cmdline.StandardArgumentDefinitions;
import org.broadinstitute.hellbender.cmdline.programgroups.ShortVariantDiscoveryProgramGroup;
import org.broadinstitute.hellbender.engine.FeatureContext;
import org.broadinstitute.hellbender.engine.ReadsContext;
import org.broadinstitute.hellbender.engine.ReferenceContext;
import org.broadinstitute.hellbender.engine.VariantWalker;
import org.broadinstitute.hellbender.exceptions.GATKException;
import org.broadinstitute.hellbender.tools.variantdb.*;
import org.broadinstitute.hellbender.utils.tsv.SimpleXSVWriter;

import java.util.*;
import java.io.File;
import java.io.IOException;

/**
 * Variant walker that writes one row of VQSR filtering data per site/alt allele
 * to a delimited output file, for ingestion into BigQuery.
 */
@CommandLineProgramProperties(
        summary = "Create files for ingesting VQSR Filtering data into BigQuery",
        oneLineSummary = "Filtering Data Ingest tool for BQJG",
        programGroup = ShortVariantDiscoveryProgramGroup.class,
        omitFromCommandLine = true
)
public final class CreateFilteringFiles extends VariantWalker {
    static final Logger logger = LogManager.getLogger(CreateFilteringFiles.class);

    private SimpleXSVWriter writer;

    private static final List<String> HEADER =
            Arrays.asList("filter_set_name", "mode", "location", "ref", "alt", "vqslod", "culprit", "training_label", "yng");

    @Argument(fullName = StandardArgumentDefinitions.OUTPUT_LONG_NAME,
            shortName = StandardArgumentDefinitions.OUTPUT_SHORT_NAME,
            doc = "Output file for filtering to be loaded into BigQuery",
            optional = false)
    private File output;

    @Argument(
            fullName = "filter-set-name",
            doc = "Name to use in output files as the filter set name",
            optional = false)
    private String filterSetName;

    @Argument(
            fullName = "ref-version",
            doc = "Remove this option!!!! only for ease of testing. Valid options are 37 or 38",
            optional = true)
    private String refVersion = "37";

    @Argument(
            fullName = "mode",
            doc = "SNP or INDEL",
            optional = false)
    private String mode;

    @Override
    public boolean requiresIntervals() {
        return false;
    }

    @Override
    public void onTraversalStart() {
        try {
            writer = new SimpleXSVWriter(output.toPath(), IngestConstants.SEPARATOR);
        } catch (IOException ioe) {
            throw new GATKException("Unable to initialize writer", ioe);
        }
        writer.setHeaderLine(HEADER);

        // Set reference version -- TODO remove this in the future; also, can we get the ref version from the header?
        ChromosomeEnum.setRefVersion(refVersion);
    }

    @Override
    public void apply(final VariantContext variant, final ReadsContext readsContext, final ReferenceContext referenceContext, final FeatureContext featureContext) {
        // Pack contig and position into a single long so BigQuery can range-scan on it.
        Long location = SchemaUtils.encodeLocation(variant.getContig(), variant.getStart());
        String ref = variant.getReference().getBaseString();

        if (variant.getAlternateAlleles().size() > 1) {
            throw new GATKException("Processing VCF with more than one alternate allele is disallowed: " + variant);
        }
        String alt = variant.getAlternateAllele(0).getBaseString();

        String vqslod = variant.getAttributeAsString("VQSLOD", "");
        String culprit = variant.getAttributeAsString("culprit", "");
        String trainingLabel = variant.hasAttribute("POSITIVE_TRAIN_SITE") ? "POSITIVE" : (variant.hasAttribute("NEGATIVE_TRAIN_SITE") ? "NEGATIVE" : "");

        // Yay/Nay/Grey status: positive training sites are "Y"; everything else is "G" for now.
        // TODO: check with Laura -- should NEGATIVEs also be NAYs?
        String yng = variant.hasAttribute("POSITIVE_TRAIN_SITE") ? "Y" : "G";

        List<String> row = Arrays.asList(
                filterSetName,
                mode,
                location.toString(),
                ref,
                alt,
                vqslod,
                culprit,
                trainingLabel,
                yng
        );

        writer.getNewLineBuilder().setRow(row).write();
    }

    @Override
    public void closeTool() {
        if (writer != null) {
            try {
                writer.close();
            } catch (final IOException e) {
                throw new GATKException("Couldn't close writer", e);
            }
        }
    }
}
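Both this walker and the extract code below funnel coordinates through SchemaUtils.encodeLocation, and the engine later recovers them with SchemaUtils.decodeContig / decodePosition. The real packing lives in SchemaUtils; what follows is only a minimal sketch of the idea, assuming an integer contig index and a fixed 10^12 multiplier (both are assumptions, not shown in this diff):

    // Sketch only. Assumes contigs map to small integer indices and positions
    // fit below the multiplier; the actual constants live in SchemaUtils.
    final class LocationCodecSketch {
        private static final long CHROM_MULTIPLIER = 1_000_000_000_000L; // assumed

        static long encodeLocation(final int contigIndex, final int position) {
            return contigIndex * CHROM_MULTIPLIER + position;
        }

        static int decodeContigIndex(final long location) {
            return (int) (location / CHROM_MULTIPLIER);
        }

        static int decodePosition(final long location) {
            return (int) (location % CHROM_MULTIPLIER);
        }
    }

Packing both fields into one long is what lets the min-location/max-location bounds introduced below be expressed as plain integer comparisons in SQL.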
@@ -37,20 +37,6 @@ public class ExtractCohort extends ExtractTool {
    )
    private String cohortTable = null;

    @Argument(
            fullName = "min-location",
            doc = "When extracting data, only include locations >= this value",
            optional = true
    )
    private Long minLocation = null;

    @Argument(
            fullName = "max-location",
            doc = "When extracting data, only include locations <= this value",
            optional = true
    )
    private Long maxLocation = null;

    @Override
    protected void onStartup() {
        super.onStartup();
@@ -25,21 +25,6 @@ public class ExtractFeatures extends ExtractTool {
    private static final Logger logger = LogManager.getLogger(ExtractFeatures.class);
    private ExtractFeaturesEngine engine;

    // TODO change this to vet_table_prefix!
    @Argument(
            fullName = "vet-table",
            doc = "Fully qualified name of the vet table",
            optional = false
    )
    protected String fqVetTable = null;

    // @Argument(
    //         fullName = "pet-table",
    //         doc = "Fully qualified name of the pet table where",
    //         optional = false
    // )
    // protected String fqPetTable = null;

    @Argument(
            fullName = "alt-allele-table",
            doc = "Fully qualified name of the table where the alternate allele info is",
@@ -62,7 +47,7 @@

    @Override
    public boolean requiresIntervals() {
        return true; // TODO -- do I need to check the boolean flag on this?
        return false;
    }

    @Override
@@ -81,9 +66,9 @@ protected void onStartup() {
                reference,
                trainingSitesOnly,
                fqAltAlleleTable,
                fqVetTable,
                sampleTableRef,
                intervalArgumentCollection.getIntervals(getBestAvailableSequenceDictionary()),
                minLocation,
                maxLocation,
                localSortMaxRecordsInRam,
                printDebugInformation,
                useBatchQueries,
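ExtractFeatures now reads minLocation and maxLocation without declaring them, and ExtractCohort's copies were deleted above, so the pair has presumably been hoisted into the shared ExtractTool base class (an assumption; that file's diff is not shown here). Mirroring the deleted declarations, the hoisted versions would look roughly like this:

    // Presumed ExtractTool members, mirroring the @Argument blocks removed from
    // ExtractCohort above; 'protected' is assumed so subclasses can read them.
    @Argument(
            fullName = "min-location",
            doc = "When extracting data, only include locations >= this value",
            optional = true
    )
    protected Long minLocation = null;

    @Argument(
            fullName = "max-location",
            doc = "When extracting data, only include locations <= this value",
            optional = true
    )
    protected Long maxLocation = null;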
@@ -1,86 +1,42 @@
package org.broadinstitute.hellbender.tools.variantdb.nextgen;

import org.broadinstitute.hellbender.tools.variantdb.SchemaUtils;
import org.broadinstitute.hellbender.utils.SimpleInterval;
import java.io.File;

import org.apache.commons.io.FileUtils;
import org.broadinstitute.hellbender.exceptions.GATKException;
import org.broadinstitute.hellbender.utils.bigquery.TableReference;
import org.broadinstitute.hellbender.utils.io.Resource;

public class ExtractFeaturesBQ {
    private final static String FEATURE_EXTRACT_QUERY_RESOURCE =
            "org/broadinstitute/hellbender/tools/variantdb/nextgen/feature_extract.sql";

    private final static String VQSR_TRAINING_SITES_TABLE =
            "broad-dsp-spec-ops.joint_genotyping_ref.vqsr_training_sites_*";

    public static String getVQSRFeatureExtractQueryString(final TableReference vet, final TableReference altAllele, final TableReference sampleList,
            final SimpleInterval interval, final boolean trainingSitesOnly) {
    public static String getVQSRFeatureExtractQueryString(final TableReference altAllele, final TableReference sampleList,
            final Long minLocation, final Long maxLocation, final boolean trainingSitesOnly) {

        //TODO try to remove the dependency on the vet tables since there are multiple tables that contain the data we need
        String trainingSitesStanza =
                !trainingSitesOnly ? "" :
                "AND location IN (SELECT location FROM `broad-dsp-spec-ops.joint_genotyping_ref.vqsr_training_sites_*` WHERE chrom='chr20')\n";
        String query =
                "WITH ref_ad_info AS (\n" +
                "SELECT \n" +
                " location,\n" +
                " IFNULL(sum(cast(SPLIT(call_AD,\",\")[OFFSET(0)] as int64)),0) as ref_ad\n" +
                "FROM `@vet` \n" +
                "WHERE sample_id IN (SELECT sample_id FROM `@sample`)\n" +
                "AND (location >= @start AND location <= @end) \n" +
                trainingSitesStanza +
                "AND (\n" +
                " SELECT SUM(CAST(part AS int64)) FROM UNNEST(SPLIT(call_AD, ',')) part WITH OFFSET index WHERE index >= 1 ) \n" +
                " > 1\n" +
                "GROUP BY location),\n" +
                "ref_sb_info AS (\n" +
                "SELECT \n" +
                " location,\n" +
                " IFNULL(sum(cast(SPLIT(SPLIT(as_sb_table,\"|\")[OFFSET(0)],\",\")[OFFSET(0)] as int64)),0) as sb_ref_plus, \n" +
                " IFNULL(sum(cast(SPLIT(SPLIT(as_sb_table,\"|\")[OFFSET(0)],\",\")[OFFSET(1)] as int64)),0) as sb_ref_minus \n" +
                "FROM `@vet` \n" +
                "WHERE sample_id IN (SELECT sample_id FROM `@sample`)\n" +
                "AND (location >= @start AND location <= @end) \n" +
                trainingSitesStanza +
                "GROUP BY location)\n" +
                "SELECT \n" +
                " ai.location, \n" +
                " ai.ref, \n" +
                " ai.allele, \n" +
                " RAW_QUAL,\n" +
                " radi.ref_ad,\n" +
                " AS_MQRankSum,\n" +
                " AS_MQRankSum_ft,\n" +
                " AS_ReadPosRankSum,\n" +
                " AS_ReadPosRankSum_ft,\n" +
                " RAW_MQ,\n" +
                " RAW_AD,\n" +
                " RAW_AD_GT_1,\n" +
                " rsbi.SB_REF_PLUS,\n" +
                " rsbi.SB_REF_MINUS,\n" +
                " SB_ALT_PLUS, \n" +
                " SB_ALT_MINUS\n" +
                "FROM (\n" +
                "SELECT aa.location, \n" +
                " ref, \n" +
                " allele, \n" +
                " IFNULL(SUM(qual),0) as RAW_QUAL,\n" +
                " `bqutil`.fn.median(ARRAY_AGG( raw_mqranksum_x_10 IGNORE NULLS)) / 10.0 as AS_MQRankSum,\n" +
                " `broad-dsp-spec-ops`.joint_genotyping_ref.freq_table(ARRAY_AGG(raw_mqranksum_x_10 IGNORE NULLS)) AS_MQRankSum_ft,\n" +
                " `bqutil`.fn.median(ARRAY_AGG(raw_readposranksum_x_10 IGNORE NULLS)) / 10.0 as AS_ReadPosRankSum,\n" +
                " `broad-dsp-spec-ops`.joint_genotyping_ref.freq_table(ARRAY_AGG(raw_readposranksum_x_10 IGNORE NULLS)) as AS_ReadPosRankSum_ft,\n" +
                " IFNULL(SUM(RAW_MQ),0) as RAW_MQ,\n" +
                " IFNULL(SUM(AD),0) as RAW_AD, \n" +
                " IFNULL(SUM(CASE WHEN AD > 1 THEN AD ELSE 0 END),0) as RAW_AD_GT_1, # to match GATK implementation\n" +
                " IFNULL(SUM(SB_ALT_PLUS),0) as SB_ALT_PLUS, \n" +
                " IFNULL(SUM(SB_ALT_MINUS),0) as SB_ALT_MINUS\n" +
                "FROM `@altAllele` as aa\n" +
                "WHERE (location >= @start AND location <= @end) \n" +
                trainingSitesStanza +
                "AND sample_id in (SELECT sample_id from `@sample` )\n" +
                "AND allele != '*'\n" +
                "GROUP BY 1,2,3\n" +
                ") ai\n" +
                "LEFT JOIN ref_ad_info radi ON (ai.location = radi.location)\n" +
                "LEFT JOIN ref_sb_info rsbi ON (ai.location = rsbi.location)\n";
                !trainingSitesOnly ? "" :
                "AND location IN (SELECT location FROM `" + VQSR_TRAINING_SITES_TABLE + "`)\n";

        String locationStanza =
                ((minLocation != null) ? "AND location >= " + minLocation + " \n" : "") +
                ((maxLocation != null) ? "AND location < " + maxLocation + " \n" : "");

        return query
                .replaceAll("@vet", vet.getFQTableName())
        try {
            File file = Resource.getResourceContentsAsFile(FEATURE_EXTRACT_QUERY_RESOURCE);
            String query = FileUtils.readFileToString(file, "UTF-8");

            return query
                    .replaceAll("@locationStanza", locationStanza)
                    .replaceAll("@trainingSitesStanza", trainingSitesStanza)
                    .replaceAll("@sample", sampleList.getFQTableName())
                    .replaceAll("@start", String.format("%d", SchemaUtils.encodeLocation(interval.getContig(), interval.getStart())))
                    .replaceAll("@end", String.format("%d", SchemaUtils.encodeLocation(interval.getContig(), interval.getEnd())))
                    .replaceAll("@altAllele", altAllele.getFQTableName());
    }}
                    .replaceAll("@altAllele", altAllele.getFQTableName());

        } catch (Exception ioe) {
            throw new GATKException("Unable to read query file from resources", ioe);
        }
    }
}
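The query text itself now lives in the feature_extract.sql resource, which is not part of this diff. A self-contained sketch of the @-placeholder convention the Java code expects follows; the template string and table names below are made-up stand-ins for the real file, not actual datasets:

    // Demonstrates the placeholder substitution used above; everything here is
    // hypothetical except the substitution mechanism itself.
    public class TemplateSubstitutionSketch {
        public static void main(final String[] args) {
            final String template =
                    "SELECT location, allele FROM `@altAllele`\n" +
                    "WHERE sample_id IN (SELECT sample_id FROM `@sample`)\n" +
                    "@locationStanza" +
                    "@trainingSitesStanza";

            final String locationStanza =
                    "AND location >= 20000000001000 \n" +  // hypothetical encoded bounds
                    "AND location < 20000000002000 \n";

            final String query = template
                    .replaceAll("@locationStanza", locationStanza)
                    .replaceAll("@trainingSitesStanza", "")  // as when trainingSitesOnly is false
                    .replaceAll("@sample", "my-project.my_dataset.sample_list")
                    .replaceAll("@altAllele", "my-project.my_dataset.alt_allele");

            System.out.println(query);
        }
    }

Note that replaceAll treats its first argument as a regex; the placeholders contain no metacharacters, so literal replacement is safe here.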
@@ -47,10 +47,10 @@ public class ExtractFeaturesEngine {
    private final String projectID;

    private final TableReference altAlleleTable;
    private final TableReference vetTable;
    private final TableReference sampleListTable;
    private final ProgressMeter progressMeter;
    private List<SimpleInterval> userIntervals;
    private final Long minLocation;
    private final Long maxLocation;
    private final boolean useBatchQueries;

    // /** Set of sample names seen in the variant data from BigQuery. */
@@ -63,9 +63,9 @@ public ExtractFeaturesEngine(final String projectID,
            final ReferenceDataSource refSource,
            final boolean trainingSitesOnly,
            final String fqAltAlleleTable,
            final String fqVetTable,
            final TableReference sampleListTable,
            final List<SimpleInterval> userIntervals,
            final Long minLocation,
            final Long maxLocation,
            final int localSortMaxRecordsInRam,
            final boolean printDebugInformation,
            final boolean useBatchQueries,
@@ -77,27 +77,25 @@ public ExtractFeaturesEngine(final String projectID,
        this.refSource = refSource;
        this.trainingSitesOnly = trainingSitesOnly;
        this.altAlleleTable = new TableReference(fqAltAlleleTable, SchemaUtils.ALT_ALLELE_FIELDS);
        this.vetTable = new TableReference(fqVetTable, SchemaUtils.VET_FIELDS);
        this.sampleListTable = sampleListTable;
        this.printDebugInformation = printDebugInformation;
        this.useBatchQueries = useBatchQueries;
        this.progressMeter = progressMeter;
        this.userIntervals = userIntervals;
        this.minLocation = minLocation;
        this.maxLocation = maxLocation;

        this.variantContextMerger = new ReferenceConfidenceVariantContextMerger(annotationEngine, vcfHeader);
    }

    public void traverse() {
        userIntervals.forEach(interval -> {
            final String featureQueryString = ExtractFeaturesBQ.getVQSRFeatureExtractQueryString(vetTable, altAlleleTable, sampleListTable, interval, trainingSitesOnly);
            logger.info(featureQueryString);
            final StorageAPIAvroReader storageAPIAvroReader = BigQueryUtils.executeQueryWithStorageAPI(featureQueryString, SchemaUtils.FEATURE_EXTRACT_FIELDS, projectID, useBatchQueries);
        final String featureQueryString = ExtractFeaturesBQ.getVQSRFeatureExtractQueryString(altAlleleTable, sampleListTable, minLocation, maxLocation, trainingSitesOnly);
        logger.info(featureQueryString);
        final StorageAPIAvroReader storageAPIAvroReader = BigQueryUtils.executeQueryWithStorageAPI(featureQueryString, SchemaUtils.FEATURE_EXTRACT_FIELDS, projectID, useBatchQueries);

            createVQSRInputFromTableResult(storageAPIAvroReader, interval.getContig());
        });
        createVQSRInputFromTableResult(storageAPIAvroReader);
    }

    private void createVQSRInputFromTableResult(final GATKAvroReader avroReader, final String contig) {
    private void createVQSRInputFromTableResult(final GATKAvroReader avroReader) {
        final org.apache.avro.Schema schema = avroReader.getSchema();
        final Set<String> columnNames = new HashSet<>();
        if ( schema.getField(LOCATION_FIELD_NAME) == null ) {
@@ -115,13 +113,14 @@ private void createVQSRInputFromTableResult(final GATKAvroReader avroReader, fin
        }
        sortingCollection.printTempFileStats();
        for ( final GenericRecord row : sortingCollection ) {
            processVQSRRecordForPosition(row, contig);
            processVQSRRecordForPosition(row);
        }
    }

    private void processVQSRRecordForPosition(GenericRecord rec, String contig) {
    private void processVQSRRecordForPosition(GenericRecord rec) {
        final long location = Long.parseLong(rec.get(LOCATION_FIELD_NAME).toString());

        String contig = SchemaUtils.decodeContig(location);
        int position = SchemaUtils.decodePosition(location);
        // I don't understand why the other modes iterate through a list of all columns, and then
        // switch based on the column name rather than just getting the columns desired directly (like I'm going to do here).