Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Additional support for windows #475

Merged
merged 9 commits into from
May 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 114 additions & 76 deletions powersimdata/data_access/data_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import posixpath
import shutil
import time
from subprocess import PIPE, Popen
from subprocess import Popen
from tempfile import mkstemp

import paramiko
Expand All @@ -14,10 +14,22 @@
from powersimdata.utility import server_setup
from powersimdata.utility.helpers import CommandBuilder

_dirs = {
"tmp": (server_setup.EXECUTE_DIR,),
"input": server_setup.INPUT_DIR,
"output": server_setup.OUTPUT_DIR,
}


class DataAccess:
"""Interface to a local or remote data store."""

def __init__(self, root=None):
"""Constructor"""
self.root = server_setup.DATA_ROOT_DIR if root is None else root
self.backup_root = server_setup.BACKUP_DATA_ROOT_DIR
self.join = None

def copy_from(self, file_name, from_dir):
"""Copy a file from data store to userspace.

Expand All @@ -35,6 +47,34 @@ def move_to(self, file_name, to_dir, change_name_to=None):
"""
raise NotImplementedError

def get_base_dir(self, kind, backup=False):
"""Get path to given kind relative to instance root

:param str kind: one of {input, output, tmp}
:param bool backup: pass True if relative to backup root dir
:raises ValueError: if kind is invalid
:return: (*str*) -- the specified path
"""
_allowed = list(_dirs.keys())
if kind not in _allowed:
raise ValueError(f"Invalid 'kind', must be one of {_allowed}")

root = self.root if not backup else self.backup_root
return self.join(root, *_dirs[kind])

def match_scenario_files(self, scenario_id, kind, backup=False):
"""Get path matching the given kind of scenario data

:param int/str scenario_id: the scenario id
:param str kind: one of {input, output, tmp}
:param bool backup: pass True if relative to backup root dir
:return: (*str*) -- the specified path
"""
base_dir = self.get_base_dir(kind, backup)
if kind == "tmp":
return self.join(base_dir, f"scenario_{scenario_id}")
return self.join(base_dir, f"{scenario_id}_*")

def copy(self, src, dest, recursive=False, update=False):
"""Wrapper around cp command which creates dest path if needed

Expand All @@ -43,9 +83,7 @@ def copy(self, src, dest, recursive=False, update=False):
:param bool recursive: create directories recursively
:param bool update: only copy if needed
"""
self.makedir(posixpath.dirname(dest))
command = CommandBuilder.copy(src, dest, recursive, update)
return self.execute_command(command)
raise NotImplementedError

def remove(self, target, recursive=False, confirm=True):
"""Wrapper around rm command
Expand Down Expand Up @@ -86,17 +124,10 @@ def _check_filename(self, filename):
if len(os.path.dirname(filename)) != 0:
raise ValueError(f"Expecting file name but got path {filename}")

def makedir(self, relative_path):
"""Create paths relative to the instance root
def makedir(self, full_path):
"""Create path, including parents

:param str relative_path: the path, without filename, relative to root
"""
raise NotImplementedError

def execute_command(self, command):
"""Execute a command locally at the data access.

:param list command: list of str to be passed to command line.
:param str full_path: the path, excluding filename
"""
raise NotImplementedError

Expand Down Expand Up @@ -143,8 +174,9 @@ class LocalDataAccess(DataAccess):
"""Interface to shared data volume"""

def __init__(self, root=None):
self.root = root if root else server_setup.DATA_ROOT_DIR
super().__init__(root)
self.description = "local machine"
self.join = os.path.join

def copy_from(self, file_name, from_dir=None):
"""Copy a file from data store to userspace.
Expand Down Expand Up @@ -180,21 +212,41 @@ def move_to(self, file_name, to_dir, change_name_to=None):
:param str change_name_to: new name for file when copied to data store.
"""
self._check_filename(file_name)
src = posixpath.join(server_setup.LOCAL_DIR, file_name)
src = self.join(server_setup.LOCAL_DIR, file_name)
file_name = file_name if change_name_to is None else change_name_to
dest = posixpath.join(self.root, to_dir, file_name)
dest = self.join(self.root, to_dir, file_name)
print(f"--> Moving file {src} to {dest}")
self._check_file_exists(dest, should_exist=False)
self.copy(src, dest)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoiding the use of copy because dest is a file, and we don't it to automatically create /path/to/file.txt/ as a folder

self.remove(src)
self.makedir(os.path.dirname(dest))
shutil.move(src, dest)

def makedir(self, relative_path):
"""Create paths relative to the instance root
def makedir(self, full_path):
"""Create path on local machine

:param str relative_path: the path, without filename, relative to root
:param str full_path: the path, excluding filename
"""
target = os.path.join(self.root, relative_path)
os.makedirs(target, exist_ok=True)
os.makedirs(full_path, exist_ok=True)

@staticmethod
def _fapply(func, pattern):
files = [f for f in glob.glob(pattern) if os.path.isfile(f)]
for f in files:
func(f)

def copy(self, src, dest, recursive=False, update=False):
"""Wrapper around cp command which creates dest path if needed

:param str src: path to original
:param str dest: destination path
:param bool recursive: create directories recursively
:param bool update: ignored
"""
if recursive:
shutil.copytree(src, dest)
else:
self.makedir(dest)
func = lambda s: shutil.copy(s, dest) # noqa: E731
LocalDataAccess._fapply(func, src)

def remove(self, target, recursive=False, confirm=True):
"""Remove target using rm semantics
Expand All @@ -211,32 +263,9 @@ def remove(self, target, recursive=False, confirm=True):
if recursive:
shutil.rmtree(target)
else:
files = [f for f in glob.glob(target) if os.path.isfile(f)]
for f in files:
os.remove(f)
LocalDataAccess._fapply(os.remove, target)
print("--> Done!")

def execute_command(self, command):
"""Execute a command locally at the data access.

:param list command: list of str to be passed to command line.
"""

def wrap(s):
if s is not None:
return s
return open(os.devnull)

proc = Popen(
command,
shell=True,
executable="/bin/bash",
stdout=PIPE,
stderr=PIPE,
text=True,
)
return wrap(None), wrap(proc.stdout), wrap(proc.stderr)

def get_profile_version(self, grid_model, kind):
"""Returns available raw profile from blob storage or local disk

Expand Down Expand Up @@ -264,11 +293,12 @@ class SSHDataAccess(DataAccess):

def __init__(self, root=None):
"""Constructor"""
super().__init__(root)
self._ssh = None
self._retry_after = 5
self.root = server_setup.DATA_ROOT_DIR if root is None else root
self.local_root = server_setup.LOCAL_DIR
self.description = "server"
self.join = posixpath.join

@property
def ssh(self):
Expand Down Expand Up @@ -329,7 +359,7 @@ def copy_from(self, file_name, from_dir=None):
to_dir = os.path.join(self.local_root, from_dir)
os.makedirs(to_dir, exist_ok=True)

from_path = posixpath.join(self.root, from_dir, file_name)
from_path = self.join(self.root, from_dir, file_name)
to_path = os.path.join(to_dir, file_name)
self._check_file_exists(from_path, should_exist=True)

Expand Down Expand Up @@ -360,8 +390,8 @@ def move_to(self, file_name, to_dir=None, change_name_to=None):
)

file_name = file_name if change_name_to is None else change_name_to
to_dir = "" if to_dir is None else to_dir
to_path = posixpath.join(self.root, to_dir, file_name)
to_dir = self.join(self.root, "" if to_dir is None else to_dir)
to_path = self.join(to_dir, file_name)
self.makedir(to_dir)
self._check_file_exists(to_path, should_exist=False)

Expand All @@ -371,14 +401,6 @@ def move_to(self, file_name, to_dir=None, change_name_to=None):

os.remove(from_path)

def execute_command(self, command):
"""Execute a command locally at the data access.

:param list command: list of str to be passed to command line.
:return: (*tuple*) -- stdin, stdout, stderr of executed command.
"""
return self.ssh.exec_command(command)

def execute_command_async(self, command):
"""Execute a command via ssh, without waiting for completion.

Expand All @@ -397,11 +419,11 @@ def checksum(self, relative_path):
:param str relative_path: path relative to root
:return: (*str*) -- the checksum of the file
"""
full_path = posixpath.join(self.root, relative_path)
full_path = self.join(self.root, relative_path)
self._check_file_exists(full_path)

command = f"sha1sum {full_path}"
_, stdout, _ = self.execute_command(command)
_, stdout, _ = self.ssh.exec_command(command)
lines = stdout.readlines()
return lines[0].strip()

Expand All @@ -418,9 +440,9 @@ def push(self, file_name, checksum, change_name_to=None):
self.move_to(file_name, change_name_to=backup)

values = {
"original": posixpath.join(self.root, new_name),
"updated": posixpath.join(self.root, backup),
"lockfile": posixpath.join(self.root, "scenario.lockfile"),
"original": self.join(self.root, new_name),
"updated": self.join(self.root, backup),
"lockfile": self.join(self.root, "scenario.lockfile"),
"checksum": checksum,
}

Expand All @@ -432,24 +454,39 @@ def push(self, file_name, checksum, change_name_to=None):
200>{lockfile}"

command = template.format(**values)
_, _, stderr = self.execute_command(command)
_, _, stderr = self.ssh.exec_command(command)

errors = stderr.readlines()
if len(errors) > 0:
for e in errors:
print(e)
raise IOError("Failed to push file - most likely a conflict was detected.")

def makedir(self, relative_path):
"""Create paths relative to the instance root
def makedir(self, full_path):
"""Create path on server

:param str relative_path: the path, without filename, relative to root
:param str full_path: the path, excluding filename
:raises IOError: if command generated stderr
"""
full_path = posixpath.join(self.root, relative_path)
_, _, stderr = self.execute_command(f"mkdir -p {full_path}")
_, _, stderr = self.ssh.exec_command(f"mkdir -p {full_path}")
errors = stderr.readlines()
if len(errors) > 0:
raise IOError(f"Failed to create {full_path} on server")

def copy(self, src, dest, recursive=False, update=False):
"""Wrapper around cp command which creates dest path if needed

:param str src: path to original
:param str dest: destination path
:param bool recursive: create directories recursively
:param bool update: only copy if needed
:raises IOError: if command generated stderr
"""
self.makedir(dest)
command = CommandBuilder.copy(src, dest, recursive, update)
_, _, stderr = self.ssh.exec_command(command)
if len(stderr.readlines()) != 0:
raise IOError("Failed to create %s on server" % full_path)
raise IOError(f"Failed to execute {command}")

def remove(self, target, recursive=False, confirm=True):
"""Run rm command on server
Expand All @@ -465,7 +502,7 @@ def remove(self, target, recursive=False, confirm=True):
if confirmed.lower() != "y":
print("Operation cancelled.")
return
_, _, stderr = self.execute_command(command)
_, _, stderr = self.ssh.exec_command(command)
if len(stderr.readlines()) != 0:
raise IOError(f"Failed to delete target={target} on server")
print("--> Done!")
Expand All @@ -476,12 +513,13 @@ def _exists(self, filepath):
:param str filepath: the path to the file
:return: (*bool*) -- whether the file exists
"""
_, _, stderr = self.execute_command(f"ls {filepath}")
_, _, stderr = self.ssh.exec_command(f"ls {filepath}")
return len(stderr.readlines()) == 0

def close(self):
"""Close the connection that was opened when the object was created."""
self.ssh.close()
"""Close the connection if one is open"""
if self._ssh is not None:
self._ssh.close()


def progress_bar(*args, **kwargs):
Expand Down
11 changes: 5 additions & 6 deletions powersimdata/data_access/profile_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,25 @@ def get_file_components(scenario_info, field_name):

:param dict scenario_info: a ScenarioInfo instance
:param str field_name: the kind of profile
:return: (*tuple*) -- file name and path
:return: (*tuple*) -- file name and list of path components
"""
version = scenario_info["base_" + field_name]
file_name = field_name + "_" + version + ".csv"
grid_model = scenario_info["grid_model"]
from_dir = os.path.join("raw", grid_model)
return file_name, from_dir
return file_name, ("raw", grid_model)

@staticmethod
def download_file(file_name, from_dir):
"""Download the profile from blob storage at the given path

:param str file_name: profile csv
:param str from_dir: the path relative to the blob container
:param tuple from_dir: tuple of path components
:return: (*str*) -- path to downloaded file
"""
print(f"--> Downloading {file_name} from blob storage.")
url_path = "/".join(os.path.split(from_dir))
url_path = "/".join(from_dir)
url = f"{ProfileHelper.BASE_URL}/{url_path}/{file_name}"
dest = os.path.join(server_setup.LOCAL_DIR, from_dir, file_name)
dest = os.path.join(server_setup.LOCAL_DIR, *from_dir, file_name)
os.makedirs(os.path.dirname(dest), exist_ok=True)
resp = requests.get(url, stream=True)
content_length = int(resp.headers.get("content-length", 0))
Expand Down
Loading