Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: centralize execute list access #247

Merged
merged 2 commits into from
Aug 6, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion powersimdata/data_access/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__all__ = ["scenario_list"]
__all__ = ["scenario_list", "execute_list"]
84 changes: 84 additions & 0 deletions powersimdata/data_access/execute_list.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
from powersimdata.utility import server_setup

import pandas as pd


class ExecuteListManager:
"""This class is responsible for any modifications to the execute list file.

:param paramiko.client.SSHClient ssh_client: session with an SSH server.
"""

def __init__(self, ssh_client):
"""Constructor

"""
self.ssh_client = ssh_client

def get_execute_table(self):
"""Returns execute table from server.
:return: (*pandas.DataFrame*) -- execute list as a data frame.
"""
sftp = self.ssh_client.open_sftp()

file_object = sftp.file(server_setup.EXECUTE_LIST, "rb")
execute_list = pd.read_csv(file_object)
execute_list.fillna("", inplace=True)

sftp.close()

return execute_list.astype(str)

def add_entry(self, scenario_info):
"""Adds scenario to the execute list file on server.

:param collections.OrderedDict scenario_info: entry to add
:raises IOError: if execute list file on server cannot be updated.
"""
print("--> Adding entry in execute table on server\n")
entry = "%s,created" % scenario_info["id"]
command = "echo %s >> %s" % (entry, server_setup.EXECUTE_LIST)
err_message = "Failed to update %s on server" % server_setup.EXECUTE_LIST
_ = self._execute_and_check_err(command, err_message)

def update_execute_list(self, status, scenario_info):
"""Updates status in execute list file on server.

:param str status: execution status.
:param collections.OrderedDict scenario_info: entry to update
"""
print("--> Updating status in execute table on server")
options = "-F, -v OFS=',' -v INPLACE_SUFFIX=.bak -i inplace"
# AWK parses the file line-by-line. When the entry of the first column is equal
# to the scenario identification number, the second column is replaced by the
# status parameter.
program = "'{if($1==%s) $2=\"%s\"};1'" % (scenario_info["id"], status)
command = "awk %s %s %s" % (options, program, server_setup.EXECUTE_LIST)
err_message = "Failed to update %s on server" % server_setup.EXECUTE_LIST
_ = self._execute_and_check_err(command, err_message)

def delete_entry(self, scenario_info):
"""Delete entry from execute list on server.

:param collections.OrderedDict scenario_info: entry to delete
"""
print("--> Deleting entry in execute table on server")
entry = "^%s,extracted" % scenario_info["id"]
command = "sed -i.bak '/%s/d' %s" % (entry, server_setup.EXECUTE_LIST)
err_message = (
"Failed to delete entry in %s on server" % server_setup.EXECUTE_LIST
)
_ = self._execute_and_check_err(command, err_message)

def _execute_and_check_err(self, command, err_message):
"""Executes command and checks for error.

:param str command: command to execute over ssh.
:param str err_message: error message to be raised.
:raises IOError: if command is not successfully executed.
:return: (*str*) -- standard output stream.
"""
stdin, stdout, stderr = self.ssh_client.exec_command(command)
if len(stderr.readlines()) != 0:
raise IOError(err_message)
return stdout
13 changes: 3 additions & 10 deletions powersimdata/scenario/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,17 +82,10 @@ def _generate_and_set_scenario_id(self):
self._scenario_info.move_to_end("id", last=False)

def _add_entry_in_execute_list(self):
"""Adds scenario to the execute list file on server.

:raises IOError: if execute list file on server cannot be updated.
"""Adds scenario to the execute list file on server and update status
information
"""
print("--> Adding entry in execute table on server\n")
entry = "%s,created" % self._scenario_info["id"]
command = "echo %s >> %s" % (entry, server_setup.EXECUTE_LIST)

stdin, stdout, stderr = self._ssh.exec_command(command)
if len(stderr.readlines()) != 0:
raise IOError("Failed to update %s on server" % server_setup.EXECUTE_LIST)
self._execute_list_manager.add_entry(self._scenario_info)
self._scenario_status = "created"
self.allowed.append("execute")

Expand Down
11 changes: 1 addition & 10 deletions powersimdata/scenario/delete.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,7 @@ def delete_scenario(self):

# Delete entry in scenario list
self._scenario_list_manager.delete_entry(self._scenario_info)

# Delete entry in execute list
print("--> Deleting entry in execute table on server")
entry = "^%s,extracted" % self._scenario_info["id"]
command = "sed -i.bak '/%s/d' %s" % (entry, server_setup.EXECUTE_LIST)
stdin, stdout, stderr = self._ssh.exec_command(command)
if len(stderr.readlines()) != 0:
raise IOError(
"Failed to delete entry in %s on server" % server_setup.EXECUTE_LIST
)
self._execute_list_manager.delete_entry(self._scenario_info)

# Delete links to base profiles on server
print("--> Deleting scenario input data on server")
Expand Down
28 changes: 5 additions & 23 deletions powersimdata/scenario/execute.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
from powersimdata.utility import server_setup
from powersimdata.utility.transfer_data import (
get_execute_table,
upload,
)
from powersimdata.utility.transfer_data import upload
from powersimdata.scenario.helpers import interconnect2name
from powersimdata.input.input_data import InputData
from powersimdata.input.grid import Grid
Expand Down Expand Up @@ -76,7 +73,7 @@ def _update_scenario_status(self):
"""Updates scenario status.

"""
execute_table = get_execute_table(self._ssh)
execute_table = self._execute_list_manager.get_execute_table()
scenario_id = self._scenario_info["id"]
self._scenario_status = execute_table[
execute_table.id == scenario_id
Expand All @@ -91,23 +88,6 @@ def _update_scenario_info(self):
scenario = scenario_table[scenario_table.id == scenario_id]
self._scenario_info = scenario.to_dict("records", into=OrderedDict)[0]

def _update_execute_list(self, status):
"""Updates status in execute list file on server.

:param str status: execution status.
:raises IOError: if execute list file on server cannot be updated.
"""
print("--> Updating status in execute table on server")
options = "-F, -v OFS=',' -v INPLACE_SUFFIX=.bak -i inplace"
# AWK parses the file line-by-line. When the entry of the first column is equal
# to the scenario identification number, the second column is replaced by the
# status parameter.
program = "'{if($1==%s) $2=\"%s\"};1'" % (self._scenario_info["id"], status)
command = "awk %s %s %s" % (options, program, server_setup.EXECUTE_LIST)
stdin, stdout, stderr = self._ssh.exec_command(command)
if len(stderr.readlines()) != 0:
raise IOError("Failed to update %s on server" % server_setup.EXECUTE_LIST)

def _run_script(self, script, extra_args=None):
"""Returns running process

Expand Down Expand Up @@ -182,7 +162,9 @@ def prepare_simulation_input(self):

si.prepare_mpc_file()

self._update_execute_list("prepared")
self._execute_list_manager.update_execute_list(
"prepared", self._scenario_info
)
else:
print("---------------------------")
print("SCENARIO CANNOT BE PREPARED")
Expand Down
14 changes: 5 additions & 9 deletions powersimdata/scenario/move.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
from powersimdata.scenario.state import State
from powersimdata.utility import server_setup, backup
from powersimdata.scenario.helpers import interconnect2name
from powersimdata.utility.transfer_data import get_execute_table

import posixpath


class Move(State):
"""Moves scenario.

:param powersimdata.scenario.scenario.Scenario scenario: scenario instance.
"""

name = "move"
allowed = []

def __init__(self, scenario):
super().__init__(scenario)

def print_scenario_info(self):
"""Prints scenario information.

Expand Down Expand Up @@ -42,14 +45,7 @@ def move_scenario(self, target="disk"):
backup.move_output_data()
backup.move_temporary_folder()

# Update status in execute list
print("--> Updating status in execute table on server")
options = "-F, -v OFS=',' -v INPLACE_SUFFIX=.bak -i inplace"
program = "'{if($1==%s) $2=\"%s\"};1'" % (self._scenario_info["id"], "moved")
command = "awk %s %s %s" % (options, program, server_setup.EXECUTE_LIST)
stdin, stdout, stderr = self._ssh.exec_command(command)
if len(stderr.readlines()) != 0:
raise IOError("Failed to update %s on server" % server_setup.EXECUTE_LIST)
self._execute_list_manager.update_execute_list("moved", self._scenario_info)

# Delete attributes
self._clean()
Expand Down
9 changes: 4 additions & 5 deletions powersimdata/scenario/scenario.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
from powersimdata.data_access.scenario_list import ScenarioListManager
from powersimdata.data_access.execute_list import ExecuteListManager
from powersimdata.utility import server_setup
from powersimdata.utility.transfer_data import (
setup_server_connection,
get_execute_table,
)
from powersimdata.utility.transfer_data import setup_server_connection
from powersimdata.scenario.analyze import Analyze
from powersimdata.scenario.create import Create
from powersimdata.scenario.execute import Execute
Expand All @@ -30,6 +28,7 @@ def __init__(self, descriptor):

self.ssh = setup_server_connection()
self._scenario_list_manager = ScenarioListManager(self.ssh)
self._execute_list_manager = ExecuteListManager(self.ssh)

if not descriptor:
self.state = Create(self)
Expand Down Expand Up @@ -119,7 +118,7 @@ def _set_status(self):

:raises Exception: if scenario not found in execute list on server.
"""
execute_table = get_execute_table(self.ssh)
execute_table = self._execute_list_manager.get_execute_table()

status = execute_table[execute_table.id == self.info["id"]]
if status.shape[0] == 0:
Expand Down
2 changes: 2 additions & 0 deletions powersimdata/scenario/state.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from powersimdata.data_access.scenario_list import ScenarioListManager
from powersimdata.data_access.execute_list import ExecuteListManager


class State(object):
Expand All @@ -19,6 +20,7 @@ def __init__(self, scenario):

self._ssh = scenario.ssh
self._scenario_list_manager = ScenarioListManager(scenario.ssh)
self._execute_list_manager = ExecuteListManager(scenario.ssh)

def switch(self, state):
"""Switches state.
Expand Down
22 changes: 12 additions & 10 deletions powersimdata/utility/tests/test_transfer_data.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
from powersimdata.data_access.scenario_list import ScenarioListManager
from powersimdata.data_access.execute_list import ExecuteListManager
from powersimdata.utility.server_setup import get_server_user
from powersimdata.utility.transfer_data import (
setup_server_connection,
get_execute_table,
)
from powersimdata.utility.transfer_data import setup_server_connection

from numpy.testing import assert_array_equal
import pandas as pd
Expand All @@ -23,6 +21,12 @@ def scenario_table(ssh_client):
return scenario_list_manager.get_scenario_table()


@pytest.fixture
def execute_table(ssh_client):
execute_list_manager = ExecuteListManager(ssh_client)
return execute_list_manager.get_execute_table()


@pytest.mark.integration
def test_setup_server_connection(ssh_client):
_, stdout, _ = ssh_client.exec_command("whoami")
Expand Down Expand Up @@ -58,13 +62,11 @@ def test_get_scenario_file_from_server_header(ssh_client, scenario_table):


@pytest.mark.integration
def test_get_execute_file_from_server_type(ssh_client):
table = get_execute_table(ssh_client)
assert isinstance(table, pd.DataFrame)
def test_get_execute_file_from_server_type(ssh_client, execute_table):
assert isinstance(execute_table, pd.DataFrame)


@pytest.mark.integration
def test_get_execute_file_from_server_header(ssh_client):
def test_get_execute_file_from_server_header(ssh_client, execute_table):
header = ["id", "status"]
table = get_execute_table(ssh_client)
assert_array_equal(table.columns, header)
assert_array_equal(execute_table.columns, header)
17 changes: 0 additions & 17 deletions powersimdata/utility/transfer_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,23 +65,6 @@ def upload(ssh_client, file_name, from_dir, to_dir, change_name_to=None):
sftp.close()


def get_execute_table(ssh_client):
"""Returns execute table from server.

:param paramiko.client.SSHClient ssh_client: session with an SSH server.
:return: (*pandas*) -- data frame.
"""
sftp = ssh_client.open_sftp()

file_object = sftp.file(server_setup.EXECUTE_LIST, "rb")
execute_list = pd.read_csv(file_object)
execute_list.fillna("", inplace=True)

sftp.close()

return execute_list.astype(str)


def setup_server_connection():
"""This function setup the connection to the server.

Expand Down