Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bug fix in MPI evaluator #349

Merged
merged 24 commits into from
Apr 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 43 additions & 12 deletions ema_workbench/em_framework/futures_mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from .util import NamedObjectMap
from .model import AbstractModel
from .experiment_runner import ExperimentRunner
from ..util import get_module_logger, get_rootlogger
from ..util import get_module_logger, get_rootlogger, method_logger

from ..util import ema_logging

Expand Down Expand Up @@ -71,7 +71,7 @@ def mpi_initializer(models, log_level, root_dir):
root_logger.info(f"worker {rank} initialized")


def logwatcher(stop_event):
def logwatcher(start_event, stop_event):
from mpi4py import MPI

rank = MPI.COMM_WORLD.Get_rank()
Expand All @@ -84,6 +84,7 @@ def logwatcher(stop_event):
service = "logwatcher"
MPI.Publish_name(service, info, port)
_logger.debug(f"published service: {service}")
start_event.set()

root = 0
_logger.debug("waiting for client connection...")
Expand All @@ -96,12 +97,18 @@ def logwatcher(stop_event):
try:
logger = logging.getLogger(record.name)
except Exception as e:
# AttributeError if record does not have a name attribute
# TypeError record.name is not a string
raise e
if record.msg is None:
_logger.debug("received sentinel")
break
else:
# AttributeError if record does not have a name attribute
# TypeError record.name is not a string
raise e
else:
logger.callHandlers(record)

_logger.info("closing logwatcher")


def run_experiment_mpi(experiment):
_logger.debug(f"starting {experiment.experiment_id}")
Expand All @@ -113,6 +120,15 @@ def run_experiment_mpi(experiment):
return experiment, outcomes


def send_sentinel():
    """Send a stop sentinel over every MPIHandler on the root logger.

    The sentinel is a log record built with ``msg=None`` and a non-string
    ``name`` (42). ``logwatcher`` in this module logs "received sentinel"
    and leaves its receive loop when ``logging.getLogger(record.name)``
    fails and ``record.msg`` is ``None``.

    Handlers on the root logger that are not ``MPIHandler`` instances are
    skipped. Nothing is returned.
    """
    sentinel_fields = {"level": logging.CRITICAL, "msg": None, "name": 42}
    sentinel = logging.makeLogRecord(sentinel_fields)

    mpi_handlers = (
        candidate
        for candidate in get_rootlogger().handlers
        if isinstance(candidate, MPIHandler)
    )
    for mpi_handler in mpi_handlers:
        _logger.debug("sending sentinel")
        # NOTE(review): presumably (obj, dest=0, tag=0) as in mpi4py's
        # Comm.send -- confirm against the type of handler.communicator.
        mpi_handler.communicator.send(sentinel, 0, 0)


class MPIHandler(QueueHandler):
"""
This handler sends events from the worker process to the master process
Expand Down Expand Up @@ -154,45 +170,60 @@ def __init__(self, msis, n_processes=None, **kwargs):
self.stop_event = None
self.n_processes = n_processes

@method_logger(__name__)
def initialize(self):
# Only import mpi4py if the MPIEvaluator is used, to avoid unnecessary dependencies.
from mpi4py.futures import MPIPoolExecutor

start_event = threading.Event()
self.stop_event = threading.Event()
self.logwatcher_thread = threading.Thread(
name="logwatcher", target=logwatcher, daemon=True, args=(self.stop_event,)
name="logwatcher",
target=logwatcher,
daemon=False,
args=(
start_event,
self.stop_event,
),
)
self.logwatcher_thread.start()
start_event.wait()
_logger.info("logwatcher server started")

self.root_dir = determine_rootdir(self._msis)
self._pool = MPIPoolExecutor(
max_workers=self.n_processes,
initializer=mpi_initializer,
initargs=(self._msis, _logger.level, self.root_dir),
) # Removed initializer arguments
)

self._pool = MPIPoolExecutor(max_workers=self.n_processes) # Removed initializer arguments
_logger.info(f"MPI pool started with {self._pool._max_workers} workers")
if self._pool._max_workers <= 10:
_logger.warning(
f"With only a few workers ({self._pool._max_workers}), the MPIEvaluator may be slower than the Sequential- or MultiprocessingEvaluator"
)
return self

@method_logger(__name__)
def finalize(self):
self._pool.shutdown()
# submit sentinel
self.stop_event.set()
_logger.info("MPI pool has been shut down")
self._pool.submit(send_sentinel)
self._pool.shutdown()
self.logwatcher_thread.join(timeout=60)

if self.logwatcher_thread.is_alive():
_logger.warning(f"houston we have a problem")

if self.root_dir:
shutil.rmtree(self.root_dir)

time.sleep(0.1)
_logger.info("MPI pool has been shut down")

@method_logger(__name__)
def evaluate_experiments(self, scenarios, policies, callback, combine="factorial", **kwargs):
ex_gen = experiment_generator(scenarios, self._msis, policies, combine=combine)
experiments = list(ex_gen)
experiments = list(experiment_generator(scenarios, self._msis, policies, combine=combine))

results = self._pool.map(run_experiment_mpi, experiments, **kwargs)
for experiment, outcomes in results:
Expand Down
8 changes: 3 additions & 5 deletions ema_workbench/examples/example_mpi_lake_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,6 @@
import math
import time

# FIXME
import sys

sys.path.insert(0, "/Users/jhkwakkel/Documents/GitHub/EMAworkbench")

import numpy as np
from scipy.optimize import brentq

Expand Down Expand Up @@ -88,10 +83,13 @@ def lake_problem(


if __name__ == "__main__":
import ema_workbench

# run with mpiexec -n 1 -usize {ntasks} python example_mpi_lake_model.py
starttime = time.perf_counter()

ema_logging.log_to_stderr(ema_logging.INFO, pass_root_logger_level=True)
ema_logging.get_rootlogger().info(f"{ema_workbench.__version__}")

# instantiate the model
lake_model = Model("lakeproblem", function=lake_problem)
Expand Down
7 changes: 4 additions & 3 deletions ema_workbench/examples/slurm_script.sh
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#!/bin/bash

#SBATCH --job-name="Python_test"
#SBATCH --time=00:02:00
#SBATCH --ntasks=10
#SBATCH --time=00:06:00
#SBATCH --ntasks=8
#SBATCH --cpus-per-task=1
#SBATCH --partition=compute
#SBATCH --mem-per-cpu=4GB
Expand All @@ -17,6 +17,7 @@ module load py-mpi4py
module load py-pip

pip install ipyparallel
pip install --user -e git+https://github.com/quaquel/EMAworkbench@mpi_update#egg=ema-workbench
pip install --user -e git+https://github.com/quaquel/EMAworkbench@mpi_fixes#egg=ema_workbench

mpiexec -n 1 python3 example_mpi_lake_model.py

Loading