Skip to content

Commit

Permalink
Merge pull request #247 from Breakthrough-Energy/jon/executelist
Browse files Browse the repository at this point in the history
feat: centralize execute list access
  • Loading branch information
jenhagg authored Aug 6, 2020
2 parents 00848fd + 4240477 commit fd47f40
Show file tree
Hide file tree
Showing 10 changed files with 119 additions and 85 deletions.
2 changes: 1 addition & 1 deletion powersimdata/data_access/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__all__ = ["scenario_list"]
__all__ = ["scenario_list", "execute_list"]
83 changes: 83 additions & 0 deletions powersimdata/data_access/execute_list.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
from powersimdata.utility import server_setup

import pandas as pd


class ExecuteListManager:
"""This class is responsible for any modifications to the execute list file.
:param paramiko.client.SSHClient ssh_client: session with an SSH server.
"""

def __init__(self, ssh_client):
"""Constructor
"""
self.ssh_client = ssh_client

def get_execute_table(self):
"""Returns execute table from server.
:return: (*pandas.DataFrame*) -- execute list as a data frame.
"""
sftp = self.ssh_client.open_sftp()

file_object = sftp.file(server_setup.EXECUTE_LIST, "rb")
execute_list = pd.read_csv(file_object)
execute_list.fillna("", inplace=True)

sftp.close()

return execute_list.astype(str)

def add_entry(self, scenario_info):
"""Adds scenario to the execute list file on server.
:param collections.OrderedDict scenario_info: entry to add
"""
print("--> Adding entry in execute table on server")
entry = "%s,created" % scenario_info["id"]
command = "echo %s >> %s" % (entry, server_setup.EXECUTE_LIST)
err_message = "Failed to update %s on server" % server_setup.EXECUTE_LIST
_ = self._execute_and_check_err(command, err_message)

def update_execute_list(self, status, scenario_info):
"""Updates status in execute list file on server.
:param str status: execution status.
:param collections.OrderedDict scenario_info: entry to update
"""
print("--> Updating status in execute table on server")
options = "-F, -v OFS=',' -v INPLACE_SUFFIX=.bak -i inplace"
# AWK parses the file line-by-line. When the entry of the first column is equal
# to the scenario identification number, the second column is replaced by the
# status parameter.
program = "'{if($1==%s) $2=\"%s\"};1'" % (scenario_info["id"], status)
command = "awk %s %s %s" % (options, program, server_setup.EXECUTE_LIST)
err_message = "Failed to update %s on server" % server_setup.EXECUTE_LIST
_ = self._execute_and_check_err(command, err_message)

def delete_entry(self, scenario_info):
"""Deletes entry from execute list on server.
:param collections.OrderedDict scenario_info: entry to delete
"""
print("--> Deleting entry in execute table on server")
entry = "^%s,extracted" % scenario_info["id"]
command = "sed -i.bak '/%s/d' %s" % (entry, server_setup.EXECUTE_LIST)
err_message = (
"Failed to delete entry in %s on server" % server_setup.EXECUTE_LIST
)
_ = self._execute_and_check_err(command, err_message)

def _execute_and_check_err(self, command, err_message):
"""Executes command and checks for error.
:param str command: command to execute over ssh.
:param str err_message: error message to be raised.
:raises IOError: if command is not successfully executed.
:return: (*str*) -- standard output stream.
"""
stdin, stdout, stderr = self.ssh_client.exec_command(command)
if len(stderr.readlines()) != 0:
raise IOError(err_message)
return stdout
13 changes: 3 additions & 10 deletions powersimdata/scenario/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,17 +82,10 @@ def _generate_and_set_scenario_id(self):
self._scenario_info.move_to_end("id", last=False)

def _add_entry_in_execute_list(self):
"""Adds scenario to the execute list file on server.
:raises IOError: if execute list file on server cannot be updated.
"""Adds scenario to the execute list file on server and update status
information
"""
print("--> Adding entry in execute table on server\n")
entry = "%s,created" % self._scenario_info["id"]
command = "echo %s >> %s" % (entry, server_setup.EXECUTE_LIST)

stdin, stdout, stderr = self._ssh.exec_command(command)
if len(stderr.readlines()) != 0:
raise IOError("Failed to update %s on server" % server_setup.EXECUTE_LIST)
self._execute_list_manager.add_entry(self._scenario_info)
self._scenario_status = "created"
self.allowed.append("execute")

Expand Down
11 changes: 1 addition & 10 deletions powersimdata/scenario/delete.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,7 @@ def delete_scenario(self):

# Delete entry in scenario list
self._scenario_list_manager.delete_entry(self._scenario_info)

# Delete entry in execute list
print("--> Deleting entry in execute table on server")
entry = "^%s,extracted" % self._scenario_info["id"]
command = "sed -i.bak '/%s/d' %s" % (entry, server_setup.EXECUTE_LIST)
stdin, stdout, stderr = self._ssh.exec_command(command)
if len(stderr.readlines()) != 0:
raise IOError(
"Failed to delete entry in %s on server" % server_setup.EXECUTE_LIST
)
self._execute_list_manager.delete_entry(self._scenario_info)

# Delete links to base profiles on server
print("--> Deleting scenario input data on server")
Expand Down
28 changes: 5 additions & 23 deletions powersimdata/scenario/execute.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
from powersimdata.utility import server_setup
from powersimdata.utility.transfer_data import (
get_execute_table,
upload,
)
from powersimdata.utility.transfer_data import upload
from powersimdata.scenario.helpers import interconnect2name
from powersimdata.input.input_data import InputData
from powersimdata.input.grid import Grid
Expand Down Expand Up @@ -76,7 +73,7 @@ def _update_scenario_status(self):
"""Updates scenario status.
"""
execute_table = get_execute_table(self._ssh)
execute_table = self._execute_list_manager.get_execute_table()
scenario_id = self._scenario_info["id"]
self._scenario_status = execute_table[
execute_table.id == scenario_id
Expand All @@ -91,23 +88,6 @@ def _update_scenario_info(self):
scenario = scenario_table[scenario_table.id == scenario_id]
self._scenario_info = scenario.to_dict("records", into=OrderedDict)[0]

def _update_execute_list(self, status):
"""Updates status in execute list file on server.
:param str status: execution status.
:raises IOError: if execute list file on server cannot be updated.
"""
print("--> Updating status in execute table on server")
options = "-F, -v OFS=',' -v INPLACE_SUFFIX=.bak -i inplace"
# AWK parses the file line-by-line. When the entry of the first column is equal
# to the scenario identification number, the second column is replaced by the
# status parameter.
program = "'{if($1==%s) $2=\"%s\"};1'" % (self._scenario_info["id"], status)
command = "awk %s %s %s" % (options, program, server_setup.EXECUTE_LIST)
stdin, stdout, stderr = self._ssh.exec_command(command)
if len(stderr.readlines()) != 0:
raise IOError("Failed to update %s on server" % server_setup.EXECUTE_LIST)

def _run_script(self, script, extra_args=None):
"""Returns running process
Expand Down Expand Up @@ -182,7 +162,9 @@ def prepare_simulation_input(self):

si.prepare_mpc_file()

self._update_execute_list("prepared")
self._execute_list_manager.update_execute_list(
"prepared", self._scenario_info
)
else:
print("---------------------------")
print("SCENARIO CANNOT BE PREPARED")
Expand Down
17 changes: 8 additions & 9 deletions powersimdata/scenario/move.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
from powersimdata.scenario.state import State
from powersimdata.utility import server_setup, backup
from powersimdata.scenario.helpers import interconnect2name
from powersimdata.utility.transfer_data import get_execute_table

import posixpath


class Move(State):
"""Moves scenario.
:param powersimdata.scenario.scenario.Scenario scenario: scenario instance.
"""

name = "move"
allowed = []

def __init__(self, scenario):
"""Constructor
"""
super().__init__(scenario)

def print_scenario_info(self):
"""Prints scenario information.
Expand Down Expand Up @@ -42,14 +48,7 @@ def move_scenario(self, target="disk"):
backup.move_output_data()
backup.move_temporary_folder()

# Update status in execute list
print("--> Updating status in execute table on server")
options = "-F, -v OFS=',' -v INPLACE_SUFFIX=.bak -i inplace"
program = "'{if($1==%s) $2=\"%s\"};1'" % (self._scenario_info["id"], "moved")
command = "awk %s %s %s" % (options, program, server_setup.EXECUTE_LIST)
stdin, stdout, stderr = self._ssh.exec_command(command)
if len(stderr.readlines()) != 0:
raise IOError("Failed to update %s on server" % server_setup.EXECUTE_LIST)
self._execute_list_manager.update_execute_list("moved", self._scenario_info)

# Delete attributes
self._clean()
Expand Down
9 changes: 4 additions & 5 deletions powersimdata/scenario/scenario.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
from powersimdata.data_access.scenario_list import ScenarioListManager
from powersimdata.data_access.execute_list import ExecuteListManager
from powersimdata.utility import server_setup
from powersimdata.utility.transfer_data import (
setup_server_connection,
get_execute_table,
)
from powersimdata.utility.transfer_data import setup_server_connection
from powersimdata.scenario.analyze import Analyze
from powersimdata.scenario.create import Create
from powersimdata.scenario.execute import Execute
Expand All @@ -30,6 +28,7 @@ def __init__(self, descriptor):

self.ssh = setup_server_connection()
self._scenario_list_manager = ScenarioListManager(self.ssh)
self._execute_list_manager = ExecuteListManager(self.ssh)

if not descriptor:
self.state = Create(self)
Expand Down Expand Up @@ -119,7 +118,7 @@ def _set_status(self):
:raises Exception: if scenario not found in execute list on server.
"""
execute_table = get_execute_table(self.ssh)
execute_table = self._execute_list_manager.get_execute_table()

status = execute_table[execute_table.id == self.info["id"]]
if status.shape[0] == 0:
Expand Down
2 changes: 2 additions & 0 deletions powersimdata/scenario/state.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from powersimdata.data_access.scenario_list import ScenarioListManager
from powersimdata.data_access.execute_list import ExecuteListManager


class State(object):
Expand All @@ -19,6 +20,7 @@ def __init__(self, scenario):

self._ssh = scenario.ssh
self._scenario_list_manager = ScenarioListManager(scenario.ssh)
self._execute_list_manager = ExecuteListManager(scenario.ssh)

def switch(self, state):
"""Switches state.
Expand Down
22 changes: 12 additions & 10 deletions powersimdata/utility/tests/test_transfer_data.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
from powersimdata.data_access.scenario_list import ScenarioListManager
from powersimdata.data_access.execute_list import ExecuteListManager
from powersimdata.utility.server_setup import get_server_user
from powersimdata.utility.transfer_data import (
setup_server_connection,
get_execute_table,
)
from powersimdata.utility.transfer_data import setup_server_connection

from numpy.testing import assert_array_equal
import pandas as pd
Expand All @@ -23,6 +21,12 @@ def scenario_table(ssh_client):
return scenario_list_manager.get_scenario_table()


@pytest.fixture
def execute_table(ssh_client):
execute_list_manager = ExecuteListManager(ssh_client)
return execute_list_manager.get_execute_table()


@pytest.mark.integration
def test_setup_server_connection(ssh_client):
_, stdout, _ = ssh_client.exec_command("whoami")
Expand Down Expand Up @@ -58,13 +62,11 @@ def test_get_scenario_file_from_server_header(ssh_client, scenario_table):


@pytest.mark.integration
def test_get_execute_file_from_server_type(ssh_client):
table = get_execute_table(ssh_client)
assert isinstance(table, pd.DataFrame)
def test_get_execute_file_from_server_type(ssh_client, execute_table):
assert isinstance(execute_table, pd.DataFrame)


@pytest.mark.integration
def test_get_execute_file_from_server_header(ssh_client):
def test_get_execute_file_from_server_header(ssh_client, execute_table):
header = ["id", "status"]
table = get_execute_table(ssh_client)
assert_array_equal(table.columns, header)
assert_array_equal(execute_table.columns, header)
17 changes: 0 additions & 17 deletions powersimdata/utility/transfer_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,23 +65,6 @@ def upload(ssh_client, file_name, from_dir, to_dir, change_name_to=None):
sftp.close()


def get_execute_table(ssh_client):
"""Returns execute table from server.
:param paramiko.client.SSHClient ssh_client: session with an SSH server.
:return: (*pandas*) -- data frame.
"""
sftp = ssh_client.open_sftp()

file_object = sftp.file(server_setup.EXECUTE_LIST, "rb")
execute_list = pd.read_csv(file_object)
execute_list.fillna("", inplace=True)

sftp.close()

return execute_list.astype(str)


def setup_server_connection():
"""This function setup the connection to the server.
Expand Down

0 comments on commit fd47f40

Please sign in to comment.