From 0ab8543bd49db2a151a7b67f986d3d96911b5ab8 Mon Sep 17 00:00:00 2001 From: Michael L Heuer Date: Mon, 11 Sep 2017 20:26:23 -0500 Subject: [PATCH] Refactor cli commands into functions for use in cannoli-shell or notebooks. --- .../org/bdgenomics/cannoli/cli/Bcftools.scala | 124 ++++++++---- .../org/bdgenomics/cannoli/cli/Bedtools.scala | 120 ++++++++---- .../org/bdgenomics/cannoli/cli/Bowtie.scala | 125 ++++++++---- .../org/bdgenomics/cannoli/cli/Bowtie2.scala | 125 ++++++++---- .../org/bdgenomics/cannoli/cli/Bwa.scala | 180 +++++++++++------- .../bdgenomics/cannoli/cli/Freebayes.scala | 131 ++++++++----- .../bdgenomics/cannoli/cli/SampleReads.scala | 10 +- .../org/bdgenomics/cannoli/cli/Samtools.scala | 131 ++++++++----- .../org/bdgenomics/cannoli/cli/SnpEff.scala | 118 ++++++++---- .../scala/org/bdgenomics/cannoli/cli/Vt.scala | 126 ++++++++---- 10 files changed, 789 insertions(+), 401 deletions(-) diff --git a/cli/src/main/scala/org/bdgenomics/cannoli/cli/Bcftools.scala b/cli/src/main/scala/org/bdgenomics/cannoli/cli/Bcftools.scala index 7921c4b..7a6196f 100644 --- a/cli/src/main/scala/org/bdgenomics/cannoli/cli/Bcftools.scala +++ b/cli/src/main/scala/org/bdgenomics/cannoli/cli/Bcftools.scala @@ -19,7 +19,6 @@ package org.bdgenomics.cannoli.cli import htsjdk.samtools.ValidationStringency import org.apache.spark.SparkContext -import org.bdgenomics.adam.models.VariantContext import org.bdgenomics.adam.rdd.ADAMContext._ import org.bdgenomics.adam.rdd.ADAMSaveAnyArgs import org.bdgenomics.adam.rdd.variant.{ @@ -31,6 +30,81 @@ import org.bdgenomics.utils.cli._ import org.bdgenomics.utils.misc.Logging import org.kohsuke.args4j.{ Argument, Option => Args4jOption } +/** + * Bcftools function arguments. + */ +class BcftoolsFnArgs extends Args4jBase { + @Args4jOption(required = false, name = "-bcftools_path", usage = "Path to the BCFtools executable. Defaults to bcftools.") + var bcftoolsPath: String = "bcftools" + + @Args4jOption(required = true, name = "-bcftools_reference", usage = "Reference sequence for analysis. An index file (.fai) will be created if none exists.") + var referencePath: String = null + + @Args4jOption(required = false, name = "-docker_image", usage = "Docker image to use. Defaults to quay.io/biocontainers/bcftools:1.6--0.") + var dockerImage: String = "quay.io/biocontainers/bcftools:1.6--0" + + @Args4jOption(required = false, name = "-use_docker", usage = "If true, uses Docker to launch BCFtools. If false, uses the BCFtools executable path.") + var useDocker: Boolean = false +} + +/** + * Bcftools wrapper as a function VariantContextRDD → VariantContextRDD, + * for use in cannoli-shell or notebooks. + * + * @param args Bcftools function arguments. + * @param files Files to make locally available to the commands being run. + * @param environment A map containing environment variable/value pairs to set + * in the environment for the newly created process. + * @param sc Spark context. + */ +class BcftoolsFn( + val args: BcftoolsFnArgs, + val files: Seq[String], + val environment: Map[String, String], + val sc: SparkContext) extends Function1[VariantContextRDD, VariantContextRDD] with Logging { + + /** + * @param args Bcftools function arguments. + * @param sc Spark context. + */ + def this(args: BcftoolsFnArgs, sc: SparkContext) = this(args, Seq.empty, Map.empty, sc) + + /** + * @param args Bcftools function arguments. + * @param files Files to make locally available to the commands being run. + * @param sc Spark context. 
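   *
   * A sketch of shipping the reference and its index alongside the piped
   * command (hypothetical paths; -bcftools_reference must still resolve on
   * each worker):
   * {{{
   * val fn = new BcftoolsFn(args, Seq("ref.fa", "ref.fa.fai"), sc)
   * }}}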
+ */ + def this(args: BcftoolsFnArgs, files: Seq[String], sc: SparkContext) = this(args, files, Map.empty, sc) + + override def apply(variantContexts: VariantContextRDD): VariantContextRDD = { + + val bcftoolsCommand = if (args.useDocker) { + Seq("docker", + "run", + "--interactive", + "--rm", + args.dockerImage, + "bcftools", + "norm", + "--fasta-ref", + args.referencePath) + } else { + Seq(args.bcftoolsPath, + "norm", + "--fasta-ref", + args.referencePath) + } + + log.info("Piping {} to bcftools with command: {} files: {} environment: {}", + Array(variantContexts, bcftoolsCommand, files, environment)) + + implicit val tFormatter = VCFInFormatter + implicit val uFormatter = new VCFOutFormatter(sc.hadoopConfiguration) + + variantContexts.pipe(bcftoolsCommand, files, environment) + } +} + object Bcftools extends BDGCommandCompanion { val commandName = "bcftools" val commandDescription = "ADAM Pipe API wrapper for BCFtools norm." @@ -40,25 +114,16 @@ object Bcftools extends BDGCommandCompanion { } } -class BcftoolsArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs { +/** + * Bcftools command line arguments. + */ +class BcftoolsArgs extends BcftoolsFnArgs with ADAMSaveAnyArgs with ParquetArgs { @Argument(required = true, metaVar = "INPUT", usage = "Location to pipe from, in VCF format.", index = 0) var inputPath: String = null @Argument(required = true, metaVar = "OUTPUT", usage = "Location to pipe to, in VCF format.", index = 1) var outputPath: String = null - @Args4jOption(required = false, name = "-bcftools_path", usage = "Path to the BCFtools executable. Defaults to bcftools.") - var bcftoolsPath: String = "bcftools" - - @Args4jOption(required = true, name = "-bcftools_reference", usage = "Reference sequence for analysis. An index file (.fai) will be created if none exists.") - var referencePath: String = null - - @Args4jOption(required = false, name = "-docker_image", usage = "Docker image to use. Defaults to quay.io/biocontainers/bcftools:1.6--0.") - var dockerImage: String = "quay.io/biocontainers/bcftools:1.6--0" - - @Args4jOption(required = false, name = "-use_docker", usage = "If true, uses Docker to launch BCFtools. If false, uses the BCFtools executable path.") - var useDocker: Boolean = false - @Args4jOption(required = false, name = "-single", usage = "Saves OUTPUT as single file.") var asSingleFile: Boolean = false @@ -76,38 +141,15 @@ class BcftoolsArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs { } /** - * Bcftools. + * Bcftools command line wrapper. 
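 *
 * For example, from the command line (hypothetical paths; cannoli-submit
 * is assumed to be Cannoli's Spark submit wrapper script):
 * {{{
 * cannoli-submit bcftools sample.vcf sample.norm.vcf -bcftools_reference ref.fa -single
 * }}}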
*/ class Bcftools(protected val args: BcftoolsArgs) extends BDGSparkCommand[BcftoolsArgs] with Logging { val companion = Bcftools val stringency: ValidationStringency = ValidationStringency.valueOf(args.stringency) def run(sc: SparkContext) { - val input: VariantContextRDD = sc.loadVcf(args.inputPath, stringency = stringency) - - implicit val tFormatter = VCFInFormatter - implicit val uFormatter = new VCFOutFormatter(sc.hadoopConfiguration) - - val bcftoolsCommand = if (args.useDocker) { - Seq("docker", - "run", - "--interactive", - "--rm", - args.dockerImage, - "bcftools", - "norm", - "--fasta-ref", - args.referencePath) - } else { - Seq(args.bcftoolsPath, - "norm", - "--fasta-ref", - args.referencePath) - } - - val output: VariantContextRDD = input.pipe[VariantContext, VariantContextRDD, VCFInFormatter](bcftoolsCommand) - .transform(_.cache()) - - output.saveAsVcf(args, stringency) + val variantContexts = sc.loadVcf(args.inputPath, stringency = stringency) + val pipedVariantContexts = new BcftoolsFn(args, sc).apply(variantContexts) + pipedVariantContexts.saveAsVcf(args, stringency) } } diff --git a/cli/src/main/scala/org/bdgenomics/cannoli/cli/Bedtools.scala b/cli/src/main/scala/org/bdgenomics/cannoli/cli/Bedtools.scala index 2c94d35..5b1165f 100644 --- a/cli/src/main/scala/org/bdgenomics/cannoli/cli/Bedtools.scala +++ b/cli/src/main/scala/org/bdgenomics/cannoli/cli/Bedtools.scala @@ -29,22 +29,10 @@ import org.bdgenomics.utils.cli._ import org.bdgenomics.utils.misc.Logging import org.kohsuke.args4j.{ Argument, Option => Args4jOption } -object Bedtools extends BDGCommandCompanion { - val commandName = "bedtools" - val commandDescription = "ADAM Pipe API wrapper for Bedtools intersect." - - def apply(cmdLine: Array[String]) = { - new Bedtools(Args4j[BedtoolsArgs](cmdLine)) - } -} - -class BedtoolsArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs { - @Argument(required = true, metaVar = "INPUT", usage = "Location to pipe from.", index = 0) - var inputPath: String = null - - @Argument(required = true, metaVar = "OUTPUT", usage = "Location to pipe to.", index = 1) - var outputPath: String = null - +/** + * Bedtools function arguments. + */ +class BedtoolsFnArgs extends Args4jBase { @Args4jOption(required = false, name = "-a", usage = "Bedtools intersect -a option. One of {-a,-b} should be left unspecified to accept piped input.") var a: String = null @@ -62,29 +50,42 @@ class BedtoolsArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs { @Args4jOption(required = false, name = "-use_docker", usage = "If true, uses Docker to launch Bedtools. If false, uses the Bedtools executable path.") var useDocker: Boolean = false - - @Args4jOption(required = false, name = "-single", usage = "Saves OUTPUT as single file.") - var asSingleFile: Boolean = false - - @Args4jOption(required = false, name = "-disable_fast_concat", usage = "Disables the parallel file concatenation engine.") - var disableFastConcat: Boolean = false - - @Args4jOption(required = false, name = "-defer_merging", usage = "Defers merging single file output.") - var deferMerging: Boolean = false - - // must be defined due to ADAMSaveAnyArgs, but unused here - var sortFastqOutput: Boolean = false } /** - * Bedtools. + * Bedtools wrapper as a function FeatureRDD → FeatureRDD, + * for use in cannoli-shell or notebooks. + * + * + * val args = new BedtoolsFnArgs() + * args.b = "foo.bed" + * args.useDocker = true + * val features = ... 
+ * val pipedFeatures = new BedtoolsFn(args).apply(features) + * + * + * @param args Bedtools function arguments. + * @param files Files to make locally available to the commands being run. + * @param environment A map containing environment variable/value pairs to set + * in the environment for the newly created process. */ -class Bedtools(protected val args: BedtoolsArgs) extends BDGSparkCommand[BedtoolsArgs] with Logging { - val companion = Bedtools - - def run(sc: SparkContext) { - var features: FeatureRDD = sc.loadFeatures(args.inputPath) - +class BedtoolsFn( + val args: BedtoolsFnArgs, + val files: Seq[String], + val environment: Map[String, String]) extends Function1[FeatureRDD, FeatureRDD] with Logging { + + /** + * @param args Bedtools function arguments. + */ + def this(args: BedtoolsFnArgs) = this(args, Seq.empty, Map.empty) + + /** + * @param args Bedtools function arguments. + * @param files Files to make locally available to the commands being run. + */ + def this(args: BedtoolsFnArgs, files: Seq[String]) = this(args, files, Map.empty) + + override def apply(features: FeatureRDD): FeatureRDD = { val optA = Option(args.a) val optB = Option(args.b) require(optA.size + optB.size == 1, @@ -114,9 +115,56 @@ class Bedtools(protected val args: BedtoolsArgs) extends BDGSparkCommand[Bedtool ) } + log.info("Piping {} to bedtools with command: {} files: {} environment: {}", + Array(features, bedtoolsCommand, files, environment)) + implicit val tFormatter = BEDInFormatter implicit val uFormatter = new BEDOutFormatter - val pipedFeatures: FeatureRDD = features.pipe(bedtoolsCommand) + features.pipe(bedtoolsCommand, files, environment) + } +} + +object Bedtools extends BDGCommandCompanion { + val commandName = "bedtools" + val commandDescription = "ADAM Pipe API wrapper for Bedtools intersect." + + def apply(cmdLine: Array[String]) = { + new Bedtools(Args4j[BedtoolsArgs](cmdLine)) + } +} + +/** + * Bedtools command line arguments. + */ +class BedtoolsArgs extends BedtoolsFnArgs with ADAMSaveAnyArgs with ParquetArgs { + @Argument(required = true, metaVar = "INPUT", usage = "Location to pipe from.", index = 0) + var inputPath: String = null + + @Argument(required = true, metaVar = "OUTPUT", usage = "Location to pipe to.", index = 1) + var outputPath: String = null + + @Args4jOption(required = false, name = "-single", usage = "Saves OUTPUT as single file.") + var asSingleFile: Boolean = false + + @Args4jOption(required = false, name = "-disable_fast_concat", usage = "Disables the parallel file concatenation engine.") + var disableFastConcat: Boolean = false + + @Args4jOption(required = false, name = "-defer_merging", usage = "Defers merging single file output.") + var deferMerging: Boolean = false + + // must be defined due to ADAMSaveAnyArgs, but unused here + var sortFastqOutput: Boolean = false +} + +/** + * Bedtools command line wrapper. 
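 *
 * For example, from the command line (hypothetical paths):
 * {{{
 * cannoli-submit bedtools features.bed intersected.bed -b other.bed -single
 * }}}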
+ */ +class Bedtools(protected val args: BedtoolsArgs) extends BDGSparkCommand[BedtoolsArgs] with Logging { + val companion = Bedtools + + override def run(sc: SparkContext) { + val features = sc.loadFeatures(args.inputPath) + val pipedFeatures = new BedtoolsFn(args).apply(features) pipedFeatures.save(args.outputPath, asSingleFile = args.asSingleFile, diff --git a/cli/src/main/scala/org/bdgenomics/cannoli/cli/Bowtie.scala b/cli/src/main/scala/org/bdgenomics/cannoli/cli/Bowtie.scala index 6dbaa1b..62c73f5 100644 --- a/cli/src/main/scala/org/bdgenomics/cannoli/cli/Bowtie.scala +++ b/cli/src/main/scala/org/bdgenomics/cannoli/cli/Bowtie.scala @@ -23,27 +23,14 @@ import org.bdgenomics.adam.rdd.ADAMContext._ import org.bdgenomics.adam.rdd.ADAMSaveAnyArgs import org.bdgenomics.adam.rdd.fragment.{ FragmentRDD, InterleavedFASTQInFormatter } import org.bdgenomics.adam.rdd.read.{ AlignmentRecordRDD, AnySAMOutFormatter } -import org.bdgenomics.formats.avro.AlignmentRecord import org.bdgenomics.utils.cli._ import org.bdgenomics.utils.misc.Logging import org.kohsuke.args4j.{ Argument, Option => Args4jOption } -object Bowtie extends BDGCommandCompanion { - val commandName = "bowtie" - val commandDescription = "ADAM Pipe API wrapper for Bowtie." - - def apply(cmdLine: Array[String]) = { - new Bowtie(Args4j[BowtieArgs](cmdLine)) - } -} - -class BowtieArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs { - @Argument(required = true, metaVar = "INPUT", usage = "Location to pipe from, in interleaved FASTQ format.", index = 0) - var inputPath: String = null - - @Argument(required = true, metaVar = "OUTPUT", usage = "Location to pipe to.", index = 1) - var outputPath: String = null - +/** + * Bowtie function arguments. + */ +class BowtieFnArgs extends Args4jBase { @Args4jOption(required = false, name = "-bowtie_path", usage = "Path to the Bowtie executable. Defaults to bowtie.") var bowtiePath: String = "bowtie" @@ -55,35 +42,34 @@ class BowtieArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs { @Args4jOption(required = true, name = "-bowtie_index", usage = "Basename of the bowtie index to be searched, e.g. in bowtie [options]* ...") var indexPath: String = null - - @Args4jOption(required = false, name = "-single", usage = "Saves OUTPUT as single file.") - var asSingleFile: Boolean = false - - @Args4jOption(required = false, name = "-defer_merging", usage = "Defers merging single file output.") - var deferMerging: Boolean = false - - @Args4jOption(required = false, name = "-disable_fast_concat", usage = "Disables the parallel file concatenation engine.") - var disableFastConcat: Boolean = false - - @Args4jOption(required = false, name = "-stringency", usage = "Stringency level for various checks; can be SILENT, LENIENT, or STRICT. Defaults to STRICT.") - var stringency: String = "STRICT" - - // must be defined due to ADAMSaveAnyArgs, but unused here - var sortFastqOutput: Boolean = false } /** - * Bowtie. + * Bowtie wrapper as a function FragmentRDD → AlignmentRecordRDD, + * for use in cannoli-shell or notebooks. + * + * @param args Bowtie function arguments. + * @param files Files to make locally available to the commands being run. + * @param environment A map containing environment variable/value pairs to set + * in the environment for the newly created process. 
*/ -class Bowtie(protected val args: BowtieArgs) extends BDGSparkCommand[BowtieArgs] with Logging { - val companion = Bowtie - val stringency: ValidationStringency = ValidationStringency.valueOf(args.stringency) +class BowtieFn( + val args: BowtieFnArgs, + val files: Seq[String], + val environment: Map[String, String]) extends Function1[FragmentRDD, AlignmentRecordRDD] with Logging { - def run(sc: SparkContext) { - val input: FragmentRDD = sc.loadFragments(args.inputPath) + /** + * @param args Bowtie function arguments. + */ + def this(args: BowtieFnArgs) = this(args, Seq.empty, Map.empty) - implicit val tFormatter = InterleavedFASTQInFormatter - implicit val uFormatter = new AnySAMOutFormatter + /** + * @param args Bowtie function arguments. + * @param files Files to make locally available to the commands being run. + */ + def this(args: BowtieFnArgs, files: Seq[String]) = this(args, files, Map.empty) + + override def apply(fragments: FragmentRDD): AlignmentRecordRDD = { val bowtieCommand = if (args.useDocker) { Seq("docker", @@ -105,7 +91,62 @@ class Bowtie(protected val args: BowtieArgs) extends BDGSparkCommand[BowtieArgs] "-" ) } - val output: AlignmentRecordRDD = input.pipe[AlignmentRecord, AlignmentRecordRDD, InterleavedFASTQInFormatter](bowtieCommand) - output.save(args) + + log.info("Piping {} to bowtie with command: {} files: {} environment: {}", + Array(fragments, bowtieCommand, files, environment)) + + implicit val tFormatter = InterleavedFASTQInFormatter + implicit val uFormatter = new AnySAMOutFormatter + + fragments.pipe(bowtieCommand, files, environment) + } +} + +object Bowtie extends BDGCommandCompanion { + val commandName = "bowtie" + val commandDescription = "ADAM Pipe API wrapper for Bowtie." + + def apply(cmdLine: Array[String]) = { + new Bowtie(Args4j[BowtieArgs](cmdLine)) + } +} + +/** + * Bowtie command line arguments. + */ +class BowtieArgs extends BowtieFnArgs with ADAMSaveAnyArgs with ParquetArgs { + @Argument(required = true, metaVar = "INPUT", usage = "Location to pipe from, in interleaved FASTQ format.", index = 0) + var inputPath: String = null + + @Argument(required = true, metaVar = "OUTPUT", usage = "Location to pipe to.", index = 1) + var outputPath: String = null + + @Args4jOption(required = false, name = "-single", usage = "Saves OUTPUT as single file.") + var asSingleFile: Boolean = false + + @Args4jOption(required = false, name = "-defer_merging", usage = "Defers merging single file output.") + var deferMerging: Boolean = false + + @Args4jOption(required = false, name = "-disable_fast_concat", usage = "Disables the parallel file concatenation engine.") + var disableFastConcat: Boolean = false + + @Args4jOption(required = false, name = "-stringency", usage = "Stringency level for various checks; can be SILENT, LENIENT, or STRICT. Defaults to STRICT.") + var stringency: String = "STRICT" + + // must be defined due to ADAMSaveAnyArgs, but unused here + var sortFastqOutput: Boolean = false +} + +/** + * Bowtie command line wrapper. 
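 *
 * For example, from the command line (hypothetical paths):
 * {{{
 * cannoli-submit bowtie sample.ifq sample.sam -bowtie_index hg38 -single
 * }}}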
+ */ +class Bowtie(protected val args: BowtieArgs) extends BDGSparkCommand[BowtieArgs] with Logging { + val companion = Bowtie + val stringency: ValidationStringency = ValidationStringency.valueOf(args.stringency) + + def run(sc: SparkContext) { + val fragments = sc.loadFragments(args.inputPath, stringency = stringency) + val alignments = new BowtieFn(args).apply(fragments) + alignments.save(args) } } diff --git a/cli/src/main/scala/org/bdgenomics/cannoli/cli/Bowtie2.scala b/cli/src/main/scala/org/bdgenomics/cannoli/cli/Bowtie2.scala index d6dd608..a631363 100644 --- a/cli/src/main/scala/org/bdgenomics/cannoli/cli/Bowtie2.scala +++ b/cli/src/main/scala/org/bdgenomics/cannoli/cli/Bowtie2.scala @@ -23,27 +23,14 @@ import org.bdgenomics.adam.rdd.ADAMContext._ import org.bdgenomics.adam.rdd.ADAMSaveAnyArgs import org.bdgenomics.adam.rdd.fragment.{ FragmentRDD, InterleavedFASTQInFormatter } import org.bdgenomics.adam.rdd.read.{ AlignmentRecordRDD, AnySAMOutFormatter } -import org.bdgenomics.formats.avro.AlignmentRecord import org.bdgenomics.utils.cli._ import org.bdgenomics.utils.misc.Logging import org.kohsuke.args4j.{ Argument, Option => Args4jOption } -object Bowtie2 extends BDGCommandCompanion { - val commandName = "bowtie2" - val commandDescription = "ADAM Pipe API wrapper for Bowtie 2." - - def apply(cmdLine: Array[String]) = { - new Bowtie2(Args4j[Bowtie2Args](cmdLine)) - } -} - -class Bowtie2Args extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs { - @Argument(required = true, metaVar = "INPUT", usage = "Location to pipe from, in interleaved FASTQ format.", index = 0) - var inputPath: String = null - - @Argument(required = true, metaVar = "OUTPUT", usage = "Location to pipe to.", index = 1) - var outputPath: String = null - +/** + * Bowtie 2 function arguments. + */ +class Bowtie2FnArgs extends Args4jBase { @Args4jOption(required = false, name = "-bowtie2_path", usage = "Path to the Bowtie 2 executable. Defaults to bowtie2.") var bowtie2Path: String = "bowtie2" @@ -55,35 +42,34 @@ class Bowtie2Args extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs { @Args4jOption(required = true, name = "-bowtie2_index", usage = "Basename of the index for the reference genome, e.g. in bowtie2 [options]* -x .") var indexPath: String = null - - @Args4jOption(required = false, name = "-single", usage = "Saves OUTPUT as single file.") - var asSingleFile: Boolean = false - - @Args4jOption(required = false, name = "-defer_merging", usage = "Defers merging single file output.") - var deferMerging: Boolean = false - - @Args4jOption(required = false, name = "-disable_fast_concat", usage = "Disables the parallel file concatenation engine.") - var disableFastConcat: Boolean = false - - @Args4jOption(required = false, name = "-stringency", usage = "Stringency level for various checks; can be SILENT, LENIENT, or STRICT. Defaults to STRICT.") - var stringency: String = "STRICT" - - // must be defined due to ADAMSaveAnyArgs, but unused here - var sortFastqOutput: Boolean = false } /** - * Bowtie 2. + * Bowtie 2 wrapper as a function FragmentRDD → AlignmentRecordRDD, + * for use in cannoli-shell or notebooks. + * + * @param args Bowtie 2 function arguments. + * @param files Files to make locally available to the commands being run. + * @param environment A map containing environment variable/value pairs to set + * in the environment for the newly created process. 
*/ -class Bowtie2(protected val args: Bowtie2Args) extends BDGSparkCommand[Bowtie2Args] with Logging { - val companion = Bowtie2 - val stringency: ValidationStringency = ValidationStringency.valueOf(args.stringency) +class Bowtie2Fn( + val args: Bowtie2FnArgs, + val files: Seq[String], + val environment: Map[String, String]) extends Function1[FragmentRDD, AlignmentRecordRDD] with Logging { - def run(sc: SparkContext) { - val input: FragmentRDD = sc.loadFragments(args.inputPath) + /** + * @param args Bowtie 2 function arguments. + */ + def this(args: Bowtie2FnArgs) = this(args, Seq.empty, Map.empty) - implicit val tFormatter = InterleavedFASTQInFormatter - implicit val uFormatter = new AnySAMOutFormatter + /** + * @param args Bowtie 2 function arguments. + * @param files Files to make locally available to the commands being run. + */ + def this(args: Bowtie2FnArgs, files: Seq[String]) = this(args, files, Map.empty) + + override def apply(fragments: FragmentRDD): AlignmentRecordRDD = { val bowtie2Command = if (args.useDocker) { Seq("docker", @@ -105,7 +91,62 @@ class Bowtie2(protected val args: Bowtie2Args) extends BDGSparkCommand[Bowtie2Ar "-" ) } - val output: AlignmentRecordRDD = input.pipe[AlignmentRecord, AlignmentRecordRDD, InterleavedFASTQInFormatter](bowtie2Command) - output.save(args) + + log.info("Piping {} to bowtie2 with command: {} files: {} environment: {}", + Array(fragments, bowtie2Command, files, environment)) + + implicit val tFormatter = InterleavedFASTQInFormatter + implicit val uFormatter = new AnySAMOutFormatter + + fragments.pipe(bowtie2Command, files, environment) + } +} + +object Bowtie2 extends BDGCommandCompanion { + val commandName = "bowtie2" + val commandDescription = "ADAM Pipe API wrapper for Bowtie 2." + + def apply(cmdLine: Array[String]) = { + new Bowtie2(Args4j[Bowtie2Args](cmdLine)) + } +} + +/** + * Bowtie 2 command line arguments. + */ +class Bowtie2Args extends Bowtie2FnArgs with ADAMSaveAnyArgs with ParquetArgs { + @Argument(required = true, metaVar = "INPUT", usage = "Location to pipe from, in interleaved FASTQ format.", index = 0) + var inputPath: String = null + + @Argument(required = true, metaVar = "OUTPUT", usage = "Location to pipe to.", index = 1) + var outputPath: String = null + + @Args4jOption(required = false, name = "-single", usage = "Saves OUTPUT as single file.") + var asSingleFile: Boolean = false + + @Args4jOption(required = false, name = "-defer_merging", usage = "Defers merging single file output.") + var deferMerging: Boolean = false + + @Args4jOption(required = false, name = "-disable_fast_concat", usage = "Disables the parallel file concatenation engine.") + var disableFastConcat: Boolean = false + + @Args4jOption(required = false, name = "-stringency", usage = "Stringency level for various checks; can be SILENT, LENIENT, or STRICT. Defaults to STRICT.") + var stringency: String = "STRICT" + + // must be defined due to ADAMSaveAnyArgs, but unused here + var sortFastqOutput: Boolean = false +} + +/** + * Bowtie 2 command line wrapper. 
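 *
 * For example, from the command line (hypothetical paths):
 * {{{
 * cannoli-submit bowtie2 sample.ifq sample.sam -bowtie2_index hg38 -single
 * }}}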
+ */ +class Bowtie2(protected val args: Bowtie2Args) extends BDGSparkCommand[Bowtie2Args] with Logging { + val companion = Bowtie2 + val stringency: ValidationStringency = ValidationStringency.valueOf(args.stringency) + + def run(sc: SparkContext) { + val fragments = sc.loadFragments(args.inputPath, stringency = stringency) + val alignments = new Bowtie2Fn(args).apply(fragments) + alignments.save(args) } } diff --git a/cli/src/main/scala/org/bdgenomics/cannoli/cli/Bwa.scala b/cli/src/main/scala/org/bdgenomics/cannoli/cli/Bwa.scala index 666de11..734e9a3 100644 --- a/cli/src/main/scala/org/bdgenomics/cannoli/cli/Bwa.scala +++ b/cli/src/main/scala/org/bdgenomics/cannoli/cli/Bwa.scala @@ -17,62 +17,32 @@ */ package org.bdgenomics.cannoli.cli +import htsjdk.samtools.ValidationStringency import org.apache.hadoop.fs.{ FileSystem, Path } import org.apache.spark.SparkContext -import org.bdgenomics.adam.models.{ - RecordGroup, - RecordGroupDictionary -} +import org.bdgenomics.adam.models.{ RecordGroup, RecordGroupDictionary } import org.bdgenomics.adam.rdd.ADAMContext._ import org.bdgenomics.adam.rdd.ADAMSaveAnyArgs import org.bdgenomics.adam.rdd.fragment.{ FragmentRDD, InterleavedFASTQInFormatter } import org.bdgenomics.adam.rdd.read.{ AlignmentRecordRDD, AnySAMOutFormatter } import org.bdgenomics.cannoli.util.QuerynameGrouper -import org.bdgenomics.formats.avro.AlignmentRecord import org.bdgenomics.utils.cli._ import org.bdgenomics.utils.misc.Logging import org.kohsuke.args4j.{ Argument, Option => Args4jOption } -object Bwa extends BDGCommandCompanion { - val commandName = "bwa" - val commandDescription = "ADAM Pipe API wrapper for BWA." - - def apply(cmdLine: Array[String]) = { - new Bwa(Args4j[BwaArgs](cmdLine)) - } -} - -class BwaArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs { - @Argument(required = true, metaVar = "INPUT", usage = "Location to pipe from, in interleaved FASTQ format.", index = 0) - var inputPath: String = null - - @Argument(required = true, metaVar = "OUTPUT", usage = "Location to pipe to.", index = 1) - var outputPath: String = null - +/** + * Bwa function arguments. + */ +class BwaFnArgs extends Args4jBase { @Argument(required = true, metaVar = "SAMPLE", usage = "Sample ID.", index = 2) var sample: String = null @Args4jOption(required = true, name = "-index", usage = "Path to the bwa index to be searched, e.g. in bwa [options]* ...") var indexPath: String = null - @Args4jOption(required = false, name = "-single", usage = "Saves OUTPUT as single file. Exclusive of -fragments.") - var asSingleFile: Boolean = false - - @Args4jOption(required = false, name = "-fragments", usage = "Saves OUTPUT as Fragments in Parquet. Exclusive of -single.") - var asFragments: Boolean = false - - @Args4jOption(required = false, name = "-defer_merging", usage = "Defers merging single file output.") - var deferMerging: Boolean = false - - @Args4jOption(required = false, name = "-disable_fast_concat", usage = "Disables the parallel file concatenation engine.") - var disableFastConcat: Boolean = false - @Args4jOption(required = false, name = "-bwa_path", usage = "Path to the BWA executable. Defaults to bwa.") var bwaPath: String = "bwa" - @Args4jOption(required = false, name = "-sequence_dictionary", usage = "Path to the sequence dictionary.") - var sequenceDictionary: String = _ - @Args4jOption(required = false, name = "-docker_image", usage = "Docker image to use. 
Defaults to quay.io/biocontainers/bwa:0.7.17--pl5.22.0_0.") var dockerImage: String = "quay.io/biocontainers/bwa:0.7.17--pl5.22.0_0" @@ -82,41 +52,40 @@ class BwaArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs { @Args4jOption(required = false, name = "-docker_cmd", usage = "The docker command to run. Defaults to 'docker'.") var dockerCmd: String = "docker" - @Args4jOption(required = false, name = "-force_load_ifastq", usage = "Forces loading using interleaved FASTQ.") - var forceLoadIfastq: Boolean = false - - @Args4jOption(required = false, name = "-force_load_parquet", usage = "Forces loading using Parquet.") - var forceLoadParquet: Boolean = false - @Args4jOption(required = false, name = "-add_indices", usage = "Adds index files via SparkFiles mechanism.") var addIndices: Boolean = false - - // must be defined due to ADAMSaveAnyArgs, but unused here - var sortFastqOutput: Boolean = false } /** - * Bwa. + * Bwa wrapper as a function FragmentRDD → AlignmentRecordRDD, + * for use in cannoli-shell or notebooks. + * + * @param args Bwa function arguments. + * @param files Files to make locally available to the commands being run. + * @param environment A map containing environment variable/value pairs to set + * in the environment for the newly created process. + * @param sc Spark context. */ -class Bwa(protected val args: BwaArgs) extends BDGSparkCommand[BwaArgs] with Logging { - val companion = Bwa - - def run(sc: SparkContext) { - require(!(args.asSingleFile && args.asFragments), - "-single and -fragments are mutually exclusive.") - require(!(args.forceLoadIfastq && args.forceLoadParquet), - "-force_load_ifastq and -force_load_parquet are mutually exclusive.") - val input: FragmentRDD = if (args.forceLoadIfastq) { - sc.loadInterleavedFastqAsFragments(args.inputPath) - } else if (args.forceLoadParquet) { - sc.loadParquetFragments(args.inputPath) - } else { - sc.loadFragments(args.inputPath) - } - - implicit val tFormatter = InterleavedFASTQInFormatter - implicit val uFormatter = new AnySAMOutFormatter - +class BwaFn( + val args: BwaFnArgs, + val files: Seq[String], + val environment: Map[String, String], + val sc: SparkContext) extends Function1[FragmentRDD, AlignmentRecordRDD] with Logging { + + /** + * @param args Bwa function arguments. + * @param sc Spark context. + */ + def this(args: BwaFnArgs, sc: SparkContext) = this(args, Seq.empty, Map.empty, sc) + + /** + * @param args Bwa function arguments. + * @param files Files to make locally available to the commands being run. + * @param sc Spark context. + */ + def this(args: BwaFnArgs, files: Seq[String], sc: SparkContext) = this(args, files, Map.empty, sc) + + override def apply(fragments: FragmentRDD): AlignmentRecordRDD = { val sample = args.sample def getIndexPaths(fastaPath: String): Seq[String] = { @@ -193,7 +162,86 @@ class Bwa(protected val args: BwaArgs) extends BDGSparkCommand[BwaArgs] with Log "-")) } - val output: AlignmentRecordRDD = input.pipe[AlignmentRecord, AlignmentRecordRDD, InterleavedFASTQInFormatter](bwaCommand) + log.info("Piping {} to bwa with command: {} files: {} environment: {}", + Array(fragments, bwaCommand, files, environment)) + + implicit val tFormatter = InterleavedFASTQInFormatter + implicit val uFormatter = new AnySAMOutFormatter + + fragments.pipe(bwaCommand, files, environment) + } +} + +object Bwa extends BDGCommandCompanion { + val commandName = "bwa" + val commandDescription = "ADAM Pipe API wrapper for BWA." 
+ + def apply(cmdLine: Array[String]) = { + new Bwa(Args4j[BwaArgs](cmdLine)) + } +} + +/** + * Bwa command line arguments. + */ +class BwaArgs extends BwaFnArgs with ADAMSaveAnyArgs with ParquetArgs { + @Argument(required = true, metaVar = "INPUT", usage = "Location to pipe from, in interleaved FASTQ format.", index = 0) + var inputPath: String = null + + @Argument(required = true, metaVar = "OUTPUT", usage = "Location to pipe to.", index = 1) + var outputPath: String = null + + @Args4jOption(required = false, name = "-single", usage = "Saves OUTPUT as single file. Exclusive of -fragments.") + var asSingleFile: Boolean = false + + @Args4jOption(required = false, name = "-fragments", usage = "Saves OUTPUT as Fragments in Parquet. Exclusive of -single.") + var asFragments: Boolean = false + + @Args4jOption(required = false, name = "-defer_merging", usage = "Defers merging single file output.") + var deferMerging: Boolean = false + + @Args4jOption(required = false, name = "-disable_fast_concat", usage = "Disables the parallel file concatenation engine.") + var disableFastConcat: Boolean = false + + @Args4jOption(required = false, name = "-force_load_ifastq", usage = "Forces loading using interleaved FASTQ.") + var forceLoadIfastq: Boolean = false + + @Args4jOption(required = false, name = "-force_load_parquet", usage = "Forces loading using Parquet.") + var forceLoadParquet: Boolean = false + + @Args4jOption(required = false, name = "-sequence_dictionary", usage = "Path to the sequence dictionary.") + var sequenceDictionary: String = _ + + @Args4jOption(required = false, name = "-stringency", usage = "Stringency level for various checks; can be SILENT, LENIENT, or STRICT. Defaults to STRICT.") + var stringency: String = "STRICT" + + // must be defined due to ADAMSaveAnyArgs, but unused here + var sortFastqOutput: Boolean = false +} + +/** + * Bwa command line wrapper. + */ +class Bwa(protected val args: BwaArgs) extends BDGSparkCommand[BwaArgs] with Logging { + val companion = Bwa + val stringency: ValidationStringency = ValidationStringency.valueOf(args.stringency) + + def run(sc: SparkContext) { + require(!(args.asSingleFile && args.asFragments), + "-single and -fragments are mutually exclusive.") + require(!(args.forceLoadIfastq && args.forceLoadParquet), + "-force_load_ifastq and -force_load_parquet are mutually exclusive.") + val input: FragmentRDD = if (args.forceLoadIfastq) { + sc.loadInterleavedFastqAsFragments(args.inputPath) + } else if (args.forceLoadParquet) { + sc.loadParquetFragments(args.inputPath) + } else { + sc.loadFragments(args.inputPath, stringency = stringency) + } + + val sample = args.sample + + val output: AlignmentRecordRDD = new BwaFn(args, sc).apply(input) .replaceRecordGroups(RecordGroupDictionary(Seq(RecordGroup(sample, sample)))) val outputMaybeWithSequences = Option(args.sequenceDictionary).fold(output)(sdPath => { diff --git a/cli/src/main/scala/org/bdgenomics/cannoli/cli/Freebayes.scala b/cli/src/main/scala/org/bdgenomics/cannoli/cli/Freebayes.scala index 5e2ceeb..2e55c8d 100644 --- a/cli/src/main/scala/org/bdgenomics/cannoli/cli/Freebayes.scala +++ b/cli/src/main/scala/org/bdgenomics/cannoli/cli/Freebayes.scala @@ -31,6 +31,85 @@ import org.bdgenomics.utils.misc.Logging import org.kohsuke.args4j.{ Argument, Option => Args4jOption } import scala.collection.JavaConversions._ +/** + * Freebayes function arguments. 
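 *
 * These arguments configure [[FreebayesFn]], e.g. (hypothetical path):
 * {{{
 * val args = new FreebayesFnArgs()
 * args.referencePath = "ref.fa"
 * }}}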
+ */ +class FreebayesFnArgs extends Args4jBase { + @Args4jOption(required = false, name = "-freebayes_path", usage = "Path to the Freebayes executable. Defaults to freebayes.") + var freebayesPath: String = "freebayes" + + @Args4jOption(required = true, name = "-freebayes_reference", usage = "Reference sequence for analysis. An index file (.fai) will be created if none exists.") + var referencePath: String = null + + @Args4jOption(required = false, name = "-docker_image", usage = "Docker image to use. Defaults to quay.io/biocontainers/freebayes:1.1.0.46--htslib1.6_2.") + var dockerImage: String = "quay.io/biocontainers/freebayes:1.1.0.46--htslib1.6_2" + + @Args4jOption(required = false, name = "-use_docker", usage = "If true, uses Docker to launch Freebayes. If false, uses the Freebayes executable path.") + var useDocker: Boolean = false +} + +/** + * Freebayes wrapper as a function AlignmentRecordRDD → VariantContextRDD, + * for use in cannoli-shell or notebooks. + * + * @param args Freebayes function arguments. + * @param files Files to make locally available to the commands being run. + * @param environment A map containing environment variable/value pairs to set + * in the environment for the newly created process. + * @param sc Spark context. + */ +class FreebayesFn( + val args: FreebayesFnArgs, + val files: Seq[String], + val environment: Map[String, String], + val sc: SparkContext) extends Function1[AlignmentRecordRDD, VariantContextRDD] with Logging { + + /** + * @param args Freebayes function arguments. + * @param sc Spark context. + */ + def this(args: FreebayesFnArgs, sc: SparkContext) = this(args, Seq.empty, Map.empty, sc) + + /** + * @param args Freebayes function arguments. + * @param files Files to make locally available to the commands being run. + * @param sc Spark context. + */ + def this(args: FreebayesFnArgs, files: Seq[String], sc: SparkContext) = this(args, files, Map.empty, sc) + + override def apply(alignments: AlignmentRecordRDD): VariantContextRDD = { + + val freebayesCommand = if (args.useDocker) { + Seq("docker", + "run", + "--rm", + args.dockerImage, + "freebayes", + "--fasta-reference", + args.referencePath, + "--stdin") + } else { + Seq(args.freebayesPath, + "--fasta-reference", + args.referencePath, + "--stdin") + } + + log.info("Piping {} to freebayes with command: {} files: {} environment: {}", + Array(alignments, freebayesCommand, files, environment)) + + val accumulator: CollectionAccumulator[VCFHeaderLine] = sc.collectionAccumulator("headerLines") + + implicit val tFormatter = BAMInFormatter + implicit val uFormatter = new VCFOutFormatter(sc.hadoopConfiguration, Some(accumulator)) + + val variantContexts = alignments.pipe[VariantContext, VariantContextRDD, BAMInFormatter](freebayesCommand, files, environment) + + val headerLines = accumulator.value.distinct + variantContexts.replaceHeaderLines(headerLines) + } +} + object Freebayes extends BDGCommandCompanion { val commandName = "freebayes" val commandDescription = "ADAM Pipe API wrapper for Freebayes." @@ -40,25 +119,16 @@ object Freebayes extends BDGCommandCompanion { } } -class FreebayesArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs { +/** + * Freebayes command line arguments. 
+ */ +class FreebayesArgs extends FreebayesFnArgs with ADAMSaveAnyArgs with ParquetArgs { @Argument(required = true, metaVar = "INPUT", usage = "Location to pipe from.", index = 0) var inputPath: String = null @Argument(required = true, metaVar = "OUTPUT", usage = "Location to pipe to, in VCF format.", index = 1) var outputPath: String = null - @Args4jOption(required = false, name = "-freebayes_path", usage = "Path to the Freebayes executable. Defaults to freebayes.") - var freebayesPath: String = "freebayes" - - @Args4jOption(required = true, name = "-freebayes_reference", usage = "Reference sequence for analysis. An index file (.fai) will be created if none exists.") - var referencePath: String = null - - @Args4jOption(required = false, name = "-docker_image", usage = "Docker image to use. Defaults to quay.io/biocontainers/freebayes:1.1.0.46--htslib1.6_2.") - var dockerImage: String = "quay.io/biocontainers/freebayes:1.1.0.46--htslib1.6_2" - - @Args4jOption(required = false, name = "-use_docker", usage = "If true, uses Docker to launch Freebayes. If false, uses the Freebayes executable path.") - var useDocker: Boolean = false - @Args4jOption(required = false, name = "-single", usage = "Saves OUTPUT as single file.") var asSingleFile: Boolean = false @@ -76,42 +146,15 @@ class FreebayesArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs { } /** - * Freebayes. + * Freebayes command line wrapper. */ class Freebayes(protected val args: FreebayesArgs) extends BDGSparkCommand[FreebayesArgs] with Logging { val companion = Freebayes val stringency: ValidationStringency = ValidationStringency.valueOf(args.stringency) def run(sc: SparkContext) { - val input: AlignmentRecordRDD = sc.loadAlignments(args.inputPath, stringency = stringency) - - val accumulator: CollectionAccumulator[VCFHeaderLine] = sc.collectionAccumulator("headerLines") - - implicit val tFormatter = BAMInFormatter - implicit val uFormatter = new VCFOutFormatter(sc.hadoopConfiguration, Some(accumulator)) - - val freebayesCommand = if (args.useDocker) { - Seq("docker", - "run", - "--rm", - args.dockerImage, - "freebayes", - "--fasta-reference", - args.referencePath, - "--stdin") - } else { - Seq(args.freebayesPath, - "--fasta-reference", - args.referencePath, - "--stdin") - } - - val output: VariantContextRDD = input.pipe[VariantContext, VariantContextRDD, BAMInFormatter](freebayesCommand) - .transform(_.cache()) - - val headerLines = accumulator.value.distinct - val updatedHeaders = output.replaceHeaderLines(headerLines) - - updatedHeaders.saveAsVcf(args, stringency) + val alignments = sc.loadAlignments(args.inputPath, stringency = stringency) + val variantContexts = new FreebayesFn(args, sc).apply(alignments) + variantContexts.saveAsVcf(args, stringency) } } diff --git a/cli/src/main/scala/org/bdgenomics/cannoli/cli/SampleReads.scala b/cli/src/main/scala/org/bdgenomics/cannoli/cli/SampleReads.scala index aeda552..3db8127 100644 --- a/cli/src/main/scala/org/bdgenomics/cannoli/cli/SampleReads.scala +++ b/cli/src/main/scala/org/bdgenomics/cannoli/cli/SampleReads.scala @@ -19,13 +19,9 @@ package org.bdgenomics.cannoli.cli import htsjdk.samtools.ValidationStringency import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.RDD import org.bdgenomics.adam.rdd.ADAMContext._ import org.bdgenomics.adam.rdd.ADAMSaveAnyArgs -import org.bdgenomics.adam.rdd.fragment.{ FragmentRDD, InterleavedFASTQInFormatter } -import org.bdgenomics.adam.rdd.read.{ AlignmentRecordRDD, AnySAMOutFormatter } 
-import org.bdgenomics.formats.avro.AlignmentRecord +import org.bdgenomics.adam.rdd.fragment.FragmentRDD import org.bdgenomics.utils.cli._ import org.bdgenomics.utils.misc.Logging import org.kohsuke.args4j.{ Argument, Option => Args4jOption } @@ -67,10 +63,10 @@ class SampleReadsArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs { */ class SampleReads(protected val args: SampleReadsArgs) extends BDGSparkCommand[SampleReadsArgs] with Logging { val companion = SampleReads - val stringency = ValidationStringency.valueOf(args.stringency) + val stringency: ValidationStringency = ValidationStringency.valueOf(args.stringency) def run(sc: SparkContext) { - val fragments: FragmentRDD = sc.loadFragments(args.inputPath) + val fragments: FragmentRDD = sc.loadFragments(args.inputPath, stringency = stringency) log.info("Sampling fraction %f with seed %d".format(args.fraction, args.seed)) fragments diff --git a/cli/src/main/scala/org/bdgenomics/cannoli/cli/Samtools.scala b/cli/src/main/scala/org/bdgenomics/cannoli/cli/Samtools.scala index b596ed8..9152e58 100644 --- a/cli/src/main/scala/org/bdgenomics/cannoli/cli/Samtools.scala +++ b/cli/src/main/scala/org/bdgenomics/cannoli/cli/Samtools.scala @@ -28,22 +28,10 @@ import org.bdgenomics.utils.cli._ import org.bdgenomics.utils.misc.Logging import org.kohsuke.args4j.{ Argument, Option => Args4jOption } -object Samtools extends BDGCommandCompanion { - val commandName = "samtools" - val commandDescription = "ADAM Pipe API wrapper for samtools mpileup." - - def apply(cmdLine: Array[String]) = { - new Samtools(Args4j[SamtoolsArgs](cmdLine)) - } -} - -class SamtoolsArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs { - @Argument(required = true, metaVar = "INPUT", usage = "Location to pipe from.", index = 0) - var inputPath: String = null - - @Argument(required = true, metaVar = "OUTPUT", usage = "Location to pipe to, in VCF format.", index = 1) - var outputPath: String = null - +/** + * Samtools function arguments. + */ +class SamtoolsFnArgs extends Args4jBase { @Args4jOption(required = false, name = "-samtools_path", usage = "Path to the samtools executable. Defaults to samtools.") var samtoolsPath: String = "samtools" @@ -55,35 +43,38 @@ class SamtoolsArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs { @Args4jOption(required = false, name = "-use_docker", usage = "If true, uses Docker to launch samtools. If false, uses the samtools executable path.") var useDocker: Boolean = false - - @Args4jOption(required = false, name = "-single", usage = "Saves OUTPUT as single file.") - var asSingleFile: Boolean = false - - @Args4jOption(required = false, name = "-defer_merging", usage = "Defers merging single file output.") - var deferMerging: Boolean = false - - @Args4jOption(required = false, name = "-disable_fast_concat", usage = "Disables the parallel file concatenation engine.") - var disableFastConcat: Boolean = false - - @Args4jOption(required = false, name = "-stringency", usage = "Stringency level for various checks; can be SILENT, LENIENT, or STRICT. Defaults to STRICT.") - var stringency: String = "STRICT" - - // must be defined due to ADAMSaveAnyArgs, but unused here - var sortFastqOutput: Boolean = false } /** - * Samtools. + * Samtools wrapper as a function AlignmentRecordRDD → VariantContextRDD, + * for use in cannoli-shell or notebooks. + * + * @param args Samtools function arguments. + * @param files Files to make locally available to the commands being run. 
+ * @param environment A map containing environment variable/value pairs to set + * in the environment for the newly created process. + * @param sc Spark context. */ -class Samtools(protected val args: SamtoolsArgs) extends BDGSparkCommand[SamtoolsArgs] with Logging { - val companion = Samtools - val stringency: ValidationStringency = ValidationStringency.valueOf(args.stringency) - - def run(sc: SparkContext) { - val input: AlignmentRecordRDD = sc.loadAlignments(args.inputPath, stringency = stringency) - - implicit val tFormatter = BAMInFormatter - implicit val uFormatter = new VCFOutFormatter(sc.hadoopConfiguration) +class SamtoolsFn( + val args: SamtoolsFnArgs, + val files: Seq[String], + val environment: Map[String, String], + val sc: SparkContext) extends Function1[AlignmentRecordRDD, VariantContextRDD] with Logging { + + /** + * @param args Samtools function arguments. + * @param sc Spark context. + */ + def this(args: SamtoolsFnArgs, sc: SparkContext) = this(args, Seq.empty, Map.empty, sc) + + /** + * @param args Samtools function arguments. + * @param files Files to make locally available to the commands being run. + * @param sc Spark context. + */ + def this(args: SamtoolsFnArgs, files: Seq[String], sc: SparkContext) = this(args, files, Map.empty, sc) + + override def apply(alignments: AlignmentRecordRDD): VariantContextRDD = { val samtoolsCommand = if (args.useDocker) { Seq("docker", @@ -108,9 +99,61 @@ class Samtools(protected val args: SamtoolsArgs) extends BDGSparkCommand[Samtool "-u") } - val output: VariantContextRDD = input.pipe[VariantContext, VariantContextRDD, BAMInFormatter](samtoolsCommand) - .transform(_.cache()) + log.info("Piping {} to samtools with command: {} files: {} environment: {}", + Array(alignments, samtoolsCommand, files, environment)) + + implicit val tFormatter = BAMInFormatter + implicit val uFormatter = new VCFOutFormatter(sc.hadoopConfiguration) - output.saveAsVcf(args, stringency) + alignments.pipe[VariantContext, VariantContextRDD, BAMInFormatter](samtoolsCommand, files, environment) + } +} + +object Samtools extends BDGCommandCompanion { + val commandName = "samtools" + val commandDescription = "ADAM Pipe API wrapper for samtools mpileup." + + def apply(cmdLine: Array[String]) = { + new Samtools(Args4j[SamtoolsArgs](cmdLine)) + } +} + +/** + * Samtools command line arguments. + */ +class SamtoolsArgs extends SamtoolsFnArgs with ADAMSaveAnyArgs with ParquetArgs { + @Argument(required = true, metaVar = "INPUT", usage = "Location to pipe from.", index = 0) + var inputPath: String = null + + @Argument(required = true, metaVar = "OUTPUT", usage = "Location to pipe to, in VCF format.", index = 1) + var outputPath: String = null + + @Args4jOption(required = false, name = "-single", usage = "Saves OUTPUT as single file.") + var asSingleFile: Boolean = false + + @Args4jOption(required = false, name = "-defer_merging", usage = "Defers merging single file output.") + var deferMerging: Boolean = false + + @Args4jOption(required = false, name = "-disable_fast_concat", usage = "Disables the parallel file concatenation engine.") + var disableFastConcat: Boolean = false + + @Args4jOption(required = false, name = "-stringency", usage = "Stringency level for various checks; can be SILENT, LENIENT, or STRICT. Defaults to STRICT.") + var stringency: String = "STRICT" + + // must be defined due to ADAMSaveAnyArgs, but unused here + var sortFastqOutput: Boolean = false +} + +/** + * Samtools command line wrapper. 
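 *
 * The equivalent cannoli-shell usage of the underlying function
 * (hypothetical input path):
 * {{{
 * val args = new SamtoolsFnArgs()
 * // set any required options here before applying the function
 * val alignments = sc.loadAlignments("sample.bam")
 * val variantContexts = new SamtoolsFn(args, sc).apply(alignments)
 * }}}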
+ */ +class Samtools(protected val args: SamtoolsArgs) extends BDGSparkCommand[SamtoolsArgs] with Logging { + val companion = Samtools + val stringency: ValidationStringency = ValidationStringency.valueOf(args.stringency) + + def run(sc: SparkContext) { + val alignments = sc.loadAlignments(args.inputPath, stringency = stringency) + val variantContexts = new SamtoolsFn(args, sc).apply(alignments) + variantContexts.saveAsVcf(args, stringency) } } diff --git a/cli/src/main/scala/org/bdgenomics/cannoli/cli/SnpEff.scala b/cli/src/main/scala/org/bdgenomics/cannoli/cli/SnpEff.scala index a03cc87..1875d72 100644 --- a/cli/src/main/scala/org/bdgenomics/cannoli/cli/SnpEff.scala +++ b/cli/src/main/scala/org/bdgenomics/cannoli/cli/SnpEff.scala @@ -19,7 +19,6 @@ package org.bdgenomics.cannoli.cli import htsjdk.samtools.ValidationStringency import org.apache.spark.SparkContext -import org.bdgenomics.adam.models.VariantContext import org.bdgenomics.adam.rdd.ADAMContext._ import org.bdgenomics.adam.rdd.ADAMSaveAnyArgs import org.bdgenomics.adam.rdd.variant.{ @@ -31,6 +30,78 @@ import org.bdgenomics.utils.cli._ import org.bdgenomics.utils.misc.Logging import org.kohsuke.args4j.{ Argument, Option => Args4jOption } +/** + * SnpEff function arguments. + */ +class SnpEffFnArgs extends Args4jBase { + @Args4jOption(required = false, name = "-database", usage = "SnpEff database name. Defaults to GRCh38.86.") + var snpEffDatabase: String = "GRCh38.86" + + @Args4jOption(required = false, name = "-snpeff_path", usage = "Path to the SnpEff executable. Defaults to snpEff.") + var snpEffPath: String = "snpEff" + + @Args4jOption(required = false, name = "-docker_image", usage = "Docker image to use. Defaults to quay.io/biocontainers/snpeff:4.3.1t--0.") + var dockerImage: String = "quay.io/biocontainers/snpeff:4.3.1t--0" + + @Args4jOption(required = false, name = "-use_docker", usage = "If true, uses Docker to launch SnpEff. If false, uses the SnpEff executable path.") + var useDocker: Boolean = false +} + +/** + * SnpEff wrapper as a function VariantContextRDD → VariantContextRDD, + * for use in cannoli-shell or notebooks. + * + * @param args SnpEff function arguments. + * @param files Files to make locally available to the commands being run. + * @param environment A map containing environment variable/value pairs to set + * in the environment for the newly created process. + * @param sc Spark context. + */ +class SnpEffFn( + val args: SnpEffFnArgs, + val files: Seq[String], + val environment: Map[String, String], + val sc: SparkContext) extends Function1[VariantContextRDD, VariantContextRDD] with Logging { + + /** + * @param args SnpEff function arguments. + * @param sc Spark context. + */ + def this(args: SnpEffFnArgs, sc: SparkContext) = this(args, Seq.empty, Map.empty, sc) + + /** + * @param args SnpEff function arguments. + * @param files Files to make locally available to the commands being run. + * @param sc Spark context. 
+ */ + def this(args: SnpEffFnArgs, files: Seq[String], sc: SparkContext) = this(args, files, Map.empty, sc) + + override def apply(variantContexts: VariantContextRDD): VariantContextRDD = { + + val snpEffCommand = if (args.useDocker) { + Seq("docker", + "run", + "--rm", + args.dockerImage, + "snpEff", + "-download", + args.snpEffDatabase) + } else { + Seq(args.snpEffPath, + "-download", + args.snpEffDatabase) + } + + log.info("Piping {} to snpEff with command: {} files: {} environment: {}", + Array(variantContexts, snpEffCommand, files, environment)) + + implicit val tFormatter = VCFInFormatter + implicit val uFormatter = new VCFOutFormatter(sc.hadoopConfiguration) + + variantContexts.pipe(snpEffCommand, files, environment) + } +} + object SnpEff extends BDGCommandCompanion { val commandName = "snpEff" val commandDescription = "ADAM Pipe API wrapper for SnpEff." @@ -40,25 +111,16 @@ object SnpEff extends BDGCommandCompanion { } } -class SnpEffArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs { +/** + * SnpEff command line arguments. + */ +class SnpEffArgs extends SnpEffFnArgs with ADAMSaveAnyArgs with ParquetArgs { @Argument(required = true, metaVar = "INPUT", usage = "Location to pipe from, in VCF format.", index = 0) var inputPath: String = null @Argument(required = true, metaVar = "OUTPUT", usage = "Location to pipe to, in VCF format.", index = 1) var outputPath: String = null - @Args4jOption(required = false, name = "-database", usage = "SnpEff database name. Defaults to GRCh38.86.") - var snpEffDatabase: String = "GRCh38.86" - - @Args4jOption(required = false, name = "-snpeff_path", usage = "Path to the SnpEff executable. Defaults to snpEff.") - var snpEffPath: String = "snpEff" - - @Args4jOption(required = false, name = "-docker_image", usage = "Docker image to use. Defaults to quay.io/biocontainers/snpeff:4.3.1t--0.") - var dockerImage: String = "quay.io/biocontainers/snpeff:4.3.1t--0" - - @Args4jOption(required = false, name = "-use_docker", usage = "If true, uses Docker to launch SnpEff. If false, uses the SnpEff executable path.") - var useDocker: Boolean = false - @Args4jOption(required = false, name = "-single", usage = "Saves OUTPUT as single file.") var asSingleFile: Boolean = false @@ -76,35 +138,15 @@ class SnpEffArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs { } /** - * SnpEff. + * SnpEff command line wrapper. 
*/ class SnpEff(protected val args: SnpEffArgs) extends BDGSparkCommand[SnpEffArgs] with Logging { val companion = SnpEff val stringency: ValidationStringency = ValidationStringency.valueOf(args.stringency) def run(sc: SparkContext) { - val input: VariantContextRDD = sc.loadVcf(args.inputPath, stringency) - - implicit val tFormatter = VCFInFormatter - implicit val uFormatter = new VCFOutFormatter(sc.hadoopConfiguration) - - val snpEffCommand = if (args.useDocker) { - Seq("docker", - "run", - "--rm", - args.dockerImage, - "snpEff", - "-download", - args.snpEffDatabase) - } else { - Seq(args.snpEffPath, - "-download", - args.snpEffDatabase) - } - - val output: VariantContextRDD = input.pipe[VariantContext, VariantContextRDD, VCFInFormatter](snpEffCommand) - .transform(_.cache()) - - output.saveAsVcf(args, stringency) + val variantContexts = sc.loadVcf(args.inputPath, stringency = stringency) + val pipedVariantContexts = new SnpEffFn(args, sc).apply(variantContexts) + pipedVariantContexts.saveAsVcf(args, stringency) } } diff --git a/cli/src/main/scala/org/bdgenomics/cannoli/cli/Vt.scala b/cli/src/main/scala/org/bdgenomics/cannoli/cli/Vt.scala index e38cb3d..cafb057 100644 --- a/cli/src/main/scala/org/bdgenomics/cannoli/cli/Vt.scala +++ b/cli/src/main/scala/org/bdgenomics/cannoli/cli/Vt.scala @@ -19,7 +19,6 @@ package org.bdgenomics.cannoli.cli import htsjdk.samtools.ValidationStringency import org.apache.spark.SparkContext -import org.bdgenomics.adam.models.VariantContext import org.bdgenomics.adam.rdd.ADAMContext._ import org.bdgenomics.adam.rdd.ADAMSaveAnyArgs import org.bdgenomics.adam.rdd.variant.{ @@ -31,6 +30,83 @@ import org.bdgenomics.utils.cli._ import org.bdgenomics.utils.misc.Logging import org.kohsuke.args4j.{ Argument, Option => Args4jOption } +/** + * Vt function arguments. + */ +class VtFnArgs extends Args4jBase { + @Args4jOption(required = false, name = "-vt_path", usage = "Path to the vt executable. Defaults to vt.") + var vtPath: String = "vt" + + @Args4jOption(required = true, name = "-vt_reference", usage = "Reference sequence for analysis.") + var referencePath: String = null + + @Args4jOption(required = false, name = "-docker_image", usage = "Docker image to use. Defaults to heuermh/vt.") + var dockerImage: String = "heuermh/vt" + + @Args4jOption(required = false, name = "-use_docker", usage = "If true, uses Docker to launch vt. If false, uses the vt executable path.") + var useDocker: Boolean = false +} + +/** + * Vt wrapper as a function VariantContextRDD → VariantContextRDD, + * for use in cannoli-shell or notebooks. + * + * @param args Vt function arguments. + * @param files Files to make locally available to the commands being run. + * @param environment A map containing environment variable/value pairs to set + * in the environment for the newly created process. + * @param sc Spark context. + */ +class VtFn( + val args: VtFnArgs, + val files: Seq[String], + val environment: Map[String, String], + val sc: SparkContext) extends Function1[VariantContextRDD, VariantContextRDD] with Logging { + + /** + * @param args Vt function arguments. + * @param sc Spark context. + */ + def this(args: VtFnArgs, sc: SparkContext) = this(args, Seq.empty, Map.empty, sc) + + /** + * @param args Vt function arguments. + * @param files Files to make locally available to the commands being run. + * @param sc Spark context. 
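   *
   * A sketch of shipping the reference and its index alongside the piped
   * command (hypothetical paths; -vt_reference must still resolve on each
   * worker):
   * {{{
   * val fn = new VtFn(args, Seq("ref.fa", "ref.fa.fai"), sc)
   * }}}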
+ */ + def this(args: VtFnArgs, files: Seq[String], sc: SparkContext) = this(args, files, Map.empty, sc) + + override def apply(variantContexts: VariantContextRDD): VariantContextRDD = { + + val vtCommand = if (args.useDocker) { + Seq("docker", + "run", + "--interactive", + "--rm", + args.dockerImage, + "vt", + "normalize", + "-", + "-r", + args.referencePath) + } else { + Seq(args.vtPath, + "normalize", + "-", + "-r", + args.referencePath) + } + + log.info("Piping {} to vt with command: {} files: {} environment: {}", + Array(variantContexts, vtCommand, files, environment)) + + implicit val tFormatter = VCFInFormatter + implicit val uFormatter = new VCFOutFormatter(sc.hadoopConfiguration) + + variantContexts.pipe(vtCommand, files, environment) + } +} + object Vt extends BDGCommandCompanion { val commandName = "vt" val commandDescription = "ADAM Pipe API wrapper for vt normalize." @@ -40,25 +116,16 @@ object Vt extends BDGCommandCompanion { } } -class VtArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs { +/** + * Vt command line arguments. + */ +class VtArgs extends VtFnArgs with ADAMSaveAnyArgs with ParquetArgs { @Argument(required = true, metaVar = "INPUT", usage = "Location to pipe from, in VCF format.", index = 0) var inputPath: String = null @Argument(required = true, metaVar = "OUTPUT", usage = "Location to pipe to, in VCF format.", index = 1) var outputPath: String = null - @Args4jOption(required = false, name = "-vt_path", usage = "Path to the vt executable. Defaults to vt.") - var vtPath: String = "vt" - - @Args4jOption(required = true, name = "-vt_reference", usage = "Reference sequence for analysis.") - var referencePath: String = null - - @Args4jOption(required = false, name = "-docker_image", usage = "Docker image to use. Defaults to heuermh/vt.") - var dockerImage: String = "heuermh/vt" - - @Args4jOption(required = false, name = "-use_docker", usage = "If true, uses Docker to launch vt. If false, uses the vt executable path.") - var useDocker: Boolean = false - @Args4jOption(required = false, name = "-single", usage = "Saves OUTPUT as single file.") var asSingleFile: Boolean = false @@ -76,38 +143,15 @@ class VtArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs { } /** - * Vt. + * Vt command line wrapper. */ class Vt(protected val args: VtArgs) extends BDGSparkCommand[VtArgs] with Logging { val companion = Vt val stringency: ValidationStringency = ValidationStringency.valueOf(args.stringency) def run(sc: SparkContext) { - val input = sc.loadVcf(args.inputPath, stringency = stringency) - - implicit val tFormatter = VCFInFormatter - implicit val uFormatter = new VCFOutFormatter(sc.hadoopConfiguration) - - val vtCommand = if (args.useDocker) { - Seq("docker", - "run", - "--interactive", - "--rm", - args.dockerImage, - "vt", - "normalize", - "-", - "-r", - args.referencePath) - } else { - Seq(args.vtPath, - "normalize", - "-", - "-r", - args.referencePath) - } - - val output: VariantContextRDD = input.pipe[VariantContext, VariantContextRDD, VCFInFormatter](vtCommand) - output.saveAsVcf(args, stringency) + val variantContexts = sc.loadVcf(args.inputPath, stringency = stringency) + val pipedVariantContexts = new VtFn(args, sc).apply(variantContexts) + pipedVariantContexts.saveAsVcf(args, stringency) } }