Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Manage scenario/execute list using pandas #416

Merged
merged 16 commits into from
Mar 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 26 additions & 18 deletions powersimdata/data_access/csv_store.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,28 @@
import functools
import os
from pathlib import Path

import pandas as pd

from powersimdata.utility import server_setup


def verify_hash(func):
    """Decorator that commits the table returned by a method, together with
    the checksum of the underlying file taken before the method ran.

    The decorated method must live on an object exposing ``data_access``,
    ``_FILE_NAME`` and ``commit``, and must return the updated scenario or
    execute list.

    :param callable func: method returning the updated table.
    :return: (*callable*) -- wrapped method returning the same table.
    """

    @functools.wraps(func)
    def checked_call(self, *args, **kwargs):
        # The checksum is taken first so that ``commit`` is handed the state
        # of the file as it was before ``func`` produced its update.
        digest_before = self.data_access.checksum(self._FILE_NAME)
        updated = func(self, *args, **kwargs)
        self.commit(updated, digest_before)
        return updated

    return checked_call


class CsvStore:
"""Base class for common functionality used to manage scenario and execute
list stored as csv files on the server
Expand All @@ -16,12 +34,13 @@ def __init__(self, data_access):
"""Constructor"""
self.data_access = data_access

def get_table(self, filename):
def get_table(self):
"""Read the given file from the server, falling back to local copy if
unable to connect.

:return: (*pandas.DataFrame*) -- the specified table as a data frame.
"""
filename = self._FILE_NAME
local_path = Path(server_setup.LOCAL_DIR, filename)

try:
Expand All @@ -47,22 +66,11 @@ def _parse_csv(self, file_object):
table.fillna("", inplace=True)
return table.astype(str)

def _execute_and_check_err(self, command, err_message):
"""Executes command and checks for error.
def commit(self, table, checksum):
"""Save to local directory and upload if needed

:param str command: command to execute over ssh.
:param str err_message: error message to be raised.
:raises IOError: if command is not successfully executed.
:return: (*list*) -- list of command output.
:param pandas.DataFrame table: the data frame to save
:param str checksum: the checksum prior to download
"""
stdin, stdout, stderr = self.data_access.execute_command(command)
command_output = stdout.readlines()
command_error = stderr.readlines()
if len(command_error) != 0:
command_error = [
i.replace("\t", " ").replace("\n", "") for i in command_error
]
for ce in command_error:
print(ce)
raise IOError(err_message)
return command_output
table.to_csv(os.path.join(server_setup.LOCAL_DIR, self._FILE_NAME))
self.data_access.push(self._FILE_NAME, checksum)
99 changes: 93 additions & 6 deletions powersimdata/data_access/data_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ def copy_from(self, file_name, from_dir):
"""
raise NotImplementedError

def move_to(self, file_name, to_dir, change_name_to=None):
def move_to(self, file_name, to_dir, change_name_to=None, preserve=False):
"""Copy a file from userspace to data store.

:param str file_name: file name to copy.
:param str to_dir: data store directory to copy file to.
:param str change_name_to: new name for file when copied to data store.
:param bool preserve: whether to keep the local copy
"""
raise NotImplementedError

Expand Down Expand Up @@ -97,6 +98,23 @@ def execute_command_async(self, command):
"""
raise NotImplementedError

def checksum(self, relative_path):
    """Return the checksum of the file at the given path.

    This implementation is a placeholder that only raises; it is meant to be
    overridden.

    :param str relative_path: path relative to root
    :return: (*str*) -- the checksum of the file
    :raises NotImplementedError: always, in this implementation.
    """
    raise NotImplementedError()

def push(self, file_name, checksum):
    """Push the file from the local to the remote root folder, ensuring
    integrity.

    This implementation is a placeholder that only raises; it is meant to be
    overridden.

    :param str file_name: the file name, located at the local root
    :param str checksum: the checksum prior to download
    :raises NotImplementedError: always, in this implementation.
    """
    raise NotImplementedError()

def close(self):
    """Perform any necessary cleanup for the object.

    This implementation does nothing.
    """
    return None
Expand All @@ -116,12 +134,30 @@ def copy_from(self, file_name, from_dir=None):
"""
pass

def move_to(self, file_name, to_dir, change_name_to=None):
def push(self, file_name, checksum):
    """Do nothing: no push is needed due to symlink.

    :param str file_name: the file name, located at the local root
    :param str checksum: the checksum prior to download
    """
    return None

def checksum(self, relative_path):
    """Return a placeholder instead of a real checksum, since a checksum is
    only required for the remote environment.

    :param str relative_path: path relative to root (not used)
    :return: (*str*) -- a constant dummy value
    """
    placeholder = "dummy_value"
    return placeholder

def move_to(self, file_name, to_dir, change_name_to=None, preserve=False):
"""Copy a file from userspace to data store.

:param str file_name: file name to copy.
:param str to_dir: data store directory to copy file to.
:param str change_name_to: new name for file when copied to data store.
:param bool preserve: whether to keep the local copy
"""
self._check_filename(file_name)
src = posixpath.join(server_setup.LOCAL_DIR, file_name)
Expand All @@ -130,7 +166,9 @@ def move_to(self, file_name, to_dir, change_name_to=None):
print(f"--> Moving file {src} to {dest}")
self._check_file_exists(dest, should_exist=False)
self.copy(src, dest)
self.remove(src)
if not preserve:
print("--> Deleting original copy")
self.remove(src)

def execute_command(self, command):
"""Execute a command locally at the data access.
Expand Down Expand Up @@ -235,12 +273,13 @@ def copy_from(self, file_name, from_dir=None):
sftp.get(from_path, to_path, callback=cbk)
bar.close()

def move_to(self, file_name, to_dir=None, change_name_to=None):
def move_to(self, file_name, to_dir=None, change_name_to=None, preserve=False):
"""Copy a file from userspace to data store.

:param str file_name: file name to copy.
:param str to_dir: data store directory to copy file to.
:param str change_name_to: new name for file when copied to data store.
:param bool preserve: whether to keep the local copy
:raises FileNotFoundError: if specified file does not exist
"""
self._check_filename(file_name)
Expand All @@ -261,8 +300,9 @@ def move_to(self, file_name, to_dir=None, change_name_to=None):
print(f"Transferring {from_path} to server")
sftp.put(from_path, to_path)

print(f"--> Deleting {from_path} on local machine")
os.remove(from_path)
if not preserve:
print(f"--> Deleting {from_path} on local machine")
os.remove(from_path)

def execute_command(self, command):
"""Execute a command locally at the data access.
Expand All @@ -284,6 +324,53 @@ def execute_command_async(self, command):
process = Popen(full_command)
return process

def checksum(self, relative_path):
    """Return the checksum of the file path (using sha1sum)

    The value returned is the first line written to stdout by ``sha1sum``,
    stripped of surrounding whitespace (NOTE(review): with GNU ``sha1sum``
    that line holds the digest followed by the file path -- confirm on the
    target server).

    :param str relative_path: path relative to root
    :return: (*str*) -- the checksum of the file
    :raises IOError: if ``sha1sum`` wrote nothing to stdout
    """
    full_path = posixpath.join(self.root, relative_path)
    self._check_file_exists(full_path)

    _, stdout, _ = self.execute_command(f"sha1sum {full_path}")
    lines = stdout.readlines()
    if not lines:
        # Without this guard an empty output surfaced as a bare IndexError
        # that did not say which file or command had failed.
        raise IOError(f"Failed to compute checksum of {full_path}")
    return lines[0].strip()

def push(self, file_name, checksum):
    """Push file_name to remote root

    The local file is first uploaded under a ``.temp`` suffix while keeping
    the local copy (the upload is expected to land in the remote root --
    NOTE(review): confirm against ``move_to``). A shell command then takes an
    exclusive ``flock`` on ``scenario.lockfile`` and replaces the remote file
    with the upload only if the current ``sha1sum`` output of the remote file
    equals ``checksum``; otherwise it writes ``CONFLICT_ERROR`` to stderr.

    :param str file_name: the file name, located at the local root
    :param str checksum: the checksum prior to download
    :raises IOError: if command generated stderr
    """
    backup = f"{file_name}.temp"
    self.move_to(file_name, change_name_to=backup, preserve=True)

    values = {
        "original": posixpath.join(self.root, file_name),
        "updated": posixpath.join(self.root, backup),
        "lockfile": posixpath.join(self.root, "scenario.lockfile"),
        "checksum": checksum,
    }

    # Both operands are quoted: inside [[ ... == ... ]] an unquoted
    # right-hand side is treated as a glob pattern rather than compared as a
    # literal string.
    template = (
        "(flock -x 200; "
        "prev='{checksum}'; "
        "curr=$(sha1sum {original}); "
        'if [[ "$prev" == "$curr" ]]; then mv {updated} {original} -b; '
        "else echo CONFLICT_ERROR 1>&2; fi) "
        "200>{lockfile}"
    )

    command = template.format(**values)
    _, _, stderr = self.execute_command(command)

    errors = stderr.readlines()
    if errors:
        for e in errors:
            print(e)
        raise IOError("Failed to push file - most likely a conflict was detected.")

def close(self):
    """Close the connection held in ``self.ssh``, which was opened when the
    object was created.
    """
    connection = self.ssh
    connection.close()
Expand Down
71 changes: 33 additions & 38 deletions powersimdata/data_access/execute_list.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import posixpath

from powersimdata.data_access.csv_store import CsvStore
from powersimdata.data_access.csv_store import CsvStore, verify_hash
from powersimdata.data_access.sql_store import SqlStore, to_data_frame
from powersimdata.utility import server_setup

Expand Down Expand Up @@ -50,24 +50,24 @@ def add_entry(self, scenario_info):
),
)

def update_execute_list(self, status, scenario_info):
def set_status(self, scenario_id, status):
"""Updates status of scenario in execute list

:param int scenario_id: the scenario id
:param str status: execution status.
:param collections.OrderedDict scenario_info: entry to update
"""
self.cur.execute(
"UPDATE execute_list SET status = %s WHERE id = %s",
(status, scenario_info["id"]),
(status, scenario_id),
)

def delete_entry(self, scenario_info):
def delete_entry(self, scenario_id):
"""Deletes entry from execute list.

:param collections.OrderedDict scenario_info: entry to delete
:param int/str scenario_id: the id of the scenario
"""
sql = self.delete("id")
self.cur.execute(sql, (scenario_info["id"],))
self.cur.execute(sql, (scenario_id,))


class ExecuteListManager(CsvStore):
Expand All @@ -76,22 +76,20 @@ class ExecuteListManager(CsvStore):
:param paramiko.client.SSHClient ssh_client: session with an SSH server.
"""

_EXECUTE_LIST = "ExecuteList.csv"
_FILE_NAME = "ExecuteList.csv"

def __init__(self, ssh_client):
"""Constructor"""
super().__init__(ssh_client)
self._server_path = posixpath.join(
server_setup.DATA_ROOT_DIR, self._EXECUTE_LIST
)
self._server_path = posixpath.join(server_setup.DATA_ROOT_DIR, self._FILE_NAME)

def get_execute_table(self):
"""Returns execute table from server if possible, otherwise read local
copy. Updates the local copy upon successful server connection.

:return: (*pandas.DataFrame*) -- execute list as a data frame.
"""
return self.get_table(self._EXECUTE_LIST)
return self.get_table()

def get_status(self, scenario_id):
"""Return the status for the scenario
Expand All @@ -107,39 +105,36 @@ def get_status(self, scenario_id):
raise Exception(f"Scenario not found in execute list, id = {scenario_id}")

def add_entry(self, scenario_info):
"""Adds scenario to the execute list file on server.
"""Add entry to execute list

:param collections.OrderedDict scenario_info: entry to add
"""
print("--> Adding entry in execute table on server")
entry = "%s,created" % scenario_info["id"]
command = "echo %s >> %s" % (entry, self._server_path)
err_message = "Failed to update %s on server" % self._EXECUTE_LIST
_ = self._execute_and_check_err(command, err_message)
scenario_id = int(scenario_info["id"])
return self.set_status(scenario_id, "created")

def update_execute_list(self, status, scenario_info):
"""Updates status in execute list file on server.
@verify_hash
def set_status(self, scenario_id, status):
"""Set the scenario status

:param str status: execution status.
:param collections.OrderedDict scenario_info: entry to update
:param int/str scenario_id: the scenario id
:param str status: the new status
:return: (*pandas.DataFrame*) -- the updated data frame
"""
print("--> Updating status in execute table on server")
options = "-F, -v OFS=',' -v INPLACE_SUFFIX=.bak -i inplace"
# AWK parses the file line-by-line. When the entry of the first column is equal
# to the scenario identification number, the second column is replaced by the
# status parameter.
program = "'{if($1==%s) $2=\"%s\"};1'" % (scenario_info["id"], status)
command = "awk %s %s %s" % (options, program, self._server_path)
err_message = "Failed to update %s on server" % self._EXECUTE_LIST
_ = self._execute_and_check_err(command, err_message)

def delete_entry(self, scenario_info):
table = self.get_execute_table()
table.loc[int(scenario_id), "status"] = status

print(f"--> Setting status={status} in execute table on server")
return table

@verify_hash
def delete_entry(self, scenario_id):
"""Deletes entry from execute list on server.

:param collections.OrderedDict scenario_info: entry to delete
:param int/str scenario_id: the id of the scenario
:return: (*pandas.DataFrame*) -- the updated data frame
"""
table = self.get_execute_table()
table.drop(int(scenario_id), inplace=True)

print("--> Deleting entry in execute table on server")
entry = "^%s,extracted" % scenario_info["id"]
command = "sed -i.bak '/%s/d' %s" % (entry, self._server_path)
err_message = "Failed to delete entry in %s on server" % self._EXECUTE_LIST
_ = self._execute_and_check_err(command, err_message)
return table
Loading