-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrun_model.py
278 lines (230 loc) · 8.36 KB
/
run_model.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
"""
Functions to run the model
This module allows you to run the various models. It allows you to run a single model run of one of
the different types of models for debugging purposes, or it allows you to run all of the models in
parallel saving the results to disk.
There are existing launch profiles for vscode that use this file, or you can use it directly in the
console, e.g.
python run_model.py data/[DATASET]/results/[SCENARIO]/[RUN_TIME] 0 1 --debug -t=ip
will run a single run of the inpatients model, returning the results to display.
"""
import argparse
import logging
import os
import time
from multiprocessing import Pool
from typing import Any, Callable
from tqdm.auto import tqdm as base_tqdm
from model.aae import AaEModel
from model.data import Data, Local
from model.health_status_adjustment import HealthStatusAdjustmentInterpolated
from model.helpers import load_params
from model.inpatients import InpatientsModel
from model.model import Model
from model.model_iteration import ModelIteration
from model.outpatients import OutpatientsModel
from model.results import combine_results, generate_results_json, save_results_files
class tqdm(base_tqdm): # pylint: disable=inconsistent-mro, invalid-name
"""Custom tqdm class that provides a callback function on update"""
# ideally this would be set in the contstructor, but as this is a pretty
# simple use case just implemented as a static variable. this does mean that
# you need to update the value before using the class (each time)
progress_callback = None
def update(self, n=1):
"""overide the default tqdm update function to run the callback method"""
super().update(n)
if tqdm.progress_callback:
tqdm.progress_callback(self.n) # pylint: disable=not-callable
def timeit(func: Callable, *args) -> Any:
"""
Time how long it takes to evaluate function `f` with arguments `*args`.
"""
start = time.time()
results = func(*args)
print(f"elapsed: {time.time() - start:.3f}s")
return results
def _run_model(
model_type: Model,
params: dict,
data: Data,
hsa: Any,
run_params: dict,
progress_callback,
save_full_model_results: bool,
) -> dict:
"""Run the model iterations
Runs the model for all of the model iterations, returning the aggregated results
:param model_type: the type of model that we want to run
:type model_type: Model
:param params: the parameters to run the model with
:type params: dict
:param path: where the data is stored
:type path: str
:param hsa: an instance of the HealthStatusAdjustment class
:type hsa: HealthStatusAdjustment
:param run_params: the generated run parameters for the model run
:type run_params: dict
:return: a dictionary containing the aggregated results
:rtype: dict
"""
model_class = model_type.__name__[:-5] # pylint: disable=protected-access
logging.info("%s", model_class)
logging.info(" * instantiating")
model = model_type(params, data, hsa, run_params, save_full_model_results)
logging.info(" * running")
# set the progress callback for this run
tqdm.progress_callback = progress_callback
# model run 0 is the baseline
# model run 1:n are the monte carlo sims
model_runs = list(range(params["model_runs"] + 1))
cpus = os.cpu_count()
batch_size = int(os.getenv("BATCH_SIZE", "1"))
with Pool(cpus) as pool:
results = list(
tqdm(
pool.imap(
model.go,
model_runs,
chunksize=batch_size,
),
f"Running {model.__class__.__name__[:-5].rjust(11)} model", # pylint: disable=protected-access
total=len(model_runs),
)
)
logging.info(" * finished")
return results
def run_all(
params: dict, data_path: str, progress_callback, save_full_model_results: bool
) -> str:
"""Run the model
runs all 3 model types, aggregates and combines the results
:param params: the parameters to use for this model run
:type params: dict
:param data_path: where the data is stored
:type data_path: str
:return: the filename of the saved results
:rtype: str
"""
model_types = [InpatientsModel, OutpatientsModel, AaEModel]
run_params = Model.generate_run_params(params)
nhp_data = Local.create(data_path)
# set the data path in the HealthStatusAdjustment class
hsa = HealthStatusAdjustmentInterpolated(
nhp_data(params["start_year"], params["dataset"]), params["start_year"]
)
pcallback = progress_callback()
results, step_counts = combine_results(
[
_run_model(
m,
params,
nhp_data,
hsa,
run_params,
pcallback(m.__name__[:-5]),
save_full_model_results,
)
for m in model_types
]
)
json_filename = generate_results_json(results, step_counts, params, run_params)
# TODO: once generate_results_json is deperecated this step should be moved into combine_results
results["step_counts"] = step_counts
# TODO: this should be what the model returns once generate_results_json is deprecated
saved_files = save_results_files(results, params) # pylint: disable=unused-variable
return saved_files, json_filename
def run_single_model_run(
params: dict, data_path: str, model_type: Model, model_run: int
) -> None:
"""
Runs a single model iteration for easier debugging in vscode
"""
data = Local.create(data_path)
print("initialising model... ", end="")
model = timeit(model_type, params, data)
print("running model... ", end="")
m_run = timeit(ModelIteration, model, model_run)
print("aggregating results... ", end="")
model_results, step_counts = timeit(m_run.get_aggregate_results)
#
print()
print("change factors:")
step_counts = (
step_counts.reset_index()
.groupby(["change_factor", "measure"], as_index=False)["value"]
.sum()
.pivot(index="change_factor", columns="measure")
)
step_counts.loc["total"] = step_counts.sum()
print(step_counts.fillna(0).astype(int))
#
print()
print("aggregated (default) results:")
default_results = (
model_results["default"]
.reset_index()
.groupby(["pod", "measure"], as_index=False)
.agg({"value": sum})
.pivot(index=["pod"], columns="measure")
.fillna(0)
)
default_results.loc["total"] = default_results.sum()
print(default_results)
def _run_model_argparser() -> argparse.Namespace: # pragma: no cover
parser = argparse.ArgumentParser()
parser.add_argument(
"params_file",
nargs="?",
default="queue/sample_params.json",
help="Path to the params.json file",
)
parser.add_argument("-d", "--data-path", help="Path to the data", default="data")
parser.add_argument(
"-r", "--model-run", help="Which model iteration to run", default=1, type=int
)
parser.add_argument(
"-t",
"--type",
default="all",
choices=["all", "aae", "ip", "op"],
help="Model type, either: all, ip, op, aae",
type=str,
)
parser.add_argument("--save-full-model-results", action="store_true")
return parser.parse_args()
def main() -> None:
"""
Main method
Runs when __name__ == "__main__"
"""
# Grab the Arguments
args = _run_model_argparser()
#
params = load_params(args.params_file)
# define the model to run
match args.type:
case "all":
logging.basicConfig(
format="%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s",
level=logging.INFO,
datefmt="%Y-%m-%d %H:%M:%S",
)
run_all(
params,
args.data_path,
lambda: lambda _: None,
args.save_full_model_results,
)
return
case "aae":
model_type = AaEModel
case "ip":
model_type = InpatientsModel
case "op":
model_type = OutpatientsModel
run_single_model_run(params, args.data_path, model_type, args.model_run)
def init():
"""method for calling main"""
if __name__ == "__main__":
main()
init()