#!/path/to/venv/bin/python
"""
SUMMARY:
This script runs after a job has exited, and if it was overrunning and was
checkpointed, then it will restart the job.
USERS:
This script will be run by normal users through LSF, with their normal
privileges and capabilities.
DOCUMENTATION:
admin facing:
https://ssg-confluence.internal.sanger.ac.uk/pages/viewpage.action?pageId=153044333
user facing:
https://ssg-confluence.internal.sanger.ac.uk/pages/viewpage.action?pageId=153053165
"""
import datetime
import json
import logging
import os
import pathlib
import subprocess
import sys

from pythonlsf import lsf

CLUSTER_NAME = lsf.ls_getclustername()
JOB_ID = os.environ['LSB_JOBID']


def fmt_runtime(runtime):
    """Format a runtime given in seconds as a human-readable string."""
    tdelta = datetime.timedelta(seconds=runtime)
    hours, rem = divmod(tdelta.seconds, 3600)
    mins, secs = divmod(rem, 60)
    return f"{tdelta.days} days, {hours} hours, {mins} minutes, {secs} seconds"
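# For example, fmt_runtime(90061) returns
# "1 days, 1 hours, 1 minutes, 1 seconds".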


def generate_summary(json_filepath: str) -> tuple[str, int | None]:
    """
    Build a human-readable checkpoint/restore summary from the state file.
    Returns (summary_text, original_job_id); the ID is None if the file
    could not be read or parsed.
    """
    output_str = ""
    try:
        with open(json_filepath, 'r') as f:
            raw_json = f.read()
    except Exception as err:
        output_str = (f"could not open file to generate summary, error: {err}."
                      " Please log a ticket with ISG if this persists, via "
                      "emailing <[email protected]> with FAO ISG in the"
                      " title. Please include the error and the job ID in the"
                      " email.\n")
        return (output_str, None)
    try:
        data = json.loads(raw_json)
    except json.JSONDecodeError as err:
        output_str = ("could not decode JSON to generate summary, it may be "
                      f"corrupted. error: {err}. Please log a ticket with ISG "
                      "if this persists, via emailing "
                      "<[email protected]> with FAO ISG in the title. "
                      "Please include this message and the job ID in the email.\n")
        return (output_str, None)
    summary_json = data['summary_info']
    output_str += ("Checkpoint / restore summary for overrunning job, which "
                   f"originally had an ID of: {summary_json[0]['id']}\n")
    output_str += "Job information for restarted jobs:\n"
    total_runtime = 0
    for job in summary_json:
        start_time = datetime.datetime.fromtimestamp(job['start_time'])
        output_str += (f"Job {job['id']} ran for {fmt_runtime(job['runtime'])}"
                       f" and started at {start_time}\n")
        total_runtime += job['runtime']
    output_str += "\n********** Aggregated Stats **********\n"
    output_str += f"Total runtime: {fmt_runtime(total_runtime)}\n"
    output_str += f"Restarts due to overrunning: {data['overrun_restarts']}\n"
    return (output_str, summary_json[0]['id'])
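
# With a single restarted-job entry in the state file, the summary text
# written out looks like this (job ID and times are made-up examples):
#
#   Checkpoint / restore summary for overrunning job, which originally had an ID of: 12345
#   Job information for restarted jobs:
#   Job 12345 ran for 0 days, 2 hours, 0 minutes, 0 seconds and started at 2024-01-01 09:00:00
#
#   ********** Aggregated Stats **********
#   Total runtime: 0 days, 2 hours, 0 minutes, 0 seconds
#   Restarts due to overrunning: 1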


def has_chkpnt_dir() -> bool:
    """Return True when LSF has set a checkpoint directory for this job."""
    return 'LSB_CHKPNT_DIR' in os.environ


def get_data_dump_path() -> str:
    """
    Return the path where the checkpoint state for this job is stored,
    i.e. LSB_CHKPNT_DIR/.checkpoint-state.json
    """
    return (f'{os.environ["LSB_CHKPNT_DIR"]}'
            '/.checkpoint-state.json')
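
# Sketch of the state file this script consumes. The field names below are
# the ones read by get_data() and generate_summary(); the values are
# illustrative only, as the file is written by the checkpointing tooling:
#
#   {
#       "version": "V1",
#       "job_id": 12345,
#       "chkpnt_state": 8,
#       "should_restart": true,
#       "overrun_restarts": 1,
#       "summary_info": [
#           {"id": 12345, "start_time": 1704096000, "runtime": 7200}
#       ]
#   }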


def get_data() -> dict | None:
    """Load and validate the checkpoint state JSON; None if unavailable."""
    path = get_data_dump_path()
    logging.debug(f"reading checkpoint state info from: {path}")
    try:
        with open(path, 'r') as f:
            raw_data = f.read()
    except FileNotFoundError:
        logging.debug("json data does not exist, job was not checkpointed "
                      "for overrunning, doing nothing.")
        return None
    except Exception as err:
        logging.debug(f"unexpected error: {err}")
        return None
    logging.debug(f"raw checkpoint data read in: {raw_data}")
    try:
        data = json.loads(raw_data)
    except json.JSONDecodeError as err:
        logging.error(f"loaded {path}, but an error occurred while parsing "
                      "the JSON data, the file must be corrupted. Error "
                      f"given: {err}")
        return None
    except Exception as err:
        logging.debug(f"unexpected error: {err}")
        return None
    if data['version'] == 'V1':
        logging.debug("loading data using v1 json")
        return data
    logging.error("could not load data, unknown version")
    return None
if __name__ == "__main__":
print("execution of post exec beginning")
if not has_chkpnt_dir():
sys.exit(1)
logging.basicConfig(format=f"%(levelname)s: %(asctime)s (job {JOB_ID}):"
" %(message)s",
datefmt="%Y/%m/%d %H:%M:%S",
filename=(f"{os.environ['LSB_CHKPNT_DIR']}/"
f"post-exec-{JOB_ID}.log"),
level=logging.DEBUG,
encoding='utf-8')
job_data = get_data()
if job_data is None:
sys.exit(0)
logging.debug(f"checkpoint state info successfully read in: {job_data}")
if (job_data['chkpnt_state'] == 0x8 and job_data['should_restart'] and
job_data['job_id'] == int(os.environ['LSB_JOBID'])):
logging.debug("should restart job")
path = pathlib.PurePath(os.environ['LSB_CHKPNT_DIR'])
path = path.parent
cmd = f". /usr/local/lsf/conf/profile.lsf; brestart {path} {JOB_ID}"
logging.debug(cmd)
out = subprocess.run(cmd, shell=True, capture_output=True)
logging.debug(f"brestart cmd output: {out}")
else:
logging.debug("should not restart job")
if os.environ['LSB_OUTPUTFILE'] == "/dev/null":
logging.info("output is /dev/null, not writing a summary.")
sys.exit(0)
else:
(summary, job_id) = generate_summary(get_data_dump_path())
splits = os.environ['LSB_OUTPUTFILE'].rpartition('/')
if job_id is None:
output_file = splits[0] + '/' + 'overrun-summary-ERR.txt'
else:
output_file = splits[0] + '/' + f'overrun-summary-{job_id}.txt'
try:
with open(output_file, 'w') as f:
f.write(summary)
except OSError as err:
logging.error(f"could not write summary, error: {err}")
logging.debug("finish post_exec")