Skip to content

Commit

Permalink
Refactor cli commands into functions for use in cannoli-shell or note…
Browse files Browse the repository at this point in the history
…books.
  • Loading branch information
heuermh committed Feb 27, 2018
1 parent 23a4585 commit 0ab8543
Show file tree
Hide file tree
Showing 10 changed files with 789 additions and 401 deletions.
124 changes: 83 additions & 41 deletions cli/src/main/scala/org/bdgenomics/cannoli/cli/Bcftools.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package org.bdgenomics.cannoli.cli

import htsjdk.samtools.ValidationStringency
import org.apache.spark.SparkContext
import org.bdgenomics.adam.models.VariantContext
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.ADAMSaveAnyArgs
import org.bdgenomics.adam.rdd.variant.{
Expand All @@ -31,6 +30,81 @@ import org.bdgenomics.utils.cli._
import org.bdgenomics.utils.misc.Logging
import org.kohsuke.args4j.{ Argument, Option => Args4jOption }

/**
 * Bcftools function arguments.
 *
 * Argument bean parsed by args4j; also reused by [[BcftoolsFn]] when invoked
 * programmatically from cannoli-shell or notebooks.
 */
class BcftoolsFnArgs extends Args4jBase {
// local path to the bcftools executable; only used when useDocker is false
@Args4jOption(required = false, name = "-bcftools_path", usage = "Path to the BCFtools executable. Defaults to bcftools.")
var bcftoolsPath: String = "bcftools"

// passed to `bcftools norm --fasta-ref`; required
@Args4jOption(required = true, name = "-bcftools_reference", usage = "Reference sequence for analysis. An index file (.fai) will be created if none exists.")
var referencePath: String = null

// docker image to run; only used when useDocker is true
@Args4jOption(required = false, name = "-docker_image", usage = "Docker image to use. Defaults to quay.io/biocontainers/bcftools:1.6--0.")
var dockerImage: String = "quay.io/biocontainers/bcftools:1.6--0"

// selects between the docker invocation and the bare executable invocation
@Args4jOption(required = false, name = "-use_docker", usage = "If true, uses Docker to launch BCFtools. If false, uses the BCFtools executable path.")
var useDocker: Boolean = false
}

/**
 * Bcftools wrapper as a function VariantContextRDD &#8594; VariantContextRDD,
 * for use in cannoli-shell or notebooks.
 *
 * @param args Bcftools function arguments.
 * @param files Files to make locally available to the commands being run.
 * @param environment A map containing environment variable/value pairs to set
 *   in the environment for the newly created process.
 * @param sc Spark context.
 */
class BcftoolsFn(
    val args: BcftoolsFnArgs,
    val files: Seq[String],
    val environment: Map[String, String],
    val sc: SparkContext) extends Function1[VariantContextRDD, VariantContextRDD] with Logging {

  /**
   * @param args Bcftools function arguments.
   * @param sc Spark context.
   */
  def this(args: BcftoolsFnArgs, sc: SparkContext) = this(args, Seq.empty, Map.empty, sc)

  /**
   * @param args Bcftools function arguments.
   * @param files Files to make locally available to the commands being run.
   * @param sc Spark context.
   */
  def this(args: BcftoolsFnArgs, files: Seq[String], sc: SparkContext) = this(args, files, Map.empty, sc)

  /**
   * Pipe the specified variant contexts through `bcftools norm`.
   *
   * @param variantContexts Variant contexts to pipe.
   * @return Piped variant contexts.
   */
  override def apply(variantContexts: VariantContextRDD): VariantContextRDD = {

    // build either a `docker run` invocation or a direct executable invocation
    val bcftoolsCommand = if (args.useDocker) {
      Seq("docker",
        "run",
        "--interactive",
        "--rm",
        args.dockerImage,
        "bcftools",
        "norm",
        "--fasta-ref",
        args.referencePath)
    } else {
      Seq(args.bcftoolsPath,
        "norm",
        "--fasta-ref",
        args.referencePath)
    }

    // use string interpolation instead of SLF4J {} placeholders: a single
    // Scala Array is not auto-spread into the Java varargs overload, so the
    // original call resolved to info(String, Object) and left the
    // placeholders unfilled in the logged message
    log.info(s"Piping $variantContexts to bcftools with command: $bcftoolsCommand files: $files environment: $environment")

    implicit val tFormatter = VCFInFormatter
    implicit val uFormatter = new VCFOutFormatter(sc.hadoopConfiguration)

    variantContexts.pipe(bcftoolsCommand, files, environment)
  }
}

object Bcftools extends BDGCommandCompanion {
val commandName = "bcftools"
val commandDescription = "ADAM Pipe API wrapper for BCFtools norm."
Expand All @@ -40,25 +114,16 @@ object Bcftools extends BDGCommandCompanion {
}
}

class BcftoolsArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs {
/**
* Bcftools command line arguments.
*/
class BcftoolsArgs extends BcftoolsFnArgs with ADAMSaveAnyArgs with ParquetArgs {
@Argument(required = true, metaVar = "INPUT", usage = "Location to pipe from, in VCF format.", index = 0)
var inputPath: String = null

@Argument(required = true, metaVar = "OUTPUT", usage = "Location to pipe to, in VCF format.", index = 1)
var outputPath: String = null

@Args4jOption(required = false, name = "-bcftools_path", usage = "Path to the BCFtools executable. Defaults to bcftools.")
var bcftoolsPath: String = "bcftools"

@Args4jOption(required = true, name = "-bcftools_reference", usage = "Reference sequence for analysis. An index file (.fai) will be created if none exists.")
var referencePath: String = null

@Args4jOption(required = false, name = "-docker_image", usage = "Docker image to use. Defaults to quay.io/biocontainers/bcftools:1.6--0.")
var dockerImage: String = "quay.io/biocontainers/bcftools:1.6--0"

@Args4jOption(required = false, name = "-use_docker", usage = "If true, uses Docker to launch BCFtools. If false, uses the BCFtools executable path.")
var useDocker: Boolean = false

@Args4jOption(required = false, name = "-single", usage = "Saves OUTPUT as single file.")
var asSingleFile: Boolean = false

Expand All @@ -76,38 +141,15 @@ class BcftoolsArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs {
}

/**
* Bcftools.
* Bcftools command line wrapper.
*/
class Bcftools(protected val args: BcftoolsArgs) extends BDGSparkCommand[BcftoolsArgs] with Logging {
val companion = Bcftools
val stringency: ValidationStringency = ValidationStringency.valueOf(args.stringency)

def run(sc: SparkContext) {
val input: VariantContextRDD = sc.loadVcf(args.inputPath, stringency = stringency)

implicit val tFormatter = VCFInFormatter
implicit val uFormatter = new VCFOutFormatter(sc.hadoopConfiguration)

val bcftoolsCommand = if (args.useDocker) {
Seq("docker",
"run",
"--interactive",
"--rm",
args.dockerImage,
"bcftools",
"norm",
"--fasta-ref",
args.referencePath)
} else {
Seq(args.bcftoolsPath,
"norm",
"--fasta-ref",
args.referencePath)
}

val output: VariantContextRDD = input.pipe[VariantContext, VariantContextRDD, VCFInFormatter](bcftoolsCommand)
.transform(_.cache())

output.saveAsVcf(args, stringency)
val variantContexts = sc.loadVcf(args.inputPath, stringency = stringency)
val pipedVariantContexts = new BcftoolsFn(args, sc).apply(variantContexts)
pipedVariantContexts.saveAsVcf(args, stringency)
}
}
120 changes: 84 additions & 36 deletions cli/src/main/scala/org/bdgenomics/cannoli/cli/Bedtools.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,10 @@ import org.bdgenomics.utils.cli._
import org.bdgenomics.utils.misc.Logging
import org.kohsuke.args4j.{ Argument, Option => Args4jOption }

object Bedtools extends BDGCommandCompanion {
val commandName = "bedtools"
val commandDescription = "ADAM Pipe API wrapper for Bedtools intersect."

def apply(cmdLine: Array[String]) = {
new Bedtools(Args4j[BedtoolsArgs](cmdLine))
}
}

class BedtoolsArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs {
@Argument(required = true, metaVar = "INPUT", usage = "Location to pipe from.", index = 0)
var inputPath: String = null

@Argument(required = true, metaVar = "OUTPUT", usage = "Location to pipe to.", index = 1)
var outputPath: String = null

/**
* Bedtools function arguments.
*/
class BedtoolsFnArgs extends Args4jBase {
@Args4jOption(required = false, name = "-a", usage = "Bedtools intersect -a option. One of {-a,-b} should be left unspecified to accept piped input.")
var a: String = null

Expand All @@ -62,29 +50,42 @@ class BedtoolsArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs {

@Args4jOption(required = false, name = "-use_docker", usage = "If true, uses Docker to launch Bedtools. If false, uses the Bedtools executable path.")
var useDocker: Boolean = false

@Args4jOption(required = false, name = "-single", usage = "Saves OUTPUT as single file.")
var asSingleFile: Boolean = false

@Args4jOption(required = false, name = "-disable_fast_concat", usage = "Disables the parallel file concatenation engine.")
var disableFastConcat: Boolean = false

@Args4jOption(required = false, name = "-defer_merging", usage = "Defers merging single file output.")
var deferMerging: Boolean = false

// must be defined due to ADAMSaveAnyArgs, but unused here
var sortFastqOutput: Boolean = false
}

/**
* Bedtools.
* Bedtools wrapper as a function FeatureRDD → FeatureRDD,
* for use in cannoli-shell or notebooks.
*
* <code>
* val args = new BedtoolsFnArgs()
* args.b = "foo.bed"
* args.useDocker = true
* val features = ...
* val pipedFeatures = new BedtoolsFn(args).apply(features)
* </code>
*
* @param args Bedtools function arguments.
* @param files Files to make locally available to the commands being run.
* @param environment A map containing environment variable/value pairs to set
* in the environment for the newly created process.
*/
class Bedtools(protected val args: BedtoolsArgs) extends BDGSparkCommand[BedtoolsArgs] with Logging {
val companion = Bedtools

def run(sc: SparkContext) {
var features: FeatureRDD = sc.loadFeatures(args.inputPath)

class BedtoolsFn(
val args: BedtoolsFnArgs,
val files: Seq[String],
val environment: Map[String, String]) extends Function1[FeatureRDD, FeatureRDD] with Logging {

/**
* @param args Bedtools function arguments.
*/
def this(args: BedtoolsFnArgs) = this(args, Seq.empty, Map.empty)

/**
* @param args Bedtools function arguments.
* @param files Files to make locally available to the commands being run.
*/
def this(args: BedtoolsFnArgs, files: Seq[String]) = this(args, files, Map.empty)

override def apply(features: FeatureRDD): FeatureRDD = {
val optA = Option(args.a)
val optB = Option(args.b)
require(optA.size + optB.size == 1,
Expand Down Expand Up @@ -114,9 +115,56 @@ class Bedtools(protected val args: BedtoolsArgs) extends BDGSparkCommand[Bedtool
)
}

log.info("Piping {} to bedtools with command: {} files: {} environment: {}",
Array(features, bedtoolsCommand, files, environment))

implicit val tFormatter = BEDInFormatter
implicit val uFormatter = new BEDOutFormatter
val pipedFeatures: FeatureRDD = features.pipe(bedtoolsCommand)
features.pipe(bedtoolsCommand, files, environment)
}
}

/**
 * Bedtools command companion; builds the command from parsed
 * command line arguments.
 */
object Bedtools extends BDGCommandCompanion {
  val commandName = "bedtools"
  val commandDescription = "ADAM Pipe API wrapper for Bedtools intersect."

  /**
   * @param cmdLine Command line arguments to parse.
   * @return A new Bedtools command configured from the parsed arguments.
   */
  // explicit result type on public factory method, per Scala best practice
  def apply(cmdLine: Array[String]): Bedtools = {
    new Bedtools(Args4j[BedtoolsArgs](cmdLine))
  }
}

/**
 * Bedtools command line arguments.
 *
 * Extends [[BedtoolsFnArgs]] with the input/output locations and save
 * options needed by the command line wrapper.
 */
class BedtoolsArgs extends BedtoolsFnArgs with ADAMSaveAnyArgs with ParquetArgs {
// positional argument 0: source of features to pipe from
@Argument(required = true, metaVar = "INPUT", usage = "Location to pipe from.", index = 0)
var inputPath: String = null

// positional argument 1: destination for piped features
@Argument(required = true, metaVar = "OUTPUT", usage = "Location to pipe to.", index = 1)
var outputPath: String = null

@Args4jOption(required = false, name = "-single", usage = "Saves OUTPUT as single file.")
var asSingleFile: Boolean = false

@Args4jOption(required = false, name = "-disable_fast_concat", usage = "Disables the parallel file concatenation engine.")
var disableFastConcat: Boolean = false

@Args4jOption(required = false, name = "-defer_merging", usage = "Defers merging single file output.")
var deferMerging: Boolean = false

// must be defined due to ADAMSaveAnyArgs, but unused here
var sortFastqOutput: Boolean = false
}

/**
* Bedtools command line wrapper.
*/
class Bedtools(protected val args: BedtoolsArgs) extends BDGSparkCommand[BedtoolsArgs] with Logging {
val companion = Bedtools

override def run(sc: SparkContext) {
val features = sc.loadFeatures(args.inputPath)
val pipedFeatures = new BedtoolsFn(args).apply(features)

pipedFeatures.save(args.outputPath,
asSingleFile = args.asSingleFile,
Expand Down
Loading

0 comments on commit 0ab8543

Please sign in to comment.