diff --git a/pbprocesstools/__init__.py b/pbprocesstools/__init__.py
index 955bd0f..ee403fa 100644
--- a/pbprocesstools/__init__.py
+++ b/pbprocesstools/__init__.py
@@ -40,8 +40,8 @@ import json
 
 PB_PROCESS_TOOLS_VERSION_MAJOR = 1
-PB_PROCESS_TOOLS_VERSION_MINOR = 2
-PB_PROCESS_TOOLS_VERSION_PATCH = 4
+PB_PROCESS_TOOLS_VERSION_MINOR = 3
+PB_PROCESS_TOOLS_VERSION_PATCH = 0
 
 PB_PROCESS_TOOLS_VERSION = str(PB_PROCESS_TOOLS_VERSION_MAJOR) + "." + str(PB_PROCESS_TOOLS_VERSION_MINOR) + "." + str(PB_PROCESS_TOOLS_VERSION_PATCH)
 PB_PROCESS_TOOLS_VERSION_OBJ = LooseVersion(PB_PROCESS_TOOLS_VERSION)
 
diff --git a/pbprocesstools/pbpt_q_process.py b/pbprocesstools/pbpt_q_process.py
index 5e208e6..d9b6239 100644
--- a/pbprocesstools/pbpt_q_process.py
+++ b/pbprocesstools/pbpt_q_process.py
@@ -107,6 +107,8 @@ def __init__(self, queue_db_info=None, cmd_name=None, descript=None, params=None
         self.descript = descript
         self.params = params
         self.debug_job_id = None
+        self.debug_job_id_params = False
+        self.debug_job_id_rmouts = False
         super().__init__(uid_len)
 
     def set_params(self, params):
@@ -260,6 +262,18 @@ def outputs_present(self, **kwargs):
         """
         pass
 
+    @abstractmethod
+    def remove_outputs(self, **kwargs):
+        """
+        An abstract function which checks whether the outputs of a job are present and removes them.
+        This can be useful if a job failed partway through processing and needs to be reset.
+
+        :param kwargs: allows the user to pass custom variables to the function
+                       (e.g., obj.remove_outputs(mod_version=True)).
+
+        """
+        pass
+
     def check_required_fields(self, **kwargs):
         """
         A function which checks that the required fields are present within the
@@ -317,10 +331,18 @@ def parse_cmds(self, argv=None, **kwargs):
                                  "therefore the database will not be "
                                  "updated and the job will be run "
                                  "regardless of the database status.")
+        parser.add_argument("-p", "--params", action='store_true', default=False,
+                            help="If a job is specified then, rather than running the job, "
+                                 "its parameters will be printed to the console.")
+        parser.add_argument("-r", "--rmouts", action='store_true', default=False,
+                            help="If a job is specified then, rather than running the job, "
+                                 "its outputs will be removed.")
         if argv is None:
             argv = sys.argv[1:]
         args = parser.parse_args(argv)
         self.debug_job_id = args.job
+        self.debug_job_id_params = args.params
+        self.debug_job_id_rmouts = args.rmouts
         with open(args.dbinfo) as f:
             self.queue_db_info = json.load(f)
         self.check_db_info()
@@ -425,8 +447,15 @@ def std_run(self, **kwargs):
                     self.params = job_info.JobParams
             if job_info is not None:
                 logger.debug("Found the job to process, PID: {}".format(self.job_pid))
-                self.check_required_fields(**kwargs)
-                self.do_processing(**kwargs)
+                if self.debug_job_id_params:
+                    import pprint
+                    pprint.pprint(self.params)
+                elif self.debug_job_id_rmouts:
+                    self.check_required_fields(**kwargs)
+                    self.remove_outputs(**kwargs)
+                else:
+                    self.check_required_fields(**kwargs)
+                    self.do_processing(**kwargs)
                 logger.debug("Finished processing the job, PID: {}".format(self.job_pid))
 
             else:
@@ -545,6 +574,69 @@ def check_job_outputs(self, process_tools_mod, process_tools_cls, out_err_pid_fi
                 pathlib.Path(out_err_pid_file).touch()
                 pathlib.Path(out_err_info_file).touch()
 
+    def remove_job_outputs(self, process_tools_mod, process_tools_cls, all_jobs=False, error_jobs=False, **kwargs):
+        """
+        A function which removes the outputs created by the jobs so that they can be re-run,
+        either for all jobs or for just those which failed (i.e., errored or did not finish).
+
+        :param process_tools_mod: the module (i.e., path to a python script) containing the
+                                  implementation of the PBPTProcessTool class used for the
+                                  processing whose outputs are to be removed.
+        :param process_tools_cls: the name of the class implementing the PBPTProcessTool class
+                                  used for the processing whose outputs are to be removed.
+        :param all_jobs: boolean specifying that outputs should be removed for all jobs.
+        :param error_jobs: boolean specifying that outputs should be removed for error jobs - either
+                           those which logged an error or which started but did not finish.
+        :param kwargs: allows the user to pass custom variables to the function
+                       (e.g., obj.remove_job_outputs(input='')), these will be passed to the
+                       process_tools_mod remove_outputs function.
+
+        """
+        import importlib
+
+        if (not all_jobs) and (not error_jobs):
+            raise Exception("Must specify either all_jobs or error_jobs so the jobs "
+                            "whose outputs are to be removed can be identified.")
+
+        queue_db_info = dict()
+        queue_db_info['sqlite_db_file'] = self.sqlite_db_file
+        queue_db_info['sqlite_db_conn'] = self.sqlite_db_conn
+
+        process_tools_mod_inst = importlib.import_module(process_tools_mod)
+        if process_tools_mod_inst is None:
+            raise Exception("Could not load the module: '{}'".format(process_tools_mod))
+
+        process_tools_cls_inst = getattr(process_tools_mod_inst, process_tools_cls)()
+        if process_tools_cls_inst is None:
+            raise Exception("Could not create instance of '{}'".format(process_tools_cls))
+        process_tools_cls_inst.set_queue_db_info(queue_db_info)
+
+        pbpt_utils = PBPTUtils()
+
+        if pbpt_utils.get_file_lock(self.sqlite_db_file, sleep_period=1, wait_iters=180, use_except=False):
+            try:
+                logger.debug("Creating Database Engine and Session.")
+                db_engine = sqlalchemy.create_engine(self.sqlite_db_conn, pool_pre_ping=True)
+                session_sqlalc = sqlalchemy.orm.sessionmaker(bind=db_engine)
+                ses = session_sqlalc()
+                logger.debug("Created Database Engine and Session.")
+
+                if all_jobs:
+                    jobs = ses.query(PBPTProcessJob).all()
+                elif error_jobs:
+                    jobs = ses.query(PBPTProcessJob).filter(PBPTProcessJob.Started == True,
+                                                            PBPTProcessJob.Completed == False).all()
+                else:
+                    raise Exception("Must specify either all_jobs or error_jobs so the jobs "
+                                    "whose outputs are to be removed can be identified.")
+
+                if jobs is not None:
+                    for job_info in tqdm.tqdm(jobs):
+                        process_tools_cls_inst.set_params(job_info.JobParams)
+                        process_tools_cls_inst.remove_outputs(**kwargs)
+                ses.close()
+                pbpt_utils.release_file_lock(self.sqlite_db_file)
+            except Exception:
+                pbpt_utils.release_file_lock(self.sqlite_db_file)
+                raise
+
+
     def create_jobs_report(self, out_report_file=None):
         """
         A function which generates a JSON report which can either
@@ -579,9 +671,9 @@ def create_jobs_report(self, out_report_file=None):
                     n_started += 1
                     n_ended += 1
             else:
-                err_info[job_info.PID] = dict()
                 if job_info.Error:
                     n_errs += 1
+                    err_info[job_info.PID] = dict()
                     err_info[job_info.PID]['info'] = job_info.ErrorInfo
 
                 if job_info.Started:
@@ -804,6 +896,19 @@ def run_check_outputs(self):
         """
         pass
 
+    @abstractmethod
+    def run_remove_outputs(self, all_jobs=False, error_jobs=False):
+        """
+        An abstract function which needs to be implemented with the functions and inputs
+        you want run to remove the outputs of the jobs (e.g., so failed jobs can be reset and re-run).
+
+        You will presumably want to call:
+
+        * self.remove_job_outputs
+
+        """
+        pass
+
     def parse_cmds(self, argv=None):
         """
         A function to parse the command line arguments to retrieve the
@@ -816,6 +921,9 @@ def parse_cmds(self, argv=None):
         parser.add_argument("--gen", action='store_true', default=False, help="Execute run_gen_commands() function.")
         parser.add_argument("--check", action='store_true', default=False, help="Execute run_check_outputs() function.")
         parser.add_argument("--report", action='store_true', default=False, help="Execute create_jobs_report() function.")
+        parser.add_argument("--rmouts", action='store_true', default=False, help="Execute run_remove_outputs() function.")
+        parser.add_argument("--all", action='store_true', default=False, help="Remove outputs for all jobs.")
+        parser.add_argument("--error", action='store_true', default=False, help="Remove outputs for jobs with errors.")
         parser.add_argument("-o", "--output", type=str, required=False, help="Specify a report output JSON file. If not provided then report written to console.")
         if argv is None:
             argv = sys.argv[1:]
         args = parser.parse_args(argv)
@@ -827,6 +935,9 @@ def parse_cmds(self, argv=None):
             self.run_check_outputs()
         elif args.report:
             self.create_jobs_report(args.output)
+        elif args.rmouts:
+            self.run_remove_outputs(all_jobs=args.all, error_jobs=args.error)
+
diff --git a/setup.py b/setup.py
index c2adb88..9d02c65 100644
--- a/setup.py
+++ b/setup.py
@@ -37,7 +37,7 @@ import os
 
 setuptools.setup(name='pb_process_tools',
-                 version='1.2.4',
+                 version='1.3.0',
                  description='Tools for batch processing data, including on HPC cluster with slurm.',
                  author='Pete Bunting',
                  author_email='petebunting@mac.com',
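
Usage note (not part of the patch): below is a minimal sketch of how the new remove_outputs() hook might be implemented in a per-job processing script. The module, class, and parameter names (perform_analysis, PerformAnalysis, out_file) are hypothetical, as is the assumption that the queue-based base class is importable as pbprocesstools.pbpt_q_process.PBPTQProcessTool; only remove_outputs(), outputs_present(), do_processing(), and std_run() are taken from the patched class, and the real base class may define further abstract members that would also need implementing.

    # perform_analysis.py -- illustrative sketch only; names are hypothetical.
    import os

    from pbprocesstools.pbpt_q_process import PBPTQProcessTool  # assumed class name


    class PerformAnalysis(PBPTQProcessTool):

        def __init__(self):
            # cmd_name/descript are optional in the base __init__ shown in the patch.
            super().__init__(cmd_name="perform_analysis.py",
                             descript="Example per-job processing tool.")

        def do_processing(self, **kwargs):
            # ... the real work, which writes self.params["out_file"] ...
            pass

        def outputs_present(self, **kwargs):
            # A simple presence check; the real base class may expect a richer
            # return value (e.g., a bool plus per-file error details).
            return os.path.exists(self.params["out_file"])

        def remove_outputs(self, **kwargs):
            # New in 1.3.0: delete any partial output so the job can be re-run.
            if os.path.exists(self.params["out_file"]):
                os.remove(self.params["out_file"])


    if __name__ == "__main__":
        PerformAnalysis().std_run()

With the flags added in this patch, running the script with a job specified plus --rmouts would then remove that job's outputs instead of running it, and --params would pretty-print its parameters (the job and dbinfo option names are inferred from args.job and args.dbinfo in the surrounding code).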
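Similarly, a sketch of wiring the new run_remove_outputs() hook into the generator-side tool, assuming that class is pbprocesstools.pbpt_q_process.PBPTGenQProcessToolCmds; its constructor arguments and the bodies of the other abstract methods are elided since they are unchanged by this patch.

    # gen_analysis_cmds.py -- illustrative sketch only; names are hypothetical.
    from pbprocesstools.pbpt_q_process import PBPTGenQProcessToolCmds  # assumed name


    class GenAnalysisCmds(PBPTGenQProcessToolCmds):

        def gen_command_info(self, **kwargs):
            # ... build and queue the per-job parameter dicts ...
            pass

        def run_gen_commands(self):
            self.gen_command_info()

        def run_check_outputs(self):
            # check_job_outputs() also needs the error report file paths; elided here.
            pass

        def run_remove_outputs(self, all_jobs=False, error_jobs=False):
            # Forward the --all / --error selection to the database-driven
            # removal added by this patch.
            self.remove_job_outputs(process_tools_mod="perform_analysis",
                                    process_tools_cls="PerformAnalysis",
                                    all_jobs=all_jobs, error_jobs=error_jobs)


    if __name__ == "__main__":
        GenAnalysisCmds().parse_cmds()

Running "python gen_analysis_cmds.py --rmouts --error" would then clear the outputs of every job which started but did not complete, while "--rmouts --all" resets every job.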