forked from CrayLabs/SmartSim
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconftest.py
475 lines (418 loc) · 16.8 KB
/
conftest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
import os
import inspect
import shutil
import pytest
import psutil
import shutil
import smartsim
from smartsim import Experiment
from smartsim.database import Orchestrator
from smartsim.settings import (
SrunSettings, AprunSettings,
JsrunSettings, MpirunSettings,
RunSettings
)
from smartsim._core.config import CONFIG
from smartsim.error import SSConfigError
from subprocess import run
# Module-level test configuration. Globals are acceptable here because this
# is a testing-only file; the hooks, fixtures and helper classes below read
# (and in a few cases rebind) these names.
_conftest_file = os.path.abspath(__file__)
test_path = os.path.dirname(_conftest_file)
test_dir = os.path.join(test_path, "tests", "test_output")

# Values taken from the SmartSim test configuration object
test_launcher, test_device, test_nic = (
    CONFIG.test_launcher,
    CONFIG.test_device,
    CONFIG.test_interface,
)

# Filled in at runtime by get_hostlist() if a host list is needed
test_hostlist = None
def get_account():
    """Read the test account from ``CONFIG`` and publish it as a global.

    The value is stored in the module-level ``test_account`` so other
    helpers in this file can read it, and is also returned.

    :return: the configured test account
    """
    global test_account
    account = CONFIG.test_account
    test_account = account
    return account
def print_test_configuration():
    """Print the module-level test configuration to stdout.

    Reads ``test_account``, which only exists after ``get_account()`` has
    assigned it. The account line is skipped when the account is an empty
    string.
    """
    leading_entries = (
        ("TEST_SMARTSIM_LOCATION:", smartsim.__path__),
        ("TEST_PATH:", test_path),
        ("TEST_LAUNCHER:", test_launcher),
    )
    for label, value in leading_entries:
        print(label, value)

    # Only report an account when one was actually configured
    if test_account != "":
        print("TEST_ACCOUNT:", test_account)

    trailing_entries = (
        ("TEST_DEVICE:", test_device),
        ("TEST_NETWORK_INTERFACE (WLM only):", test_nic),
        ("TEST_DIR:", test_dir),
    )
    for label, value in trailing_entries:
        print(label, value)
    print("Test output will be located in TEST_DIR if there is a failure")
def pytest_configure():
    """Attach launcher, WLM option list and account to the ``pytest`` module.

    Test modules read these attributes (``pytest.test_launcher``,
    ``pytest.wlm_options``, ``pytest.test_account``) directly.
    """
    pytest.test_launcher = test_launcher
    pytest.wlm_options = ["slurm", "pbs", "cobalt", "lsf"]
    # get_account() also stores the account in the module-level global
    pytest.test_account = get_account()
def pytest_sessionstart(session):
    """
    Called after the Session object has been created and
    before performing collection and entering the run test loop.

    Recreates ``test_dir`` from scratch and prints the test configuration.
    """
    stale_output_present = os.path.isdir(test_dir)
    if stale_output_present:
        # Drop whatever a previous session left behind
        shutil.rmtree(test_dir)
    os.mkdir(test_dir)
    print_test_configuration()
def pytest_sessionfinish(session, exitstatus):
    """
    Called after whole test run finished, right before
    returning the exit status to the system.

    A zero exit status removes ``test_dir``; any other status keeps the
    output in place and kills the processes spawned by the tests.
    """
    if exitstatus != 0:
        # kill all spawned processes in case of error
        kill_all_test_spawned_processes()
        return
    shutil.rmtree(test_dir)
def kill_all_test_spawned_processes():
    """Kill every descendant process of the current (pytest) process.

    Used to clean up after a failed test session. Each child is handled
    individually so that one child that cannot be killed does not prevent
    the remaining children from being killed. Nothing is raised: failures
    are reported with a single printed message.
    """
    try:
        parent = psutil.Process(os.getpid())
    except psutil.Error:
        # could not find parent process id
        return

    try:
        children = parent.children(recursive=True)
    except Exception:
        print("Not all processes were killed after test")
        return

    all_killed = True
    for child in children:
        try:
            child.kill()
        except psutil.NoSuchProcess:
            # The child exited on its own in the meantime; nothing to do
            continue
        except Exception:
            # Keep going so the other children are still cleaned up
            all_killed = False

    if not all_killed:
        print("Not all processes were killed after test")
def get_hostlist():
    """Return the hosts of the current WLM allocation, or ``None``.

    The list is looked up once and cached in the module-level
    ``test_hostlist``; later calls return the cached value. Sources are
    tried in this order, and only the first one present is used:

    1. the file named by ``COBALT_NODEFILE``
    2. the file named by ``PBS_NODEFILE``
    3. ``scontrol show hostnames`` applied to ``SLURM_JOB_NODELIST``

    Node files are read one host per line; duplicates are removed while
    keeping first-seen order, and blank lines are ignored.

    :return: list of host names, ``None`` if the selected source could not
             be read, or the current cached value if no source is present
    :rtype: list[str] or None
    """
    global test_hostlist
    if test_hostlist:
        return test_hostlist

    for nodefile_var in ("COBALT_NODEFILE", "PBS_NODEFILE"):
        if nodefile_var not in os.environ:
            continue
        try:
            with open(os.environ[nodefile_var], "r") as nodefile:
                names = [line.strip() for line in nodefile]
        except (OSError, ValueError):
            # Unreadable or undecodable node file: report "no host list"
            return None
        # dict.fromkeys de-duplicates while preserving order
        test_hostlist = list(dict.fromkeys(name for name in names if name))
        return test_hostlist

    if "SLURM_JOB_NODELIST" in os.environ:
        nodelist = os.environ["SLURM_JOB_NODELIST"]
        try:
            completed = run(
                ["scontrol", "show", "hostnames", nodelist],
                capture_output=True,
                text=True,
            )
        except (OSError, ValueError):
            # scontrol missing/not executable, or its output not decodable
            return None
        test_hostlist = completed.stdout.split()

    return test_hostlist
@pytest.fixture
def wlmutils():
    """Fixture exposing the ``WLMUtils`` class (the class itself, not an instance)."""
    helper_cls = WLMUtils
    return helper_cls
class WLMUtils:
    """Static helpers that adapt tests to the configured launcher.

    All methods read the module-level test configuration
    (``test_launcher``, ``test_account``, ``test_nic``, ``test_device``)
    at call time; ``set_test_launcher`` rebinds ``test_launcher``.
    """

    # Environment variable holding the node file, per launcher that may
    # have to fall back to mpirun when aprun is not on the PATH
    _NODEFILE_ENV = {"pbs": "PBS_NODEFILE", "cobalt": "COBALT_NODEFILE"}

    @staticmethod
    def set_test_launcher(new_test_launcher):
        """Rebind the module-level ``test_launcher``."""
        global test_launcher
        test_launcher = new_test_launcher

    @staticmethod
    def get_test_launcher():
        """Return the module-level ``test_launcher``."""
        return test_launcher

    @staticmethod
    def get_test_account():
        """Return the module-level ``test_account``."""
        return test_account

    @staticmethod
    def get_test_interface():
        """Return the module-level ``test_nic``."""
        return test_nic

    @staticmethod
    def get_test_hostlist():
        """Return the result of the module-level ``get_hostlist()``."""
        return get_hostlist()

    @staticmethod
    def get_base_run_settings(exe, args, nodes=1, ntasks=1, **kwargs):
        """Build a plain ``RunSettings`` with an explicit run command.

        Run arguments are spelled as full command-line flags; ``kwargs``
        are merged on top of the launcher defaults.

        :param exe: executable to run
        :param args: executable arguments
        :param nodes: node count (used for slurm and lsf)
        :param ntasks: task count
        :raises SSConfigError: if the launcher is not slurm, pbs, cobalt,
                               lsf or local
        :return: ``RunSettings`` for the current launcher
        """
        launcher = test_launcher
        if launcher == "slurm":
            run_command = "srun"
            run_args = {"--nodes": nodes,
                        "--ntasks": ntasks,
                        "--time": "00:10:00"}
        elif launcher in ("pbs", "cobalt"):
            # TODO allow user to pick aprun vs MPIrun
            if shutil.which("aprun"):
                run_command = "aprun"
                run_args = {"--pes": ntasks}
            else:
                run_command = "mpirun"
                host_file = os.environ[WLMUtils._NODEFILE_ENV[launcher]]
                run_args = {"-n": ntasks,
                            "--hostfile": host_file}
        elif launcher == "lsf":
            run_command = "jsrun"
            run_args = {"--np": ntasks, "--nrs": nodes}
        elif launcher != "local":
            raise SSConfigError(f"Base run settings are available for Slurm, PBS, Cobalt, and LSF, but launcher was {test_launcher}")
        else:
            return RunSettings(exe, args)

        run_args.update(kwargs)
        return RunSettings(exe, args, run_command=run_command, run_args=run_args)

    @staticmethod
    def get_run_settings(exe, args, nodes=1, ntasks=1, **kwargs):
        """Build the launcher-specific settings object for the current launcher.

        ``kwargs`` are merged on top of the launcher defaults. Any launcher
        other than slurm, pbs, cobalt or lsf gets a plain ``RunSettings``.

        :param exe: executable to run
        :param args: executable arguments
        :param nodes: node count (used for slurm and lsf)
        :param ntasks: task count
        :return: settings object for the current launcher
        """
        launcher = test_launcher
        if launcher == "slurm":
            run_args = {"nodes": nodes,
                        "ntasks": ntasks,
                        "time": "00:10:00"}
            run_args.update(kwargs)
            return SrunSettings(exe, args, run_args=run_args)

        if launcher in ("pbs", "cobalt"):
            # TODO allow user to pick aprun vs MPIrun
            if shutil.which("aprun"):
                run_args = {"pes": ntasks}
                run_args.update(kwargs)
                return AprunSettings(exe, args, run_args=run_args)
            host_file = os.environ[WLMUtils._NODEFILE_ENV[launcher]]
            run_args = {"n": ntasks,
                        "hostfile": host_file}
            run_args.update(kwargs)
            return MpirunSettings(exe, args, run_args=run_args)

        if launcher == "lsf":
            run_args = {"nrs": nodes,
                        "tasks_per_rs": max(ntasks//nodes, 1)}
            run_args.update(kwargs)
            return JsrunSettings(exe, args, run_args=run_args)

        return RunSettings(exe, args)

    @staticmethod
    def get_orchestrator(nodes=1, port=6780, batch=False):
        """Create (but do not start) an ``Orchestrator`` for the current launcher.

        :param nodes: number of database nodes (ignored for the fallback case)
        :param port: database port
        :param batch: forwarded as ``batch`` for WLM launchers
        :return: the configured ``Orchestrator``
        """
        launcher = test_launcher
        if launcher in ["pbs", "cobalt"]:
            # An explicit host list is only passed when aprun is unavailable
            hostlist = None if shutil.which("aprun") else get_hostlist()
            return Orchestrator(db_nodes=nodes, port=port, batch=batch,
                                interface=test_nic, launcher=test_launcher,
                                hosts=hostlist)
        if launcher == "slurm":
            return Orchestrator(db_nodes=nodes, port=port, batch=batch,
                                interface=test_nic, launcher=test_launcher)
        if launcher == "lsf":
            gpus_per_shard = 2 if test_device == "GPU" else 0
            return Orchestrator(db_nodes=nodes, port=port, batch=batch,
                                cpus_per_shard=4, gpus_per_shard=gpus_per_shard,
                                project=get_account(), interface=test_nic,
                                launcher=test_launcher)
        return Orchestrator(port=port, interface="lo")
@pytest.fixture
def local_db(fileutils, request):
    """Yield fixture that starts a local orchestrator and stops it afterwards"""
    test_name = request.function.__name__
    experiment = Experiment(test_name, launcher="local")
    output_dir = fileutils.make_test_dir(
        caller_function=test_name, caller_fspath=request.fspath
    )

    orchestrator = Orchestrator(port=6780, interface="lo")
    orchestrator.set_path(output_dir)
    experiment.start(orchestrator)

    yield orchestrator

    # Teardown: runs once the test using this fixture has completed,
    # whether it passed or failed
    experiment.stop(orchestrator)
@pytest.fixture
def db(fileutils, wlmutils, request):
    """Yield fixture that starts an orchestrator for the test launcher and stops it afterwards"""
    test_name = request.function.__name__
    experiment = Experiment(test_name, launcher=wlmutils.get_test_launcher())
    output_dir = fileutils.make_test_dir(
        caller_function=test_name, caller_fspath=request.fspath
    )

    orchestrator = wlmutils.get_orchestrator()
    orchestrator.set_path(output_dir)
    experiment.start(orchestrator)

    yield orchestrator

    # Teardown: runs once the test using this fixture has completed,
    # whether it passed or failed
    experiment.stop(orchestrator)
@pytest.fixture
def db_cluster(fileutils, wlmutils, request):
    """
    Yield fixture that starts a three-node clustered orchestrator and
    stops it afterwards.
    This should only be used in on_wlm and full_wlm tests.
    """
    test_name = request.function.__name__
    experiment = Experiment(test_name, launcher=wlmutils.get_test_launcher())
    output_dir = fileutils.make_test_dir(
        caller_function=test_name, caller_fspath=request.fspath
    )

    orchestrator = wlmutils.get_orchestrator(nodes=3)
    orchestrator.set_path(output_dir)
    experiment.start(orchestrator)

    yield orchestrator

    # Teardown: runs once the test using this fixture has completed,
    # whether it passed or failed
    experiment.stop(orchestrator)
@pytest.fixture
def dbutils():
    """Fixture exposing the ``DBUtils`` class (the class itself, not an instance)."""
    helper_cls = DBUtils
    return helper_cls
class DBUtils:
    """Static helpers providing database configuration data for tests."""

    @staticmethod
    def get_db_configs():
        """Return valid settings keyed by the name of the edit method to call."""
        return {
            "enable_checkpoints": 1,
            "set_max_memory": "3gb",
            "set_eviction_strategy": "allkeys-lru",
            # set low to avoid permissions issues during testing
            # TODO make a note in SmartRedis about this method possibly
            # erroring due to the max file descriptors setting being too low
            "set_max_clients": 100,
            "set_max_message_size": 2 ** 31,
        }

    @staticmethod
    def get_smartsim_error_db_configs():
        """Return string-typed but invalid values, keyed by configuration key."""
        oversized = 2 ** 65
        return {
            "save": [
                "-1",  # frequency must be positive
                "2.4",  # frequency must be specified in whole seconds
            ],
            "maxmemory": [
                "29GG",  # invalid memory form
                f"{oversized}gb",  # memory is too much
                "3.5gb",  # invalid memory form
            ],
            "maxclients": [
                "-3",  # number clients must be positive
                f"{oversized}",  # number of clients is too large
                "2.9",  # number of clients must be an integer
            ],
            "proto-max-bulk-len": [
                "100",  # max message size can't be smaller than 1mb
                "9.9gb",  # invalid memory form
                "101.1",  # max message size must be an integer
            ],
            # must use a valid maxmemory policy
            "maxmemory-policy": ["invalid-policy"],
            # invalid key - no such configuration exists
            "invalid-parameter": ["99"],
        }

    @staticmethod
    def get_type_error_db_configs():
        """Return wrongly-typed values (and one non-string key)."""
        return {
            # frequency must be specified as a string
            "save": [2, True, ["2"]],
            # memory form must be a string
            "maxmemory": [99, True, ["99"]],
            # number of clients must be a string
            "maxclients": [3, True, ["3"]],
            # max message size must be a string
            "proto-max-bulk-len": [101, True, ["101"]],
            # maxmemory policies must be strings
            "maxmemory-policy": [42, True, ["42"]],
            # invalid key - the key must be a string
            10: ["3"],
        }

    @staticmethod
    def get_config_edit_method(db, config_setting):
        """Get a db configuration file edit method from a str

        All five edit methods are looked up on ``db`` regardless of which
        one is requested; an unknown ``config_setting`` yields ``None``.
        """
        method_names = (
            "enable_checkpoints",
            "set_max_memory",
            "set_eviction_strategy",
            "set_max_clients",
            "set_max_message_size",
        )
        edit_methods = {name: getattr(db, name) for name in method_names}
        return edit_methods.get(config_setting)
@pytest.fixture
def fileutils():
    """Fixture exposing the ``FileUtils`` class (the class itself, not an instance)."""
    helper_cls = FileUtils
    return helper_cls
class FileUtils:
    """Static helpers for locating test output directories and test assets."""

    @staticmethod
    def _test_dir_path(caller_function, caller_fspath):
        """Build the output path for a test function inside ``test_dir``.

        The caller's file path, with its extension removed, is made
        relative to the parent directory of ``test_dir``; the function name
        is appended as the last component.
        """
        caller_file_to_dir = os.path.splitext(str(caller_fspath))[0]
        rel_path = os.path.relpath(caller_file_to_dir, os.path.dirname(test_dir))
        return os.path.join(test_dir, rel_path, caller_function)

    @staticmethod
    def get_test_dir(caller_function=None, caller_fspath=None):
        """Get path to test output.

        This function should be called without arguments from within
        a test: the returned directory will be
        `test_output/<relative_path_to_test_file>/<test_filename>/<test_name>`.
        When called from other functions (e.g. from functions in this file),
        the caller function and the caller file path should be provided.
        If either argument is missing, both are taken from the calling frame.

        The directory will not be created, but the parent (and all the needed
        tree) will. This is to allow tests to create the directory.

        :param caller_function: caller function name defaults to None
        :type caller_function: str, optional
        :param caller_fspath: absolute path to file containing caller, defaults to None
        :type caller_fspath: str or Path, optional
        :return: String path to test output directory
        :rtype: str
        """
        if not caller_function or not caller_fspath:
            # Index 1 is the frame of whoever called this method
            caller_frame = inspect.stack()[1]
            caller_fspath = caller_frame.filename
            caller_function = caller_frame.function

        dir_path = FileUtils._test_dir_path(caller_function, caller_fspath)
        # exist_ok avoids failing when another process creates the parent
        # between an existence check and the creation
        os.makedirs(os.path.dirname(dir_path), exist_ok=True)
        return dir_path

    @staticmethod
    def make_test_dir(caller_function=None, caller_fspath=None):
        """Create test output directory and return path to it.

        This function should be called without arguments from within
        a test: the directory will be created as
        `test_output/<relative_path_to_test_file>/<test_filename>/<test_name>`.
        When called from other functions (e.g. from functions in this file),
        the caller function and the caller file path should be provided.
        If either argument is missing, both are taken from the calling frame.

        Creation is best effort: the path is returned even if the directory
        already exists or could not be created.

        :param caller_function: caller function name defaults to None
        :type caller_function: str, optional
        :param caller_fspath: absolute path to file containing caller, defaults to None
        :type caller_fspath: str or Path, optional
        :return: String path to test output directory
        :rtype: str
        """
        if not caller_function or not caller_fspath:
            # Index 1 is the frame of whoever called this method
            caller_frame = inspect.stack()[1]
            caller_fspath = caller_frame.filename
            caller_function = caller_frame.function

        dir_path = FileUtils._test_dir_path(caller_function, caller_fspath)
        try:
            os.makedirs(dir_path, exist_ok=True)
        except OSError:
            # Deliberately tolerated: callers always get the path back and
            # any real problem surfaces when the directory is first used
            pass
        return dir_path

    @staticmethod
    def get_test_conf_path(filename):
        """Return the path of ``filename`` inside ``tests/test_configs``."""
        return os.path.join(test_path, "tests", "test_configs", filename)

    @staticmethod
    def get_test_dir_path(dirname):
        """Return the path of directory ``dirname`` inside ``tests/test_configs``."""
        return os.path.join(test_path, "tests", "test_configs", dirname)
@pytest.fixture
def mlutils():
    """Fixture exposing the ``MLUtils`` class (the class itself, not an instance)."""
    helper_cls = MLUtils
    return helper_cls
class MLUtils:
    """Static helper exposing the configured test device."""

    @staticmethod
    def get_test_device():
        """Return the module-level ``test_device``."""
        device = test_device
        return device