dap2 - (yet another) data analysis pipeline

  • a data processing pipeline is a DAG of processes linking input files to output files

    input1, input2 -> processA -> output1, output2

  • each process executes shell commands

Why yet another data analysis pipeline?

  • make
    • does not support clusters well
    • sick of creating rp databases
  • snakemake
    • does (did) not scale well to 1000s of jobs
    • not designed as a library
    • defines a new syntax (instead of using decorators)

What does dap2 provide?

  • important make functionality
    • but better parameterization of rules
    • supports local and cluster jobs
  • provenance
    • can trace back all commands used to create any output file
  • can be used as a library to submit shell commands or python functions from python
  • scales to 100,000s of jobs
  • single python file of approx. 1,000 lines (code + documentation)

DAG creation

  • implemented as a (bipartite) DAG with Process and File nodes:

    class ProcessNode(object):

        def __init__(self, cmd, parents = [], children = [], name = 'anonymous', **params):
            """
            create ProcessNode
            parents and children can be lists or dicts
            """
            self.cmd = cmd
            self.parents = [File(p) for p in (parents.values() if type(parents) == dict else parents)]   # input file names
            for parent in self.parents:
                parent.children.append(self)
            self.children = [File(c) for c in (children.values() if type(children) == dict else children)]  # output file names
            # register that this ProcessNode creates its children
            for child in self.children:
                assert child.parents == [], "a File can only have one ProcessNode as parent"
                child.parents = [self]
            self.name = name
            self.params = params  # whatever, e.g. slurmParams

        def exists(self):
            "return whether all children exist"
            return all(fn.exists() for fn in self.children)

        def delete_results(self):
            "delete files corresponding to node's children"
            for fn in self.children:
                fn.delete()

    File nodes are created automatically when a ProcessNode is instantiated.
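    For illustration, a ProcessNode can also be constructed directly; the command and file names here are made up:

    node = ProcessNode('sort input1.txt > output1.txt',
                       parents = ['input1.txt'],    # wrapped into File nodes
                       children = ['output1.txt'],
                       name = 'sort_input',
                       mem = '1g')                  # extra kwargs land in node.params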

  • specification of data flow by rules using decorator syntax
  • all rules are specified using relative paths
  • pattern matching uses the parse library and supports format string syntax, e.g. ‘{name[:format_spec]}’:
    inputs = ['input1.txt', 'input2.txt']
    inputs = ['{path}/{PN}.txt', '{path}/{PN}.bam']
    inputs = dict(text = '{path}/{PN}_p{pvalue:f}_d{distance:d}.txt', 
                  bam = '{path}/{PN}.bam')
        
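    The wildcard values such a pattern yields can be previewed with the parse library directly (file name made up):

    from parse import parse

    r = parse('{path}/{PN}_p{pvalue:f}_d{distance:d}.txt',
              'data/sample1_p0.05_d10.txt')
    # r.named == {'path': 'data', 'PN': 'sample1', 'pvalue': 0.05, 'distance': 10}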
  • of all matching rules, the one with the fewest patterns is selected
  • if a rule matches, the process is created (parameterized) by a function:
    @dap2.rule(inputs = ['{path}/{PN}.txt'],
               outputs = ['{path}/{PN}.out'],
               kind = 'process')
    def make_process(inputs, outputs, **wildcards):
        data = lookup_some_data(wildcards['PN'])
        cmd = make_cmd(data)
        return dap2.ProcessNode(cmd,
                                parents = inputs,
                                children = outputs)
        

This happens during DAG creation, not during execution on the workers. The function can access a database, compute cluster requirements based on the input files, etc.
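As an illustrative sketch (the lookup table and the samtools command are made up), a 'process' rule can parameterize cluster requirements per sample at DAG-creation time:

    import dap2

    # stand-in for a real database lookup
    SAMPLES = {'sample1': {'mem': '4g'}, 'sample2': {'mem': '16g'}}

    @dap2.rule(inputs = ['{path}/{PN}.bam'],
               outputs = ['{path}/{PN}.stats'],
               kind = 'process')
    def make_stats(inputs, outputs, **wildcards):
        mem = SAMPLES[wildcards['PN']]['mem']   # runs at DAG creation, not on a worker
        return dap2.ProcessNode('samtools stats %s > %s' % (inputs[0], outputs[0]),
                                parents = inputs,
                                children = outputs,
                                slurmParams = {'mem': mem})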

  • for user convenience, there are two shorthand rule kinds:
    • ‘shell’ rule

      the function returns a shell command as a string

      @dap2.rule(inputs = ['input.txt'],
                 outputs = ['output.txt'],
                 kind = 'shell')
      def move(inputs, outputs, **wildcards):
          return 'mv %s %s' % (inputs[0], outputs[0])
              
    • ‘python’ rule

      the function itself is called (wrapped as a shell command)

      @dap2.rule(inputs = ['input.txt'],
                 outputs = ['output.txt'],
                 kind = 'python')
      def copy_file(inputs, outputs, **wildcards):
          with open(inputs[0]) as infh, open(outputs[0], 'w') as outfh:
              for line in infh:
                  outfh.write(line)
              
      Note: To call function func in module.py, do
      python -E -c "import module; module.func(*args, **kw)"
              

      I.e. the module needs to be importable by python.

      Such a command line can be created using dap2.shell_command. Given a function, it figures out which python module to load and creates the corresponding shell command:

      dap2.shell_command(myfunc, arg1, arg2, kw1 = val1)
              
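      Assuming myfunc lives in a module mytools.py that is importable, the generated command should look roughly like:

      python -E -c "import mytools; mytools.myfunc(arg1, arg2, kw1 = val1)"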

Dependency resolution and scheduling

  • given desired output files (targets), find the processes needed to create them from existing files using the DAG (sketched below)
    • if a file in targets does not exist
      • find the process that creates it (using the DAG or the rules)
      • add the process to processes
      • add its input files to targets
  • execute the processes, taking care of the dependencies between them
  • in case of a process error, remove the process output files

dap2.createTargets(['output.txt', 'mypath/AACSUCR.out'],
                   scheduler = dap2.LocalScheduler(),
                   n_processes = 4)
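
A minimal sketch of get_required_processes, the resolution loop described above (find_creating_process stands in for dap2's DAG/rule lookup):

    def get_required_processes(targets):
        "sketch only; the real implementation lives in dap2"
        processes, pending = [], list(targets)
        while pending:
            f = pending.pop()
            if f.exists():
                continue                         # file already exists, nothing to do
            p = find_creating_process(f)         # hypothetical: via DAG or rule matching
            if p not in processes:
                processes.append(p)
                pending.extend(p.parents)        # the process inputs become new targets
        return processes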

Local scheduler

  • uses a thread pool to run shell subprocesses
  • can be used as a scheduler within a process already running on a cluster machine (instead of GATK Queue, for example)
    l = dap2.LocalScheduler(processes, n_processes = 4, nfs_timeout = None, continue_on_error = False)
    l.run()
        
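    A hypothetical end-to-end use, compressing three existing files with two local workers:

    processes = [dap2.ProcessNode('gzip -k %s' % f,
                                  parents = [f],
                                  children = [f + '.gz'])
                 for f in ('a.txt', 'b.txt', 'c.txt')]
    dap2.LocalScheduler(processes, n_processes = 2).run()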

SLURM scheduler

  • uses a SLURM cluster
    • submits jobs using sbatch
    • checks job status periodically using sacct
    s = dap2.SlurmScheduler(processes, n_processes = 100, nfs_timeout = 30, log_dir = 'logs', interval = 30,
                            jobParams = {}, continue_on_error = True, dp = 'provenance.db')
    s.run()
        
  • SLURM job params can be passed from rules

    anything in slurmParams is passed to sbatch:

    @dap2.rule(inputs = ['input.txt'],
               outputs = ['output.txt'],
               kind = 'python',
               slurmParams = {'mem': '4g', 'time': '1:00:00', 'job-name': 'copy_file'})
    def copy_file(inputs, outputs, **wildcards):
        with open(inputs[0]) as infh, open(outputs[0], 'w') as outfh:
            for line in infh:
                outfh.write(line)
        
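    With these parameters the submission should correspond to sbatch options roughly like:

    sbatch --mem=4g --time=1:00:00 --job-name=copy_file ...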

rp file

  • creates an rp file for submission with rpsubmit
    dap2.makeRpFile(targetFiles, filename)
        
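    For example, writing the rp file for a single target (file names made up):

    dap2.makeRpFile(['output.txt'], 'pipeline.rp')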

Provenance

  • can trace back how any output file was created
  • implemented as a SQLite3 database with tables
    • files

      Column       Description
      ----------   ---------------------------------
      id           PK
      path         absolute path to file
      process_id   FK processes.id

    • processes

      Column       Description
      ----------   ---------------------------------
      id           PK
      cmd          shell command
      name
      params       slurmParams, slurmStatus, ...
      job_id       SLURM job id (for sacct queries)
      status       status
      exit_code    SLURM exit code
      start_time
      end_time

    • process_parents, process_children

      Column       Description
      ----------   ---------------------------------
      process_id   FK processes.id
      file_id      FK files.id
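
A minimal sketch of such a trace with sqlite3, using the schema above (the database filename follows the SlurmScheduler example; the output path is made up):

    import sqlite3

    con = sqlite3.connect('provenance.db')
    row = con.execute(
        """SELECT p.cmd, p.status, p.start_time, p.end_time
           FROM files f JOIN processes p ON f.process_id = p.id
           WHERE f.path = ?""",
        ('/abs/path/to/output.txt',)).fetchone()
    print(row)   # the command (and metadata) that produced the file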

Examples

as a Makefile replacement

a command line interface is available as the function dap2.cli:

def cli():
    """
    command line interface
    """
    # File, get_required_processes, the scheduler classes, logger and sys
    # come from dap2's module scope
    import argparse
    parser = argparse.ArgumentParser(description='Data analysis pipeline')
    parser.add_argument('-p', '--numproc', default=1, help='Number of processes to run simultaneously')
    parser.add_argument('-l', '--logdir', default='logs', help='Directory for cluster logs (.out and .err files)')
    parser.add_argument('-c', '--cluster', action='store_true', help='Execute processes on cluster')
    parser.add_argument('-j', '--jobparams', default='', help='Cluster parameters, comma separated option=value pairs, passed to scheduler')
    parser.add_argument('-n', action='store_true', help='Do not run processes. Only print them.')
    parser.add_argument('targets', nargs='+', help='files to be created')
    args = parser.parse_args()
    print(args)
    processes = get_required_processes([File(f) for f in args.targets])

    if args.n:
        for process in processes:
            print(process.name)
    else:
        if args.cluster:
            try:
                jobParams = {} if args.jobparams == '' else dict(tuple(j.split('=')) for j in args.jobparams.split(','))
            except Exception:
                logger.error('Malformed jobparams: %s' % args.jobparams)
                sys.exit(2)
            s = SlurmScheduler(processes, n_processes=int(args.numproc), log_dir=args.logdir, jobParams=jobParams)
        else:
            s = LocalScheduler(processes, n_processes=int(args.numproc))
        s.run()

Illustrative example:

import dap2
import os

@dap2.rule(['{path}/hello{name}.txt'],
           kind = 'python')
def hello(inputs, outputs, **wildcards):
    with open(os.path.join(wildcards['path'],
                           'hello%s.txt' % wildcards['name']), 'w') as fh:
        fh.write('Hello %s\n' % wildcards['name'])


if __name__ == '__main__':
    dap2.cli()

Note: the target must include a ‘{path}’ component here; ‘helloJon.txt’ alone will not match the rule, whereas ‘./helloJon.txt’ will (with path = ‘.’).
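Saved as e.g. hello.py (filename made up), the example can be run as:

    python hello.py ./helloJon.txt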
