Commit 7a6d9d9

Update to provide an option to remove output files and print job parameters.
petebunting committed Jul 16, 2020
1 parent 99be28b commit 7a6d9d9
Showing 3 changed files with 117 additions and 6 deletions.
4 changes: 2 additions & 2 deletions pbprocesstools/__init__.py
@@ -40,8 +40,8 @@
 import json

 PB_PROCESS_TOOLS_VERSION_MAJOR = 1
-PB_PROCESS_TOOLS_VERSION_MINOR = 2
-PB_PROCESS_TOOLS_VERSION_PATCH = 4
+PB_PROCESS_TOOLS_VERSION_MINOR = 3
+PB_PROCESS_TOOLS_VERSION_PATCH = 0

 PB_PROCESS_TOOLS_VERSION = str(PB_PROCESS_TOOLS_VERSION_MAJOR) + "." + str(PB_PROCESS_TOOLS_VERSION_MINOR) + "." + str(PB_PROCESS_TOOLS_VERSION_PATCH)
 PB_PROCESS_TOOLS_VERSION_OBJ = LooseVersion(PB_PROCESS_TOOLS_VERSION)
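A brief aside (not part of the commit): the bumped version string remains comparable through the LooseVersion object built below it, e.g.:

```python
from distutils.version import LooseVersion

# The new release number orders after the previous one.
assert LooseVersion("1.3.0") > LooseVersion("1.2.4")
```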
117 changes: 114 additions & 3 deletions pbprocesstools/pbpt_q_process.py
@@ -107,6 +107,8 @@ def __init__(self, queue_db_info=None, cmd_name=None, descript=None, params=None
         self.descript = descript
         self.params = params
         self.debug_job_id = None
+        self.debug_job_id_params = False
+        self.debug_job_id_rmouts = False
         super().__init__(uid_len)

     def set_params(self, params):
@@ -260,6 +262,18 @@ def outputs_present(self, **kwargs):
         """
         pass

+    @abstractmethod
+    def remove_outputs(self, **kwargs):
+        """
+        An abstract function which checks whether the outputs of a job are present and,
+        if so, removes them. This can be useful if a job failed part way through
+        processing and needs to be reset.
+
+        :param kwargs: allows the user to pass custom variables to the function
+                       (e.g., obj.remove_outputs(mod_version=True)).
+        """
+        pass
+
     def check_required_fields(self, **kwargs):
         """
         A function which checks that the required fields are present within the
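As a hedged sketch (not part of this commit), a concrete tool class might implement the new remove_outputs method as follows; ToolBase is a placeholder for the processing-tool base class in this module, and the "out_file" key is an assumed job parameter:

```python
import os

class MyProcessTool(ToolBase):  # ToolBase: placeholder; other abstract methods omitted

    def remove_outputs(self, **kwargs):
        # Delete the job's output file if a partial run left it behind,
        # so the job can be cleanly re-run from scratch.
        out_file = self.params["out_file"]  # assumed parameter key
        if os.path.exists(out_file):
            os.remove(out_file)
```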
@@ -317,10 +331,18 @@ def parse_cmds(self, argv=None, **kwargs):
                                  "therefore the database will not be "
                                  "updated and the job will be run "
                                  "regardless of the database status.")
+        parser.add_argument("-p", "--params", action='store_true', default=False,
+                            help="If a job is specified then, rather than running the job, its "
+                                 "parameters will be printed to the console.")
+        parser.add_argument("-r", "--rmouts", action='store_true', default=False,
+                            help="If a job is specified then, rather than running the job, its "
+                                 "outputs will be removed.")
         if argv is None:
             argv = sys.argv[1:]
         args = parser.parse_args(argv)
         self.debug_job_id = args.job
+        self.debug_job_id_params = args.params
+        self.debug_job_id_rmouts = args.rmouts
         with open(args.dbinfo) as f:
             self.queue_db_info = json.load(f)
         self.check_db_info()
@@ -425,8 +447,15 @@ def std_run(self, **kwargs):
             self.params = job_info.JobParams
         if job_info is not None:
             logger.debug("Found the job to process, PID: {}".format(self.job_pid))
-            self.check_required_fields(**kwargs)
-            self.do_processing(**kwargs)
+            if self.debug_job_id_params:
+                import pprint
+                pprint.pprint(self.params)
+            elif self.debug_job_id_rmouts:
+                self.check_required_fields(**kwargs)
+                self.remove_outputs(**kwargs)
+            else:
+                self.check_required_fields(**kwargs)
+                self.do_processing(**kwargs)

             logger.debug("Finished processing the job, PID: {}".format(self.job_pid))
         else:
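Together with the new command-line options above, this gives a small per-job debugging workflow. A hedged sketch of the intended invocations (the script name and job id are hypothetical, and the long flag spellings --dbinfo/--job are inferred from args.dbinfo and args.job):

```python
# Hypothetical invocations of a processing script that feeds sys.argv to
# parse_cmds() and then calls std_run():
#
#   python perform_processing.py --dbinfo db_info.json --job 42 --params  # print the job's parameters
#   python perform_processing.py --dbinfo db_info.json --job 42 --rmouts  # remove the job's outputs
#   python perform_processing.py --dbinfo db_info.json --job 42           # run the job as normal
```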
@@ -545,6 +574,69 @@ def check_job_outputs(self, process_tools_mod, process_tools_cls, out_err_pid_fi
         pathlib.Path(out_err_pid_file).touch()
         pathlib.Path(out_err_info_file).touch()

+    def remove_job_outputs(self, process_tools_mod, process_tools_cls, all_jobs=False, error_jobs=False, **kwargs):
+        """
+        A function which removes the output files of jobs, which can be useful if jobs
+        failed part way through processing and need to be reset and re-run.
+
+        :param process_tools_mod: the module (i.e., path to python script) containing the implementation
+                                  of the PBPTProcessTool class used for the processing.
+        :param process_tools_cls: the name of the class implementing the PBPTProcessTool class used
+                                  for the processing.
+        :param all_jobs: boolean specifying that outputs should be removed for all jobs.
+        :param error_jobs: boolean specifying that outputs should be removed for error jobs - i.e.,
+                           those which either logged an error or started but did not finish.
+        :param kwargs: allows the user to pass custom variables to the function
+                       (e.g., obj.remove_job_outputs(input='')); these will be passed to the
+                       process_tools_mod remove_outputs function.
+        """
+        import importlib
+
+        if (not all_jobs) and (not error_jobs):
+            raise Exception("Must specify either all jobs or only error jobs to have their outputs removed.")
+
+        queue_db_info = dict()
+        queue_db_info['sqlite_db_file'] = self.sqlite_db_file
+        queue_db_info['sqlite_db_conn'] = self.sqlite_db_conn
+
+        process_tools_mod_inst = importlib.import_module(process_tools_mod)
+        if process_tools_mod_inst is None:
+            raise Exception("Could not load the module: '{}'".format(process_tools_mod))
+
+        process_tools_cls_inst = getattr(process_tools_mod_inst, process_tools_cls)()
+        if process_tools_cls_inst is None:
+            raise Exception("Could not create instance of '{}'".format(process_tools_cls))
+        process_tools_cls_inst.set_queue_db_info(queue_db_info)
+
+        pbpt_utils = PBPTUtils()
+
+        if pbpt_utils.get_file_lock(self.sqlite_db_file, sleep_period=1, wait_iters=180, use_except=False):
+            try:
+                logger.debug("Creating Database Engine and Session.")
+                db_engine = sqlalchemy.create_engine(self.sqlite_db_conn, pool_pre_ping=True)
+                session_sqlalc = sqlalchemy.orm.sessionmaker(bind=db_engine)
+                ses = session_sqlalc()
+                logger.debug("Created Database Engine and Session.")
+
+                if all_jobs:
+                    jobs = ses.query(PBPTProcessJob).all()
+                elif error_jobs:
+                    jobs = ses.query(PBPTProcessJob).filter(PBPTProcessJob.Started == True,
+                                                            PBPTProcessJob.Completed == False).all()
+                else:
+                    raise Exception("Must specify either all jobs or only error jobs to have their outputs removed.")
+
+                if jobs is not None:
+                    for job_info in tqdm.tqdm(jobs):
+                        process_tools_cls_inst.set_params(job_info.JobParams)
+                        process_tools_cls_inst.remove_outputs(**kwargs)
+                ses.close()
+                pbpt_utils.release_file_lock(self.sqlite_db_file)
+            except Exception:
+                # Release the lock even if an error occurs while removing outputs.
+                pbpt_utils.release_file_lock(self.sqlite_db_file)
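A note on the design of remove_job_outputs: as with check_job_outputs, the queue database is only read while holding the SQLite file lock, so clean-up cannot race with workers updating job state, and the user's tool class is loaded dynamically via importlib so the same helper works with any processing script.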

     def create_jobs_report(self, out_report_file=None):
         """
         A function which generates a JSON report which can either
@@ -579,9 +671,9 @@ def create_jobs_report(self, out_report_file=None):
                 n_started += 1
                 n_ended += 1
             else:
-                err_info[job_info.PID] = dict()
                 if job_info.Error:
                     n_errs += 1
+                    err_info[job_info.PID] = dict()
                     err_info[job_info.PID]['info'] = job_info.ErrorInfo

                 if job_info.Started:
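The one-line move in the hunk above means err_info[job_info.PID] is now only created for jobs that actually logged an error, so the report no longer contains empty error entries for jobs that are merely incomplete.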
@@ -804,6 +896,19 @@ def run_check_outputs(self):
         """
         pass

+    @abstractmethod
+    def run_remove_outputs(self, all_jobs=False, error_jobs=False):
+        """
+        An abstract function which needs to be implemented with the functions and inputs
+        you want to run to remove the outputs of jobs (e.g., so failed jobs can be
+        reset and re-run). You will presumably want to call:
+
+        * self.remove_job_outputs
+        """
+        pass
+
     def parse_cmds(self, argv=None):
         """
         A function to parse the command line arguments to retrieve the
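A hedged sketch (not from the commit) of how a user's command-generation class might satisfy the new abstract method; CmdsBase is a placeholder for the commands base class in this module, and the module/class names passed in are hypothetical:

```python
class GenMyCmds(CmdsBase):  # CmdsBase: placeholder; other abstract methods omitted

    def run_remove_outputs(self, all_jobs=False, error_jobs=False):
        # Delegate to the remove_job_outputs helper added in this commit,
        # forwarding which jobs (all, or only errored/unfinished) to clean up.
        self.remove_job_outputs(process_tools_mod="perform_processing",
                                process_tools_cls="MyProcessTool",
                                all_jobs=all_jobs, error_jobs=error_jobs)
```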
@@ -816,6 +921,9 @@ def parse_cmds(self, argv=None):
         parser.add_argument("--gen", action='store_true', default=False, help="Execute run_gen_commands() function.")
         parser.add_argument("--check", action='store_true', default=False, help="Execute run_check_outputs() function.")
         parser.add_argument("--report", action='store_true', default=False, help="Execute create_jobs_report() function.")
+        parser.add_argument("--rmouts", action='store_true', default=False, help="Execute run_remove_outputs() function.")
+        parser.add_argument("--all", action='store_true', default=False, help="Remove outputs for all jobs.")
+        parser.add_argument("--error", action='store_true', default=False, help="Remove outputs for jobs with errors.")
         parser.add_argument("-o", "--output", type=str, required=False, help="Specify a report output JSON file. If not provided then report written to console.")
         if argv is None:
             argv = sys.argv[1:]
@@ -827,6 +935,9 @@
             self.run_check_outputs()
         elif args.report:
             self.create_jobs_report(args.output)
+        elif args.rmouts:
+            self.run_remove_outputs(all_jobs=args.all, error_jobs=args.error)
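With that dispatch in place, clean-up could be driven from the command line along these lines (script name hypothetical):

```python
# Hypothetical invocations of a script built on the class sketched above:
#
#   python gen_my_cmds.py --rmouts --error   # remove outputs of errored/unfinished jobs
#   python gen_my_cmds.py --rmouts --all     # remove outputs of every job
```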
2 changes: 1 addition & 1 deletion setup.py
@@ -37,7 +37,7 @@
 import os

 setuptools.setup(name='pb_process_tools',
-                 version='1.2.4',
+                 version='1.3.0',
                  description='Tools for batch processing data, including on HPC cluster with slurm.',
                  author='Pete Bunting',
                  author_email='[email protected]',
