Skip to content

Commit

Permalink
Merge pull request #477 from Breakthrough-Energy/jon/windows
Browse files Browse the repository at this point in the history
Support local+native usage
  • Loading branch information
jenhagg authored May 18, 2021
2 parents d60d707 + 27c0c56 commit 6d26b2f
Show file tree
Hide file tree
Showing 17 changed files with 429 additions and 205 deletions.
4 changes: 4 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,7 @@ build
**/__pycache__
.ipynb_checkpoints
**/.ropeproject
.env
.venv
.dockerignore
config.ini
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# This is specific to this package
powersimdata/utility/.server_user
config.ini

# The remainder of this file taken from github/gitignore
# https://github.com/github/gitignore/blob/master/Python.gitignore
Expand Down
19 changes: 18 additions & 1 deletion powersimdata/data_access/context.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from powersimdata.data_access.data_access import LocalDataAccess, SSHDataAccess
from powersimdata.data_access.launcher import HttpLauncher, NativeLauncher, SSHLauncher
from powersimdata.utility import server_setup
from powersimdata.utility.server_setup import DeploymentMode, get_deployment_mode
from powersimdata.utility.config import DeploymentMode, get_deployment_mode


class Context:
Expand All @@ -13,6 +14,8 @@ def get_data_access(data_loc=None):
:param str data_loc: pass "disk" if using data from backup disk,
otherwise leave the default.
:return: (:class:`powersimdata.data_access.data_access.DataAccess`) -- a data access
instance
"""
if data_loc == "disk":
root = server_setup.BACKUP_DATA_ROOT_DIR
Expand All @@ -23,3 +26,17 @@ def get_data_access(data_loc=None):
if mode == DeploymentMode.Server:
return SSHDataAccess(root)
return LocalDataAccess(root)

@staticmethod
def get_launcher(scenario):
"""Return instance for interaction with simulation engine
:param powersimdata.scenario.scenario.Scenario scenario: a scenario object
:return: (:class:`powersimdata.data_access.launcher.Launcher`) -- a launcher instance
"""
mode = get_deployment_mode()
if mode == DeploymentMode.Server:
return SSHLauncher(scenario)
elif mode == DeploymentMode.Container:
return HttpLauncher(scenario)
return NativeLauncher(scenario)
10 changes: 6 additions & 4 deletions powersimdata/data_access/data_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@
class DataAccess:
"""Interface to a local or remote data store."""

def __init__(self, root=None):
def __init__(self, root=None, backup_root=None):
"""Constructor"""
self.root = server_setup.DATA_ROOT_DIR if root is None else root
self.backup_root = server_setup.BACKUP_DATA_ROOT_DIR
self.backup_root = (
server_setup.BACKUP_DATA_ROOT_DIR if backup_root is None else backup_root
)
self.join = None

def copy_from(self, file_name, from_dir):
Expand Down Expand Up @@ -291,9 +293,9 @@ class SSHDataAccess(DataAccess):

_last_attempt = 0

def __init__(self, root=None):
def __init__(self, root=None, backup_root=None):
"""Constructor"""
super().__init__(root)
super().__init__(root, backup_root)
self._ssh = None
self._retry_after = 5
self.local_root = server_setup.LOCAL_DIR
Expand Down
8 changes: 4 additions & 4 deletions powersimdata/data_access/execute_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def get_status(self, scenario_id):
"""Return the status for the scenario
:param str/int scenario_id: the scenario id
:raises Exception: if scenario not found in execute list on server.
:raises Exception: if scenario not found in execute list.
:return: (*str*) -- scenario status
"""
table = self.get_execute_table()
Expand Down Expand Up @@ -46,18 +46,18 @@ def set_status(self, scenario_id, status):
table = self.get_execute_table()
table.loc[int(scenario_id), "status"] = status

print(f"--> Setting status={status} in execute table on server")
print(f"--> Setting status={status} in execute list")
return table

@verify_hash
def delete_entry(self, scenario_id):
"""Deletes entry from execute list on server.
"""Deletes entry from execute list.
:param int/str scenario_id: the id of the scenario
:return: (*pandas.DataFrame*) -- the updated data frame
"""
table = self.get_execute_table()
table.drop(int(scenario_id), inplace=True)

print("--> Deleting entry in execute table on server")
print("--> Deleting entry in %s" % self._FILE_NAME)
return table
209 changes: 209 additions & 0 deletions powersimdata/data_access/launcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
import posixpath
import sys

import requests

from powersimdata.utility import server_setup


def _check_threads(threads):
"""Validate threads argument
:param int threads: the number of threads to be used
:raises TypeError: if threads is not an int
:raises ValueError: if threads is not a positive value
"""
if threads is not None:
if not isinstance(threads, int):
raise TypeError("threads must be an int")
if threads < 1:
raise ValueError("threads must be a positive value")


def _check_solver(solver):
"""Validate solver argument
:param str solver: the solver used for the optimization
:raises TypeError: if solver is not a str
:raises ValueError: if invalid solver provided
"""
if not isinstance(solver, str):
raise TypeError("solver must be a str")
solvers = ("gurobi", "glpk")
if solver is not None and solver.lower() not in solvers:
raise ValueError(f"Invalid solver: options are {solvers}")


class Launcher:
"""Base class for interaction with simulation engine.
:param powrsimdata.scenario.scenario.Scenario scenario: scenario instance
"""

def __init__(self, scenario):
self.scenario = scenario

def _launch(self, threads=None, solver=None, extract_data=True):
"""Launches simulation on target environment
:param int threads: the number of threads to be used. This defaults to None,
where None means auto.
:param str solver: the solver used for optimization. This defaults to
None, which translates to gurobi
:param bool extract_data: whether the results of the simulation engine should
automatically extracted after the simulation has run. This defaults to True.
:raises NotImplementedError: always - this must be implemented in a subclass
"""
raise NotImplementedError

def extract_simulation_output(self):
"""Extracts simulation outputs {PG, PF, LMP, CONGU, CONGL} on server."""
pass

def launch_simulation(self, threads=None, solver=None, extract_data=True):
"""Launches simulation on target environment
:param int threads: the number of threads to be used. This defaults to None,
where None means auto.
:param str solver: the solver used for optimization. This defaults to
None, which translates to gurobi
:param bool extract_data: whether the results of the simulation engine should
automatically extracted after the simulation has run. This defaults to True.
:return: (*subprocess.Popen*) or (*dict*) - the process, if using ssh to server,
otherwise a dict containing status information.
"""
_check_threads(threads)
_check_solver(solver)
return self._launch(threads, solver, extract_data)


class SSHLauncher(Launcher):
def _run_script(self, script, extra_args=None):
"""Returns running process
:param str script: script to be used.
:param list extra_args: list of strings to be passed after scenario id.
:return: (*subprocess.Popen*) -- process used to run script
"""
if extra_args is None:
extra_args = []

engine = self.scenario._scenario_info["engine"]
path_to_package = posixpath.join(server_setup.MODEL_DIR, engine)
folder = "pyreise" if engine == "REISE" else "pyreisejl"

path_to_script = posixpath.join(path_to_package, folder, "utility", script)
cmd_pythonpath = [f'export PYTHONPATH="{path_to_package}:$PYTHONPATH";']
cmd_pythoncall = [
"nohup",
"python3",
"-u",
path_to_script,
self.scenario.scenario_id,
]
cmd_io_redirect = ["</dev/null >/dev/null 2>&1 &"]
cmd = cmd_pythonpath + cmd_pythoncall + extra_args + cmd_io_redirect
process = self.scenario._data_access.execute_command_async(cmd)
print("PID: %s" % process.pid)
return process

def _launch(self, threads=None, solver=None, extract_data=True):
"""Launch simulation on server, via ssh.
:param int threads: the number of threads to be used. This defaults to None,
where None means auto.
:param str solver: the solver used for optimization. This defaults to
None, which translates to gurobi
:param bool extract_data: whether the results of the simulation engine should
automatically extracted after the simulation has run. This defaults to True.
:raises TypeError: if extract_data is not a boolean
:return: (*subprocess.Popen*) -- new process used to launch simulation.
"""
extra_args = []
if threads is not None:
# Use the -t flag as defined in call.py in REISE.jl
extra_args.append("--threads " + str(threads))

if solver is not None:
extra_args.append("--solver " + solver)

if not isinstance(extract_data, bool):
raise TypeError("extract_data must be a bool")
if extract_data:
extra_args.append("--extract-data")

return self._run_script("call.py", extra_args=extra_args)

def extract_simulation_output(self):
"""Extracts simulation outputs {PG, PF, LMP, CONGU, CONGL} on server.
:return: (*subprocess.Popen*) -- new process used to extract output
data.
"""
print("--> Extracting output data on server")
return self._run_script("extract_data.py")

def check_progress(self):
print("Information is available on the server.")


class HttpLauncher(Launcher):
def _launch(self, threads=None, solver=None, extract_data=True):
"""Launch simulation in container via http call
:param int threads: the number of threads to be used. This defaults to None,
where None means auto.
:param str solver: the solver used for optimization. This defaults to
None, which translates to gurobi
:param bool extract_data: always True
:return: (*dict*) -- contains "output", "errors", "scenario_id", and "status"
keys which map to stdout, stderr, and the respective scenario attributes
"""
scenario_id = self.scenario.scenario_id
url = f"http://{server_setup.SERVER_ADDRESS}:5000/launch/{scenario_id}"
resp = requests.post(url, params={"threads": threads, "solver": solver})
if resp.status_code != 200:
print(
f"Failed to launch simulation: status={resp.status_code}. See response for details"
)
return resp.json()

def check_progress(self):
"""Get the status of an ongoing simulation, if possible
:return: (*dict*) -- contains "output", "errors", "scenario_id", and "status"
keys which map to stdout, stderr, and the respective scenario attributes
"""
scenario_id = self.scenario.scenario_id
url = f"http://{server_setup.SERVER_ADDRESS}:5000/status/{scenario_id}"
resp = requests.get(url)
return resp.json()


class NativeLauncher(Launcher):
def _launch(self, threads=None, solver=None, extract_data=True):
"""Launch simulation by importing from REISE.jl
:param int threads: the number of threads to be used. This defaults to None,
where None means auto.
:param str solver: the solver used for optimization. This defaults to
None, which translates to gurobi
:param bool extract_data: always True
:return: (*dict*) -- contains "output", "errors", "scenario_id", and "status"
keys which map to stdout, stderr, and the respective scenario attributes
"""
sys.path.append(server_setup.ENGINE_DIR)
from pyreisejl.utility import app

return app.launch_simulation(self.scenario.scenario_id, threads, solver)

def check_progress(self):
"""Get the status of an ongoing simulation, if possible
:return: (*dict*) -- contains "output", "errors", "scenario_id", and "status"
keys which map to stdout, stderr, and the respective scenario attributes
"""
sys.path.append(server_setup.ENGINE_DIR)
from pyreisejl.utility import app

return app.check_progress()
6 changes: 3 additions & 3 deletions powersimdata/data_access/scenario_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def err_message(text):

@verify_hash
def add_entry(self, scenario_info):
"""Adds scenario to the scenario list file on server.
"""Adds scenario to the scenario list file.
:param collections.OrderedDict scenario_info: entry to add to scenario list.
:return: (*pandas.DataFrame*) -- the updated data frame
Expand All @@ -78,7 +78,7 @@ def add_entry(self, scenario_info):
table = table.append(entry)
table.set_index("id", inplace=True)

print("--> Adding entry in %s on server" % self._FILE_NAME)
print("--> Adding entry in %s" % self._FILE_NAME)
return table

@verify_hash
Expand All @@ -91,5 +91,5 @@ def delete_entry(self, scenario_id):
table = self.get_scenario_table()
table.drop(int(scenario_id), inplace=True)

print("--> Deleting entry in %s on server" % self._FILE_NAME)
print("--> Deleting entry in %s" % self._FILE_NAME)
return table
4 changes: 2 additions & 2 deletions powersimdata/data_access/tests/test_data_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@
from powersimdata.utility import server_setup

CONTENT = b"content"
backup_root = "/mnt/backup_dir"


@pytest.fixture
def data_access():
data_access = SSHDataAccess()
data_access = SSHDataAccess(backup_root=backup_root)
yield data_access
data_access.close()

Expand Down Expand Up @@ -68,7 +69,6 @@ def _check_content(filepath):


root_dir = server_setup.DATA_ROOT_DIR.rstrip("/")
backup_root = server_setup.BACKUP_DATA_ROOT_DIR


def test_base_dir(data_access):
Expand Down
Loading

0 comments on commit 6d26b2f

Please sign in to comment.