Skip to content

Commit

Permalink
Merge pull request #416 from Breakthrough-Energy/jon/pandas
Browse files Browse the repository at this point in the history
Manage scenario/execute list using pandas
  • Loading branch information
jenhagg authored Mar 18, 2021
2 parents 6c3124f + 1d2dda0 commit e0b308c
Show file tree
Hide file tree
Showing 14 changed files with 368 additions and 139 deletions.
44 changes: 26 additions & 18 deletions powersimdata/data_access/csv_store.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,28 @@
import functools
import os
from pathlib import Path

import pandas as pd

from powersimdata.utility import server_setup


def verify_hash(func):
"""Utility function which verifies the sha1sum of the file before writing
it on the server. Operates on methods that return an updated scenario or
execute list.
"""

@functools.wraps(func)
def wrapper(self, *args, **kwargs):
checksum = self.data_access.checksum(self._FILE_NAME)
table = func(self, *args, **kwargs)
self.commit(table, checksum)
return table

return wrapper


class CsvStore:
"""Base class for common functionality used to manage scenario and execute
list stored as csv files on the server
Expand All @@ -16,12 +34,13 @@ def __init__(self, data_access):
"""Constructor"""
self.data_access = data_access

def get_table(self, filename):
def get_table(self):
"""Read the given file from the server, falling back to local copy if
unable to connect.
:return: (*pandas.DataFrame*) -- the specified table as a data frame.
"""
filename = self._FILE_NAME
local_path = Path(server_setup.LOCAL_DIR, filename)

try:
Expand All @@ -47,22 +66,11 @@ def _parse_csv(self, file_object):
table.fillna("", inplace=True)
return table.astype(str)

def _execute_and_check_err(self, command, err_message):
"""Executes command and checks for error.
def commit(self, table, checksum):
"""Save to local directory and upload if needed
:param str command: command to execute over ssh.
:param str err_message: error message to be raised.
:raises IOError: if command is not successfully executed.
:return: (*list*) -- list of command output.
:param pandas.DataFrame table: the data frame to save
:param str checksum: the checksum prior to download
"""
stdin, stdout, stderr = self.data_access.execute_command(command)
command_output = stdout.readlines()
command_error = stderr.readlines()
if len(command_error) != 0:
command_error = [
i.replace("\t", " ").replace("\n", "") for i in command_error
]
for ce in command_error:
print(ce)
raise IOError(err_message)
return command_output
table.to_csv(os.path.join(server_setup.LOCAL_DIR, self._FILE_NAME))
self.data_access.push(self._FILE_NAME, checksum)
99 changes: 93 additions & 6 deletions powersimdata/data_access/data_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ def copy_from(self, file_name, from_dir):
"""
raise NotImplementedError

def move_to(self, file_name, to_dir, change_name_to=None):
def move_to(self, file_name, to_dir, change_name_to=None, preserve=False):
"""Copy a file from userspace to data store.
:param str file_name: file name to copy.
:param str to_dir: data store directory to copy file to.
:param str change_name_to: new name for file when copied to data store.
:param bool preserve: whether to keep the local copy
"""
raise NotImplementedError

Expand Down Expand Up @@ -97,6 +98,23 @@ def execute_command_async(self, command):
"""
raise NotImplementedError

def checksum(self, relative_path):
"""Return the checksum of the file path, and write the content if the
server is remote
:param str relative_path: path relative to root
:return: (*str*) -- the checksum of the file
"""
raise NotImplementedError

def push(self, file_name, checksum):
"""Push the file from local to remote root folder, ensuring integrity
:param str file_name: the file name, located at the local root
:param str checksum: the checksum prior to download
"""
raise NotImplementedError

def close(self):
"""Perform any necessary cleanup for the object."""
pass
Expand All @@ -116,12 +134,30 @@ def copy_from(self, file_name, from_dir=None):
"""
pass

def move_to(self, file_name, to_dir, change_name_to=None):
def push(self, file_name, checksum):
"""Nothing to be done due to symlink
:param str file_name: the file name, located at the local root
:param str checksum: the checksum prior to download
"""
pass

def checksum(self, relative_path):
"""Return dummy value since this is only required for remote
environment
:param str relative_path: path relative to root
:return: (*str*) -- the checksum of the file
"""
return "dummy_value"

def move_to(self, file_name, to_dir, change_name_to=None, preserve=False):
"""Copy a file from userspace to data store.
:param str file_name: file name to copy.
:param str to_dir: data store directory to copy file to.
:param str change_name_to: new name for file when copied to data store.
:param bool preserve: whether to keep the local copy
"""
self._check_filename(file_name)
src = posixpath.join(server_setup.LOCAL_DIR, file_name)
Expand All @@ -130,7 +166,9 @@ def move_to(self, file_name, to_dir, change_name_to=None):
print(f"--> Moving file {src} to {dest}")
self._check_file_exists(dest, should_exist=False)
self.copy(src, dest)
self.remove(src)
if not preserve:
print("--> Deleting original copy")
self.remove(src)

def execute_command(self, command):
"""Execute a command locally at the data access.
Expand Down Expand Up @@ -235,12 +273,13 @@ def copy_from(self, file_name, from_dir=None):
sftp.get(from_path, to_path, callback=cbk)
bar.close()

def move_to(self, file_name, to_dir=None, change_name_to=None):
def move_to(self, file_name, to_dir=None, change_name_to=None, preserve=False):
"""Copy a file from userspace to data store.
:param str file_name: file name to copy.
:param str to_dir: data store directory to copy file to.
:param str change_name_to: new name for file when copied to data store.
:param bool preserve: whether to keep the local copy
:raises FileNotFoundError: if specified file does not exist
"""
self._check_filename(file_name)
Expand All @@ -261,8 +300,9 @@ def move_to(self, file_name, to_dir=None, change_name_to=None):
print(f"Transferring {from_path} to server")
sftp.put(from_path, to_path)

print(f"--> Deleting {from_path} on local machine")
os.remove(from_path)
if not preserve:
print(f"--> Deleting {from_path} on local machine")
os.remove(from_path)

def execute_command(self, command):
"""Execute a command locally at the data access.
Expand All @@ -284,6 +324,53 @@ def execute_command_async(self, command):
process = Popen(full_command)
return process

def checksum(self, relative_path):
"""Return the checksum of the file path (using sha1sum)
:param str relative_path: path relative to root
:return: (*str*) -- the checksum of the file
"""
full_path = posixpath.join(self.root, relative_path)
self._check_file_exists(full_path)

command = f"sha1sum {full_path}"
_, stdout, _ = self.execute_command(command)
lines = stdout.readlines()
return lines[0].strip()

def push(self, file_name, checksum):
"""Push file_name to remote root
:param str file_name: the file name, located at the local root
:param str checksum: the checksum prior to download
:raises IOError: if command generated stderr
"""
backup = f"{file_name}.temp"
self.move_to(file_name, change_name_to=backup, preserve=True)

values = {
"original": posixpath.join(self.root, file_name),
"updated": posixpath.join(self.root, backup),
"lockfile": posixpath.join(self.root, "scenario.lockfile"),
"checksum": checksum,
}

template = "(flock -x 200; \
prev='{checksum}'; \
curr=$(sha1sum {original}); \
if [[ $prev == $curr ]]; then mv {updated} {original} -b; \
else echo CONFLICT_ERROR 1>&2; fi) \
200>{lockfile}"

command = template.format(**values)
_, _, stderr = self.execute_command(command)

errors = stderr.readlines()
if len(errors) > 0:
for e in errors:
print(e)
raise IOError("Failed to push file - most likely a conflict was detected.")

def close(self):
"""Close the connection that was opened when the object was created."""
self.ssh.close()
Expand Down
71 changes: 33 additions & 38 deletions powersimdata/data_access/execute_list.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import posixpath

from powersimdata.data_access.csv_store import CsvStore
from powersimdata.data_access.csv_store import CsvStore, verify_hash
from powersimdata.data_access.sql_store import SqlStore, to_data_frame
from powersimdata.utility import server_setup

Expand Down Expand Up @@ -50,24 +50,24 @@ def add_entry(self, scenario_info):
),
)

def update_execute_list(self, status, scenario_info):
def set_status(self, scenario_id, status):
"""Updates status of scenario in execute list
:param int scenario_id: the scenario id
:param str status: execution status.
:param collections.OrderedDict scenario_info: entry to update
"""
self.cur.execute(
"UPDATE execute_list SET status = %s WHERE id = %s",
(status, scenario_info["id"]),
(status, scenario_id),
)

def delete_entry(self, scenario_info):
def delete_entry(self, scenario_id):
"""Deletes entry from execute list.
:param collections.OrderedDict scenario_info: entry to delete
:param int/str scenario_id: the id of the scenario
"""
sql = self.delete("id")
self.cur.execute(sql, (scenario_info["id"],))
self.cur.execute(sql, (scenario_id,))


class ExecuteListManager(CsvStore):
Expand All @@ -76,22 +76,20 @@ class ExecuteListManager(CsvStore):
:param paramiko.client.SSHClient ssh_client: session with an SSH server.
"""

_EXECUTE_LIST = "ExecuteList.csv"
_FILE_NAME = "ExecuteList.csv"

def __init__(self, ssh_client):
"""Constructor"""
super().__init__(ssh_client)
self._server_path = posixpath.join(
server_setup.DATA_ROOT_DIR, self._EXECUTE_LIST
)
self._server_path = posixpath.join(server_setup.DATA_ROOT_DIR, self._FILE_NAME)

def get_execute_table(self):
"""Returns execute table from server if possible, otherwise read local
copy. Updates the local copy upon successful server connection.
:return: (*pandas.DataFrame*) -- execute list as a data frame.
"""
return self.get_table(self._EXECUTE_LIST)
return self.get_table()

def get_status(self, scenario_id):
"""Return the status for the scenario
Expand All @@ -107,39 +105,36 @@ def get_status(self, scenario_id):
raise Exception(f"Scenario not found in execute list, id = {scenario_id}")

def add_entry(self, scenario_info):
"""Adds scenario to the execute list file on server.
"""Add entry to execute list
:param collections.OrderedDict scenario_info: entry to add
"""
print("--> Adding entry in execute table on server")
entry = "%s,created" % scenario_info["id"]
command = "echo %s >> %s" % (entry, self._server_path)
err_message = "Failed to update %s on server" % self._EXECUTE_LIST
_ = self._execute_and_check_err(command, err_message)
scenario_id = int(scenario_info["id"])
return self.set_status(scenario_id, "created")

def update_execute_list(self, status, scenario_info):
"""Updates status in execute list file on server.
@verify_hash
def set_status(self, scenario_id, status):
"""Set the scenario status
:param str status: execution status.
:param collections.OrderedDict scenario_info: entry to update
:param int/str scenario_id: the scenario id
:param str status: the new status
:return: (*pandas.DataFrame*) -- the updated data frame
"""
print("--> Updating status in execute table on server")
options = "-F, -v OFS=',' -v INPLACE_SUFFIX=.bak -i inplace"
# AWK parses the file line-by-line. When the entry of the first column is equal
# to the scenario identification number, the second column is replaced by the
# status parameter.
program = "'{if($1==%s) $2=\"%s\"};1'" % (scenario_info["id"], status)
command = "awk %s %s %s" % (options, program, self._server_path)
err_message = "Failed to update %s on server" % self._EXECUTE_LIST
_ = self._execute_and_check_err(command, err_message)

def delete_entry(self, scenario_info):
table = self.get_execute_table()
table.loc[int(scenario_id), "status"] = status

print(f"--> Setting status={status} in execute table on server")
return table

@verify_hash
def delete_entry(self, scenario_id):
"""Deletes entry from execute list on server.
:param collections.OrderedDict scenario_info: entry to delete
:param int/str scenario_id: the id of the scenario
:return: (*pandas.DataFrame*) -- the updated data frame
"""
table = self.get_execute_table()
table.drop(int(scenario_id), inplace=True)

print("--> Deleting entry in execute table on server")
entry = "^%s,extracted" % scenario_info["id"]
command = "sed -i.bak '/%s/d' %s" % (entry, self._server_path)
err_message = "Failed to delete entry in %s on server" % self._EXECUTE_LIST
_ = self._execute_and_check_err(command, err_message)
return table
Loading

0 comments on commit e0b308c

Please sign in to comment.