diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml
index 516bc3d5d..0c15a4dc7 100644
--- a/.github/workflows/coverage.yml
+++ b/.github/workflows/coverage.yml
@@ -19,7 +19,7 @@ jobs:
           python-version: 3.9
       - run: python -m pip install --upgrade pip tox
-      - run: tox -e pytest-local -- --cov-report=xml
+      - run: tox -e pytest-local -- --cov=powersimdata --cov-report=xml
       - name: Upload coverage to Codecov
         uses: codecov/codecov-action@v1
diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 7f25c83c1..86d092f68 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -25,4 +25,4 @@ jobs:
           python-version: ${{ matrix.python-version }}
       - run: python -m pip install --upgrade pip tox
-      - run: tox -e pytest-local
+      - run: tox -e pytest-local -- --cov=powersimdata
diff --git a/Dockerfile b/Dockerfile
index a37ff7024..04b006451 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,7 +1,6 @@
 FROM python:3.8.3

 RUN apt-get update
-RUN apt-get install gawk

 RUN ln -s /mnt/bes/pcm $HOME/ScenarioData
 COPY powersimdata/utility/templates /mnt/bes/pcm/
diff --git a/powersimdata/data_access/csv_store.py b/powersimdata/data_access/csv_store.py
index 62e753911..4343ae90a 100644
--- a/powersimdata/data_access/csv_store.py
+++ b/powersimdata/data_access/csv_store.py
@@ -1,13 +1,7 @@
 import functools
-import os
-import shutil
-from pathlib import Path
-from tempfile import mkstemp

 import pandas as pd

-from powersimdata.utility import server_setup
-

 def verify_hash(func):
     """Utility function which verifies the sha1sum of the file before writing
@@ -25,6 +19,19 @@ def wrapper(self, *args, **kwargs):
     return wrapper


+def _parse_csv(file_object):
+    """Read file from disk into data frame
+
+    :param str, path object or file-like object file_object: a reference to
+        the csv file
+    :return: (*pandas.DataFrame*) -- the specified file as a data frame.
+    """
+    table = pd.read_csv(file_object)
+    table.set_index("id", inplace=True)
+    table.fillna("", inplace=True)
+    return table.astype(str)
+
+
 class CsvStore:
     """Base class for common functionality used to manage scenario and execute
     list stored as csv files on the server
@@ -43,30 +50,15 @@ def get_table(self):
         :return: (*pandas.DataFrame*) -- the specified table as a data frame.
         """
         filename = self._FILE_NAME
-        local_path = Path(server_setup.LOCAL_DIR, filename)
-
         try:
-            self.data_access.copy_from(filename)
+            return self._get_table(filename)
         except:  # noqa
-            print(f"Failed to download {filename} from server")
-            print("Falling back to local cache...")
-
-            if local_path.is_file():
-                return self._parse_csv(local_path)
-            else:
-                raise FileNotFoundError(f"{filename} does not exist locally.")
+            return self._get_table(filename + ".2")

-    def _parse_csv(self, file_object):
-        """Read file from disk into data frame
-
-        :param str, path object or file-like object file_object: a reference to
-            the csv file
-        :return: (*pandas.DataFrame*) -- the specified file as a data frame.
-        """
-        table = pd.read_csv(file_object)
-        table.set_index("id", inplace=True)
-        table.fillna("", inplace=True)
-        return table.astype(str)
+    def _get_table(self, filename):
+        self.data_access.copy_from(filename)
+        with self.data_access.get(filename) as (f, _):
+            return _parse_csv(f)

     def commit(self, table, checksum):
         """Save to local directory and upload if needed
@@ -74,11 +66,5 @@ def commit(self, table, checksum):
         :param pandas.DataFrame table: the data frame to save
         :param str checksum: the checksum prior to download
         """
-        tmp_file, tmp_path = mkstemp(dir=server_setup.LOCAL_DIR)
-        table.to_csv(tmp_path)
-        shutil.copy(tmp_path, os.path.join(server_setup.LOCAL_DIR, self._FILE_NAME))
-        os.close(tmp_file)
-        tmp_name = os.path.basename(tmp_path)
-        self.data_access.push(tmp_name, checksum, change_name_to=self._FILE_NAME)
-        if os.path.exists(tmp_path):  # only required if data_access is LocalDataAccess
-            os.remove(tmp_path)
+        with self.data_access.push(self._FILE_NAME, checksum) as f:
+            table.to_csv(f)
diff --git a/powersimdata/data_access/data_access.py b/powersimdata/data_access/data_access.py
index a2e891587..59e1afe7c 100644
--- a/powersimdata/data_access/data_access.py
+++ b/powersimdata/data_access/data_access.py
@@ -1,8 +1,12 @@
 import posixpath
-import time
+from contextlib import contextmanager
 from subprocess import Popen

-import fs as fs2
+import fs
+from fs import errors
+from fs.glob import Globber
+from fs.multifs import MultiFS
+from fs.path import basename, dirname
 from fs.tempfs import TempFS

 from powersimdata.data_access.profile_helper import (
@@ -13,38 +17,110 @@
 from powersimdata.utility import server_setup


+def get_blob_fs(container):
+    account = "besciences"
+    return fs.open_fs(f"azblob://{account}@{container}")
+
+
 def get_ssh_fs(root=""):
     host = server_setup.SERVER_ADDRESS
     port = server_setup.SERVER_SSH_PORT
     username = server_setup.get_server_user()
-    base_fs = fs2.open_fs(f"ssh://{username}@{host}:{port}")
+    base_fs = fs.open_fs(f"ssh://{username}@{host}:{port}")
     return WrapSSHFS(base_fs, root)


+def get_multi_fs(root):
+    """Create filesystem combining the server (if connected) with profile and scenario
+    containers in blob storage. The priority is in descending order, so the server will
+    be used first if possible
+    """
+    scenario_data = get_blob_fs("scenariodata")
+    profiles = get_blob_fs("profiles")
+    mfs = MultiFS()
+    try:
+        ssh_fs = get_ssh_fs(root)
+        mfs.add_fs("ssh_fs", ssh_fs, write=True, priority=3)
+    except:  # noqa
+        print("Could not connect to ssh server")
+    mfs.add_fs("profile_fs", profiles, priority=2)
+    mfs.add_fs("scenario_fs", scenario_data, priority=1)
+    remotes = ",".join([f[0] for f in mfs.iterate_fs()])
+    print(f"Initialized remote filesystem with {remotes}")
+    return mfs
+
+
 class DataAccess:
     """Interface to a local or remote data store."""

     def __init__(self, root):
         """Constructor"""
         self.root = root
-        self.join = fs2.path.join
+        self.join = fs.path.join
+        self.local_fs = None

-    def copy_from(self, file_name, from_dir):
-        """Copy a file from data store to userspace.
+    @contextmanager
+    def get(self, filepath):
+        """Copy file from remote filesystem if needed and read into memory

-        :param str file_name: file name to copy.
-        :param str from_dir: data store directory to copy file from.
+        :param str filepath: path to file
+        :return: (*tuple*) -- file object and filepath to be handled by caller
         """
-        raise NotImplementedError
+        if not self.local_fs.exists(filepath):
+            print(f"{filepath} not found on local machine")
+            from_dir, filename = dirname(filepath), basename(filepath)
+            self.copy_from(filename, from_dir)
+
+        with self.local_fs.openbin(filepath) as f:
+            filepath = self.local_fs.getsyspath(filepath)
+            yield f, filepath
+
+    @contextmanager
+    def write(self, filepath, save_local=True):
+        """Write a file to data store.
+
+        :param str filepath: path to save data to
+        :param bool save_local: whether a copy should also be saved to the local
+            filesystem, if such a filesystem is configured. Defaults to True.
+        """
+        self._check_file_exists(filepath, should_exist=False)
+
+        print("Writing %s" % filepath)
+        with fs.open_fs("mem://") as mem_fs:
+            mem_fs.makedirs(dirname(filepath), recreate=True)
+            with mem_fs.open(filepath, "wb") as f:
+                yield f
+            self._copy(mem_fs, self.fs, filepath)
+            if save_local:
+                self._copy(mem_fs, self.local_fs, filepath)
+
+    def _copy(self, src_fs, dst_fs, filepath):
+        """Copy file from one filesystem to another.
+
+        :param fs.base.FS src_fs: source filesystem
+        :param fs.base.FS dst_fs: destination filesystem
+        :param str filepath: path to file
+        """
+        dst_fs.makedirs(dirname(filepath), recreate=True)
+        fs.copy.copy_file(src_fs, filepath, dst_fs, filepath)

-    def move_to(self, file_name, to_dir, change_name_to=None):
-        """Copy a file from userspace to data store.
+    def copy_from(self, file_name, from_dir=None):
+        """Copy a file from data store to userspace.

         :param str file_name: file name to copy.
-        :param str to_dir: data store directory to copy file to.
-        :param str change_name_to: new name for file when copied to data store.
+        :param str from_dir: data store directory to copy file from.
         """
-        raise NotImplementedError
+        from_dir = "" if from_dir is None else from_dir
+        from_path = self.join(from_dir, file_name)
+        self._check_file_exists(from_path, should_exist=True)
+
+        location, _ = self.fs.which(from_path)
+        print(f"Transferring {file_name} from {location}")
+        with TempFS() as tmp_fs:
+            self.local_fs.makedirs(from_dir, recreate=True)
+            tmp_fs.makedirs(from_dir, recreate=True)
+            fs.copy.copy_file(self.fs, from_path, tmp_fs, from_path)
+            fs.move.move_file(tmp_fs, from_path, self.local_fs, from_path)

     def tmp_folder(self, scenario_id):
         """Get path to temporary scenario folder
@@ -60,24 +136,33 @@ def copy(self, src, dest):
         :param str src: path to file
         :param str dest: destination folder
         """
-        if self.fs.exists(dest) and self.fs.isdir(dest):
-            dest = self.join(dest, fs2.path.basename(src))
+        if self.fs.isdir(dest):
+            dest = self.join(dest, fs.path.basename(src))
         self.fs.copy(src, dest)

-    def remove(self, pattern, confirm=True):
+    def remove(self, base_dir, pattern, confirm=True):
         """Delete files in current environment

+        :param str base_dir: root within which to search
         :param str pattern: glob specifying files to remove
        :param bool confirm: prompt before executing command
+        :return: (*bool*) -- True if the operation is completed
         """
         if confirm:
-            confirmed = input(f"Delete '{pattern}'? [y/n] (default is 'n')")
+            target = self.join(base_dir, pattern)
+            confirmed = input(f"Delete '{target}'? [y/n] (default is 'n')")
             if confirmed.lower() != "y":
                 print("Operation cancelled.")
-                return
-        self.fs.glob(pattern).remove()
+                return False
+
+        for _fs in (self.fs.write_fs, self.local_fs):
+            try:
+                Globber(_fs.opendir(base_dir), pattern).remove()
+            except errors.ResourceNotFound:
+                print(f"Skipping {base_dir}: not found on {_fs}")
         print("--> Done!")
+        return True

     def _check_file_exists(self, path, should_exist=True):
         """Check that file exists (or not) at the given path
@@ -86,25 +171,24 @@ def _check_file_exists(self, path, should_exist=True):
         :param bool should_exist: whether the file is expected to exist
         :raises OSError: if the expected condition is not met
         """
-        exists = self.fs.exists(path)
+        location, _ = self.fs.which(path)
+        exists = location is not None
         if should_exist and not exists:
-            raise OSError(f"{path} not found on {self.description}")
+            remotes = [f[0] for f in self.fs.iterate_fs()]
+            raise OSError(f"{path} not found on any of {remotes}")
         if not should_exist and exists:
-            raise OSError(f"{path} already exists on {self.description}")
-
-    def makedir(self, path):
-        """Create path in current environment
+            raise OSError(f"{path} already exists on {location}")

-        :param str path: the relative path, excluding filename
-        """
-        self.fs.makedirs(path, recreate=True)
-
-    def execute_command_async(self, command):
-        """Execute a command locally at the DataAccess, without waiting for completion.
+    def get_profile_version(self, grid_model, kind):
+        """Returns available raw profile versions from blob storage

-        :param list command: list of str to be passed to command line.
+        :param str grid_model: grid model.
+        :param str kind: *'demand'*, *'hydro'*, *'solar'* or *'wind'*.
+        :return: (*list*) -- available profile versions.
         """
-        raise NotImplementedError
+        blob_version = get_profile_version_cloud(grid_model, kind)
+        local_version = get_profile_version_local(grid_model, kind)
+        return list(set(blob_version + local_version))

     def checksum(self, relative_path):
         """Return the checksum of the file path
@@ -112,145 +196,67 @@
         :param str relative_path: path relative to root
         :return: (*str*) -- the checksum of the file
         """
-        return "dummy_value"
+        return self.fs.hash(relative_path, "sha256")

-    def push(self, file_name, checksum, change_name_to=None):
+    def push(self, file_name, checksum):
         """Push the file from local to remote root folder, ensuring integrity

         :param str file_name: the file name, located at the local root
         :param str checksum: the checksum prior to download
-        :param str change_name_to: new name for file when copied to data store.
         """
         raise NotImplementedError

-    def get_profile_version(self, grid_model, kind):
-        """Returns available raw profile from blob storage
-
-        :param str grid_model: grid model.
-        :param str kind: *'demand'*, *'hydro'*, *'solar'* or *'wind'*.
-        :return: (*list*) -- available profile version.
-        """
-        blob_version = get_profile_version_cloud(grid_model, kind)
-        local_version = get_profile_version_local(grid_model, kind)
-        return list(set(blob_version + local_version))
-

 class LocalDataAccess(DataAccess):
     """Interface to shared data volume"""

     def __init__(self, root=server_setup.LOCAL_DIR):
         super().__init__(root)
-        self.description = "local machine"
-        self.fs = fs2.open_fs(root)
+        self.local_fs = fs.open_fs(root)
+        self.fs = self._get_fs()

-    def copy_from(self, file_name, from_dir=None):
-        """Copy a file from data store to userspace.
+    def _get_fs(self):
+        mfs = MultiFS()
+        profiles = get_blob_fs("profiles")
+        mfs.add_fs("profile_fs", profiles, priority=2)
+        mfs.add_fs("local_fs", self.local_fs, write=True, priority=3)
+        return mfs

-        :param str file_name: file name to copy.
-        :param str from_dir: data store directory to copy file from.
-        """
-        pass
-
-    def push(self, file_name, checksum, change_name_to=None):
-        """Nothing to be done due to symlink
+    @contextmanager
+    def push(self, file_name, checksum):
+        """Write file if checksum matches

         :param str file_name: the file name, located at the local root
         :param str checksum: the checksum prior to download
-        :param str change_name_to: new name for file when copied to data store.
         """
-        pass
-
-    def move_to(self, file_name, to_dir, change_name_to=None):
-        """Copy a file from userspace to data store.
-
-        :param str file_name: file name to copy.
-        :param str to_dir: data store directory to copy file to.
-        :param str change_name_to: new name for file when copied to data store.
-        """
-        change_name_to = file_name if change_name_to is None else change_name_to
-        dest = self.join(to_dir, change_name_to)
-        print(f"--> Moving file {file_name} to {to_dir}")
-        self._check_file_exists(dest, should_exist=False)
-        self.makedir(to_dir)
-        self.fs.move(file_name, dest)
+        if checksum != self.checksum(file_name):
+            raise ValueError("Checksums do not match")
+        with fs.open_fs("temp://") as tfs:
+            with tfs.openbin(file_name, "w") as f:
+                yield f
+            fs.move.move_file(tfs, file_name, self.local_fs, file_name)


 class SSHDataAccess(DataAccess):
     """Interface to a remote data store, accessed via SSH."""

-    _last_attempt = 0
-
     def __init__(self, root=server_setup.DATA_ROOT_DIR):
         """Constructor"""
         super().__init__(root)
         self._fs = None
-        self._retry_after = 5
-        self.local_root = server_setup.LOCAL_DIR
-        self.local_fs = fs2.open_fs(self.local_root)
-        self.description = "server"
+        self.local_fs = fs.open_fs(server_setup.LOCAL_DIR)

     @property
     def fs(self):
-        """Get or create the filesystem object, with attempts rate limited.
+        """Get or create the filesystem object

-        :raises IOError: if connection failed or still within retry window
-        :return: (*powersimdata.data_access.ssh_fs.WrapSSHFS) -- filesystem instance
+        :return: (*fs.multifs.MultiFS*) -- filesystem instance
         """
         if self._fs is None:
-            should_attempt = (
-                time.time() - SSHDataAccess._last_attempt > self._retry_after
-            )
-            if should_attempt:
-                try:
-                    self._fs = get_ssh_fs(self.root)
-                    return self._fs
-                except:  # noqa
-                    SSHDataAccess._last_attempt = time.time()
-                    msg = f"Could not connect to server, will try again after {self._retry_after} seconds"
-                    raise IOError(msg)
-
+            self._fs = get_multi_fs(self.root)
         return self._fs

-    def copy_from(self, file_name, from_dir=None):
-        """Copy a file from data store to userspace.
-
-        :param str file_name: file name to copy.
-        :param str from_dir: data store directory to copy file from.
-        """
-        from_dir = "" if from_dir is None else from_dir
-        from_path = self.join(from_dir, file_name)
-        self._check_file_exists(from_path, should_exist=True)
-
-        print(f"Transferring {file_name} from server")
-        with TempFS() as tmp_fs:
-            self.local_fs.makedirs(from_dir, recreate=True)
-            tmp_fs.makedirs(from_dir, recreate=True)
-            fs2.copy.copy_file(self.fs, from_path, tmp_fs, from_path)
-            fs2.move.move_file(tmp_fs, from_path, self.local_fs, from_path)
-
-    def move_to(self, file_name, to_dir=None, change_name_to=None):
-        """Copy a file from userspace to data store.
-
-        :param str file_name: file name to copy.
-        :param str to_dir: data store directory to copy file to.
-        :param str change_name_to: new name for file when copied to data store.
-        :raises FileNotFoundError: if specified file does not exist
-        """
-        if not self.local_fs.isfile(file_name):
-            raise FileNotFoundError(
-                f"{file_name} not found in {self.local_root} on local machine"
-            )
-
-        change_name_to = file_name if change_name_to is None else change_name_to
-        to_dir = "" if to_dir is None else to_dir
-        self.makedir(to_dir)
-
-        to_path = self.join(to_dir, change_name_to)
-        self._check_file_exists(to_path, should_exist=False)
-
-        print(f"Transferring {change_name_to} to server")
-        fs2.move.move_file(self.local_fs, file_name, self.fs, to_path)
-
     def execute_command_async(self, command):
         """Execute a command via ssh, without waiting for completion.
@@ -269,25 +275,31 @@ def checksum(self, relative_path):
         :param str relative_path: path relative to root
         :return: (*str*) -- the checksum of the file
         """
-        full_path = self.join(self.root, relative_path)
         self._check_file_exists(relative_path)
+        full_path = self.join(self.root, relative_path)
+        ssh_fs = self.fs.get_fs("ssh_fs")
+        return ssh_fs.checksum(full_path)

-        return self.fs.checksum(full_path)
-
-    def push(self, file_name, checksum, change_name_to=None):
+    @contextmanager
+    def push(self, file_name, checksum):
         """Push file to server and verify the checksum matches a prior value

         :param str file_name: the file name, located at the local root
         :param str checksum: the checksum prior to download
-        :param str change_name_to: new name for file when copied to data store.
         :raises IOError: if command generated stderr
         """
-        new_name = file_name if change_name_to is None else change_name_to
-        backup = f"{new_name}.temp"
-        self.move_to(file_name, change_name_to=backup)
+        backup = f"{file_name}.temp"
+
+        self._check_file_exists(backup, should_exist=False)
+        print(f"Transferring {file_name} to server")
+        with fs.open_fs("temp://") as tfs:
+            with tfs.openbin(file_name, "w") as f:
+                yield f
+            fs.move.move_file(tfs, file_name, self.local_fs, file_name)
+        fs.copy.copy_file(self.local_fs, file_name, self.fs, backup)

         values = {
-            "original": posixpath.join(self.root, new_name),
+            "original": posixpath.join(self.root, file_name),
             "updated": posixpath.join(self.root, backup),
             "lockfile": posixpath.join(self.root, "scenario.lockfile"),
             "checksum": checksum,
@@ -301,7 +313,8 @@
             200>{lockfile}"

         command = template.format(**values)
-        _, _, stderr = self.fs.exec_command(command)
+        ssh_fs = self.fs.get_fs("ssh_fs")
+        _, _, stderr = ssh_fs.exec_command(command)

         errors = stderr.readlines()

         if len(errors) > 0:
@@ -310,22 +323,50 @@
         raise IOError("Failed to push file - most likely a conflict was detected.")


-class MemoryDataAccess(SSHDataAccess):
-    """Mimic a client server architecture using in memory filesystems"""
+class _DataAccessTemplate(SSHDataAccess):
+    """Template for data access object using temp or in memory filesystems"""

-    def __init__(self):
-        self.local_fs = fs2.open_fs("mem://")
-        self._fs = fs2.open_fs("mem://")
-        self.description = "in-memory"
-        self.local_root = self.root = "dummy"
-        self.join = fs2.path.join
+    def __init__(self, fs_url):
+        self.local_fs = fs.open_fs(fs_url)
+        self._fs = self._get_fs(fs_url)
+        self.root = "foo"
+        self.join = fs.path.join
+
+    def _get_fs(self, fs_url):
+        mfs = MultiFS()
+        mfs.add_fs("remotefs", fs.open_fs(fs_url), write=True, priority=3)
+        return mfs
+
+    def checksum(self, relative_path):
+        """Return the checksum of the file path

-    def push(self, file_name, checksum, change_name_to=None):
+        :param str relative_path: path relative to root
+        :return: (*str*) -- the checksum of the file
+        """
+        return self.fs.hash(relative_path, "sha256")
+
+    @contextmanager
+    def push(self, file_name, checksum):
         """Push file from local to remote filesystem, bypassing checksum
         since this is in memory.

         :param str file_name: the file name, located at the local root
         :param str checksum: the checksum prior to download
-        :param str change_name_to: new name for file when copied to data store.
         """
-        self.move_to(file_name, change_name_to=change_name_to)
+        with self.local_fs.openbin(file_name, "w") as f:
+            yield f
+        fs.move.move_file(self.local_fs, file_name, self.fs, file_name)
+
+
+class TempDataAccess(_DataAccessTemplate):
+    """Mimic a client server architecture using temp filesystems"""
+
+    def __init__(self):
+        super().__init__("temp://")
+
+
+class MemoryDataAccess(_DataAccessTemplate):
+    """Mimic a client server architecture using in memory filesystems"""
+
+    def __init__(self):
+        super().__init__("mem://")
diff --git a/powersimdata/data_access/profile_helper.py b/powersimdata/data_access/profile_helper.py
index 7a73e60af..f23071dcb 100644
--- a/powersimdata/data_access/profile_helper.py
+++ b/powersimdata/data_access/profile_helper.py
@@ -1,8 +1,4 @@
-import os
-
 import fs
-import requests
-from tqdm.auto import tqdm

 from powersimdata.utility import server_setup

@@ -36,7 +32,7 @@ def get_profile_version_local(grid_model, kind):
     :return: (*list*) -- available profile version.
     """
     profile_dir = fs.path.join(server_setup.LOCAL_DIR, "raw", grid_model)
-    lfs = fs.open_fs(profile_dir)
+    lfs = fs.open_fs(profile_dir, create=True)
     return _get_profile_version(lfs, kind)


@@ -56,32 +52,3 @@ def get_file_components(scenario_info, field_name):
         file_name = field_name + "_" + version + ".csv"
         grid_model = scenario_info["grid_model"]
         return file_name, ("raw", grid_model)
-
-    @staticmethod
-    def download_file(file_name, from_dir):
-        """Download the profile from blob storage at the given path.
-
-        :param str file_name: profile csv.
-        :param tuple from_dir: tuple of path components.
-        :return: (*str*) -- path to downloaded file.
-        """
-        print(f"--> Downloading {file_name} from blob storage.")
-        url_path = "/".join(from_dir)
-        url = f"{ProfileHelper.BASE_URL}/{url_path}/{file_name}"
-        dest = os.path.join(server_setup.LOCAL_DIR, *from_dir, file_name)
-        os.makedirs(os.path.dirname(dest), exist_ok=True)
-        resp = requests.get(url, stream=True)
-        content_length = int(resp.headers.get("content-length", 0))
-        with open(dest, "wb") as f:
-            with tqdm(
-                unit="B",
-                unit_scale=True,
-                unit_divisor=1024,
-                miniters=1,
-                total=content_length,
-            ) as pbar:
-                for chunk in resp.iter_content(chunk_size=4096):
-                    f.write(chunk)
-                    pbar.update(len(chunk))
-
-        return dest
diff --git a/powersimdata/data_access/tests/test_data_access.py b/powersimdata/data_access/tests/test_data_access.py
index 4106f5cd6..955b30be2 100644
--- a/powersimdata/data_access/tests/test_data_access.py
+++ b/powersimdata/data_access/tests/test_data_access.py
@@ -58,25 +58,3 @@ def test_copy_from_multi_path(data_access):
     make_temp(data_access.fs, filepath)
     data_access.copy_from(FILE_NAME, src_path)
     _check_content(data_access.local_fs, filepath)
-
-
-def test_move_to(data_access):
-    make_temp(data_access.local_fs, FILE_NAME)
-    data_access.move_to(FILE_NAME)
-    _check_content(data_access.fs, FILE_NAME)
-
-
-def test_move_to_multi_path(data_access):
-    rel_path = _join(data_access.local_root, "foo", "bar")
-    filepath = _join(rel_path, FILE_NAME)
-    make_temp(data_access.local_fs, FILE_NAME)
-    data_access.move_to(FILE_NAME, rel_path)
-    _check_content(data_access.fs, filepath)
-
-
-def test_move_to_rename(data_access):
-    make_temp(data_access.local_fs, FILE_NAME)
-
-    new_fname = "foo.txt"
-    data_access.move_to(FILE_NAME, change_name_to=new_fname)
-    _check_content(data_access.fs, new_fname)
diff --git a/powersimdata/input/change_table.py b/powersimdata/input/change_table.py
index e1e2c9bd8..404c8f4f2 100644
--- a/powersimdata/input/change_table.py
+++ b/powersimdata/input/change_table.py
@@ -1,6 +1,4 @@
 import copy
-import os
-import pickle
 from itertools import chain

 from powersimdata.design.transmission.upgrade import (
@@ -8,7 +6,6 @@
     scale_renewable_stubs,
 )
 from powersimdata.input.transform_grid import TransformGrid
-from powersimdata.utility import server_setup
 from powersimdata.utility.distance import find_closest_neighbor

 _resources = (
@@ -956,16 +953,3 @@ def _check_for_islanded_load_buses(self):
         diff = load_buses - connected_buses
         if len(diff) > 0:
             print(f"Warning: load buses connected to no lines exist: {sorted(diff)}")
-
-    def write(self, scenario_id):
-        """Saves change table to disk.
-
-        :param str scenario_id: scenario index.
-        :raises IOError: if file already exists on local machine.
-        """
-        file_name = os.path.join(server_setup.LOCAL_DIR, scenario_id + "_ct.pkl")
-        if os.path.isfile(file_name) is False:
-            print("Writing %s" % file_name)
-            pickle.dump(self.ct, open(file_name, "wb"))
-        else:
-            raise IOError("%s already exists" % file_name)
diff --git a/powersimdata/input/export_data.py b/powersimdata/input/export_data.py
index 5222f4bb5..d1b67c4c2 100644
--- a/powersimdata/input/export_data.py
+++ b/powersimdata/input/export_data.py
@@ -6,12 +6,16 @@
 from powersimdata.input.transform_profile import TransformProfile


-def export_case_mat(grid, filepath, storage_filepath=None):
+def export_case_mat(grid, filepath=None, storage_filepath=None):
     """Export a grid to a format suitable for loading into simulation engine.
+    If optional filepath arguments are used, the results will also be saved to
+    the filepaths provided.

     :param powersimdata.input.grid.Grid grid: Grid instance.
-    :param str filepath: path where main grid file will be saved.
+    :param str filepath: path where main grid file will be saved, if provided
     :param str storage_filepath: path where storage data file will be saved, if
         present.
+    :return: (*tuple*) -- the mpc data as a dictionary and the mpc storage data
+        as a dictionary, or None if the grid has no storage.
     """
     grid = copy.deepcopy(grid)
@@ -105,6 +109,8 @@
     mpc["mpc"]["dclineid"] = dclineid

     # energy storage
+    mpc_storage = None
+
     if len(grid.storage["gen"]) > 0:
         storage = grid.storage.copy()

@@ -119,9 +125,12 @@
             }
         }

-        savemat(storage_filepath, mpc_storage, appendmat=False)
+    if filepath is not None:
+        savemat(filepath, mpc, appendmat=False)
+        if mpc_storage is not None:
+            savemat(storage_filepath, mpc_storage, appendmat=False)

-    savemat(filepath, mpc, appendmat=False)
+    return mpc, mpc_storage


 def export_transformed_profile(kind, scenario_info, grid, ct, filepath, slice=True):
diff --git a/powersimdata/input/input_data.py b/powersimdata/input/input_data.py
index 8a004a5cc..00dc93bca 100644
--- a/powersimdata/input/input_data.py
+++ b/powersimdata/input/input_data.py
@@ -1,4 +1,5 @@
 import os
+import pickle

 import pandas as pd

@@ -19,9 +20,6 @@


 class InputHelper:
-    def __init__(self, data_access):
-        self.data_access = data_access
-
     @staticmethod
     def get_file_components(scenario_info, field_name):
         """Get the file name and relative path for either ct or grid.
@@ -34,15 +32,6 @@ def get_file_components(scenario_info, field_name):
         file_name = scenario_info["id"] + "_" + field_name + "." + ext
         return file_name, server_setup.INPUT_DIR

-    def download_file(self, file_name, from_dir):
-        """Download the file if using server, otherwise no-op.
-
-        :param str file_name: either grid or ct file name.
-        :param tuple from_dir: tuple of path components.
-        """
-        from_dir = self.data_access.join(*from_dir)
-        self.data_access.copy_from(file_name, from_dir)
-

 def _check_field(field_name):
     """Checks field name.
@@ -57,6 +46,29 @@ def _check_field(field_name):
         raise ValueError("Only %s data can be loaded" % " | ".join(possible))


+def _read(f, filepath):
+    """Read data from file object
+
+    :param io.IOBase f: a file handle
+    :param str filepath: the filepath corresponding to f
+    :raises ValueError: if extension is unknown.
+    :return: (*object*) -- the parsed data; for mat files, the absolute local
+        path to the file.
+    """
+    ext = os.path.basename(filepath).split(".")[-1]
+    if ext == "pkl":
+        data = pd.read_pickle(f)
+    elif ext == "csv":
+        data = pd.read_csv(f, index_col=0, parse_dates=True)
+        data.columns = data.columns.astype(int)
+    elif ext == "mat":
+        # get fully qualified local path to matfile
+        data = os.path.abspath(filepath)
+    else:
+        raise ValueError("Unknown extension! %s" % ext)
+
+    return data
+
+
 class InputData:
     """Load input data.
@@ -84,24 +96,17 @@ def get_data(self, scenario_info, field_name):
         if field_name in profile_kind:
             helper = ProfileHelper
         else:
-            helper = InputHelper(self.data_access)
+            helper = InputHelper

         file_name, from_dir = helper.get_file_components(scenario_info, field_name)
-        filepath = os.path.join(server_setup.LOCAL_DIR, *from_dir, file_name)
+        filepath = "/".join([*from_dir, file_name])

         key = cache_key(filepath)
         cached = _cache.get(key)
         if cached is not None:
             return cached

-        try:
-            data = _read_data(filepath)
-        except FileNotFoundError:
-            print(
-                "%s not found in %s on local machine"
-                % (file_name, server_setup.LOCAL_DIR)
-            )
-            helper.download_file(file_name, from_dir)
-            data = _read_data(filepath)
+        with self.data_access.get(filepath) as (f, path):
+            data = _read(f, path)

         _cache.put(key, data)
         return data
@@ -114,30 +119,15 @@ def get_profile_version(self, grid_model, kind):
         """
         return self.data_access.get_profile_version(grid_model, kind)

+    def save_change_table(self, ct, scenario_id):
+        """Saves change table to the data store.

-def _read_data(filepath):
-    """Reads data from local machine.
-
-    :param str filepath: path to file, with extension either 'pkl', 'csv', or 'mat'.
-    :return: (*pandas.DataFrame*, *dict*, or *str*) -- demand, hydro, solar or
-        wind as a data frame, change table as a dict, or str containing a
-        local path to a matfile of grid data.
-    :raises ValueError: if extension is unknown.
-    """
-    ext = os.path.basename(filepath).split(".")[-1]
-    if ext == "pkl":
-        data = pd.read_pickle(filepath)
-    elif ext == "csv":
-        data = pd.read_csv(filepath, index_col=0, parse_dates=True)
-        data.columns = data.columns.astype(int)
-    elif ext == "mat":
-        # Try to load the matfile, just to check if it exists locally
-        open(filepath, "r")
-        data = filepath
-    else:
-        raise ValueError("Unknown extension! %s" % ext)
-
-    return data
+        :param dict ct: a change table
+        :param str scenario_id: scenario id, used for file name
+        """
+        filepath = "/".join([*server_setup.INPUT_DIR, f"{scenario_id}_ct.pkl"])
+        with self.data_access.write(filepath) as f:
+            pickle.dump(ct, f)


 def distribute_demand_from_zones_to_buses(zone_demand, bus):
diff --git a/powersimdata/output/output_data.py b/powersimdata/output/output_data.py
index af7a5cabd..e740faa89 100644
--- a/powersimdata/output/output_data.py
+++ b/powersimdata/output/output_data.py
@@ -1,6 +1,3 @@
-import os
-import pickle
-
 import numpy as np
 import pandas as pd
 from scipy.sparse import coo_matrix
@@ -35,20 +32,9 @@ def get_data(self, scenario_id, field_name):
         print("--> Loading %s" % field_name)

         file_name = scenario_id + "_" + field_name + ".pkl"
-        from_dir = server_setup.OUTPUT_DIR
-        filepath = os.path.join(server_setup.LOCAL_DIR, *from_dir, file_name)
-
-        try:
-            return pd.read_pickle(filepath)
-        except pickle.UnpicklingError:
-            err_msg = f"Unable to unpickle {file_name}, possibly corrupted in download."
-            raise ValueError(err_msg)
-        except FileNotFoundError:
-            print(f"{filepath} not found on local machine")
-
-            remote_dir = self._data_access.join(*from_dir)
-            self._data_access.copy_from(file_name, remote_dir)
-            return pd.read_pickle(filepath)
+        filepath = "/".join([*server_setup.OUTPUT_DIR, file_name])
+        with self._data_access.get(filepath) as (f, _):
+            return pd.read_pickle(f)


 def _check_field(field_name):
diff --git a/powersimdata/scenario/analyze.py b/powersimdata/scenario/analyze.py
index eb6356b85..c2f51b82b 100644
--- a/powersimdata/scenario/analyze.py
+++ b/powersimdata/scenario/analyze.py
@@ -18,7 +18,7 @@ class Analyze(Ready):
     """

     name = "analyze"
-    allowed = []
+    allowed = ["delete"]
     exported_methods = {
         "get_averaged_cong",
         "get_congl",
@@ -53,7 +53,7 @@ def refresh(self, scenario):
     def _set_allowed_state(self):
         """Sets allowed state."""
         if self._scenario_status == "extracted":
-            self.allowed = ["delete", "move"]
+            self.allowed = self.allowed + ["move"]

     def _set_ct_and_grid(self):
         """Sets change table and grid."""
diff --git a/powersimdata/scenario/create.py b/powersimdata/scenario/create.py
index f89f1b6d8..eca53c250 100644
--- a/powersimdata/scenario/create.py
+++ b/powersimdata/scenario/create.py
@@ -1,5 +1,4 @@
 import copy
-import pickle
 import warnings

 import numpy as np
@@ -16,7 +15,6 @@
 from powersimdata.network.model import ModelImmutables
 from powersimdata.scenario.execute import Execute
 from powersimdata.scenario.state import State
-from powersimdata.utility import server_setup


 class Create(State):
@@ -77,11 +75,9 @@ def _update_scenario_info(self):

     def _upload_change_table(self):
         """Uploads change table to server."""
-        print("--> Writing change table on local machine")
-        self.builder.change_table.write(self._scenario_info["id"])
-        file_name = self._scenario_info["id"] + "_ct.pkl"
-        input_dir = self._data_access.join(*server_setup.INPUT_DIR)
-        self._data_access.move_to(file_name, input_dir)
+        InputData().save_change_table(
+            self.builder.change_table.ct, self._scenario_info["id"]
+        )

     def get_bus_demand(self):
         """Returns demand profiles, by bus.
@@ -359,18 +355,6 @@ def set_engine(self, engine):
         else:
             self.engine = engine

-    def load_change_table(self, filename):
-        """Uploads change table.
-
-        :param str filename: full path to change table pickle file.
-        :raises FileNotFoundError: if file not found.
-        """
-        try:
-            ct = pickle.load(open(filename, "rb"))
-            self.change_table.ct = ct
-        except FileNotFoundError:
-            raise ("%s not found. " % filename)
-
     def get_grid(self):
         """Returns a transformed grid.
diff --git a/powersimdata/scenario/delete.py b/powersimdata/scenario/delete.py
index 7df554cd8..1c14c5e79 100644
--- a/powersimdata/scenario/delete.py
+++ b/powersimdata/scenario/delete.py
@@ -1,4 +1,3 @@
-from powersimdata.data_access.data_access import LocalDataAccess
 from powersimdata.scenario.ready import Ready
 from powersimdata.utility import server_setup

@@ -15,29 +14,33 @@ def delete_scenario(self, confirm=True):

         :param bool confirm: prompt before each batch
         """
-        # Delete entry in scenario list
         scenario_id = self._scenario_info["id"]
+        _join = self._data_access.join
+
+        input_dir = _join(*server_setup.INPUT_DIR)
+        output_dir = _join(*server_setup.OUTPUT_DIR)
+
+        proceed = self._data_access.remove(
+            input_dir, f"{scenario_id}_*", confirm=confirm
+        )
+        if proceed:
+            proceed = self._data_access.remove(
+                output_dir, f"{scenario_id}_*", confirm=confirm
+            )
+        if proceed:
+            pattern = f"scenario_{scenario_id}/*"
+            proceed = self._data_access.remove(
+                server_setup.EXECUTE_DIR, pattern, confirm=confirm
+            )
+
+        if not proceed:
+            print("Cancelling deletion.")
+            return
+
+        print("--> Deleting entries in scenario and execute list")
         self._scenario_list_manager.delete_entry(scenario_id)
         self._execute_list_manager.delete_entry(scenario_id)

-        print("--> Deleting scenario input data")
-        target = self._data_access.join(*server_setup.INPUT_DIR, f"{scenario_id}_*")
-        self._data_access.remove(target, confirm=confirm)
-
-        print("--> Deleting scenario output data")
-        target = self._data_access.join(*server_setup.OUTPUT_DIR, f"{scenario_id}_*")
-        self._data_access.remove(target, confirm=confirm)
-
-        # Delete temporary folder enclosing simulation inputs
-        print("--> Deleting temporary folder")
-        tmp_dir = self._data_access.tmp_folder(scenario_id)
-        self._data_access.remove(f"{tmp_dir}/**", confirm=confirm)
-
-        print("--> Deleting input and output data on local machine")
-        local_data_access = LocalDataAccess()
-        target = local_data_access.join("data", "**", f"{scenario_id}_*")
-        local_data_access.remove(target, confirm=confirm)
-
         # Delete attributes
         self._clean()
diff --git a/powersimdata/scenario/execute.py b/powersimdata/scenario/execute.py
index ffd9b1539..1d99addb4 100644
--- a/powersimdata/scenario/execute.py
+++ b/powersimdata/scenario/execute.py
@@ -1,12 +1,12 @@
-import os
+from scipy.io import savemat

 from powersimdata.data_access.context import Context
-from powersimdata.input.export_data import export_case_mat, export_transformed_profile
+from powersimdata.input.export_data import export_case_mat
 from powersimdata.input.grid import Grid
 from powersimdata.input.input_data import InputData
 from powersimdata.input.transform_grid import TransformGrid
+from powersimdata.input.transform_profile import TransformProfile
 from powersimdata.scenario.ready import Ready
-from powersimdata.utility import server_setup
 from powersimdata.utility.config import get_deployment_mode


@@ -17,7 +17,7 @@ class Execute(Ready):
     """

     name = "execute"
-    allowed = []
+    allowed = ["delete"]
     exported_methods = {
         "check_progress",
         "extract_simulation_output",
@@ -103,7 +103,6 @@ def prepare_simulation_input(self, profiles_as=None):
             si = SimulationInput(
                 self._data_access, self._scenario_info, self.grid, self.ct
             )
-            si.create_folder()
             for p in ["demand", "hydro", "solar", "wind"]:
                 si.prepare_profile(p, profiles_as)

@@ -195,27 +194,20 @@ def __init__(self, data_access, scenario_info, grid, ct):

         self.REL_TMP_DIR = self._data_access.tmp_folder(self.scenario_id)

-    def create_folder(self):
-        """Creates folder on server that will enclose simulation inputs."""
-        description = self._data_access.description
-        print(f"--> Creating temporary folder on {description} for simulation inputs")
-        self._data_access.makedir(self.REL_TMP_DIR)
-
     def prepare_mpc_file(self):
         """Creates MATPOWER case file."""
-        file_name = f"{self.scenario_id}_case.mat"
-        storage_file_name = f"{self.scenario_id}_case_storage.mat"
-        file_path = os.path.join(server_setup.LOCAL_DIR, file_name)
-        storage_file_path = os.path.join(server_setup.LOCAL_DIR, storage_file_name)
+        file_path = "/".join([self.REL_TMP_DIR, "case.mat"])
+        storage_file_path = "/".join([self.REL_TMP_DIR, "case_storage.mat"])
+
         print("Building MPC file")
-        export_case_mat(self.grid, file_path, storage_file_path)
-        self._data_access.move_to(
-            file_name, self.REL_TMP_DIR, change_name_to="case.mat"
-        )
-        if len(self.grid.storage["gen"]) > 0:
-            self._data_access.move_to(
-                storage_file_name, self.REL_TMP_DIR, change_name_to="case_storage.mat"
-            )
+        mpc, mpc_storage = export_case_mat(self.grid)
+
+        with self._data_access.write(file_path, save_local=False) as f:
+            savemat(f, mpc, appendmat=False)
+
+        if mpc_storage is not None:
+            with self._data_access.write(storage_file_path, save_local=False) as f:
+                savemat(f, mpc_storage, appendmat=False)

     def prepare_profile(self, kind, profile_as=None, slice=False):
         """Prepares profile for simulation.
@@ -224,17 +216,16 @@ def prepare_profile(self, kind, profile_as=None, slice=False):
         :param int/str profile_as: if given, copy profile from this scenario.
         :param bool slice: whether to slice the profiles by the Scenario's time range.
         """
+        file_name = f"{kind}.csv"
         if profile_as is None:
-            file_name = "%s_%s.csv" % (self.scenario_id, kind)
-            filepath = os.path.join(server_setup.LOCAL_DIR, file_name)
-            export_transformed_profile(
-                kind, self._scenario_info, self.grid, self.ct, filepath, slice
-            )
+            filepath = "/".join([self.REL_TMP_DIR, file_name])

-            self._data_access.move_to(
-                file_name, self.REL_TMP_DIR, change_name_to=f"{kind}.csv"
-            )
+            tp = TransformProfile(self._scenario_info, self.grid, self.ct, slice)
+            profile = tp.get_profile(kind)
+            print(f"Writing scaled {kind} profile to {filepath}")
+            with self._data_access.write(filepath, save_local=False) as f:
+                profile.to_csv(f)
         else:
             from_dir = self._data_access.tmp_folder(profile_as)
-            src = self._data_access.join(from_dir, f"{kind}.csv")
+            src = "/".join([from_dir, file_name])
             self._data_access.copy(src, self.REL_TMP_DIR)
diff --git a/powersimdata/scenario/tests/test_scenario.py b/powersimdata/scenario/tests/test_scenario.py
index 1949f926c..f30d45770 100644
--- a/powersimdata/scenario/tests/test_scenario.py
+++ b/powersimdata/scenario/tests/test_scenario.py
@@ -1,6 +1,9 @@
 import pytest

+from powersimdata.data_access.context import Context
+from powersimdata.scenario.delete import Delete
 from powersimdata.scenario.scenario import Scenario
+from powersimdata.tests.mock_context import MockContext


 @pytest.mark.integration
@@ -9,3 +12,66 @@ def test_bad_scenario_name():
     # This test will fail if we do add a scenario with this name
     with pytest.raises(ValueError):
         Scenario("this_scenario_does_not_exist")
+
+
+def test_scenario_workflow(monkeypatch):
+    mock_context = MockContext()
+    monkeypatch.setattr(Context, "get_data_access", mock_context.get_data_access)
+
+    s = Scenario()
+    print(s.state.name)
+
+    s.set_grid(interconnect="Texas")
+
+    s.set_name("test", "dummy")
+    s.set_time("2016-01-01 00:00:00", "2016-01-01 03:00:00", "1H")
+
+    s.set_base_profile("demand", "vJan2021")
+    s.set_base_profile("hydro", "vJan2021")
+    s.set_base_profile("solar", "vJan2021")
+    s.set_base_profile("wind", "vJan2021")
+    s.change_table.ct = {
+        "wind": {
+            "zone_id": {
+                301: 1.1293320940114195,
+                302: 2.2996731828360466,
+                303: 1.1460693669609412,
+                304: 1.5378918905751389,
+                305: 1.6606575751914816,
+            },
+            "plant_id": {12912: 0},
+        }
+    }
+
+    s.get_grid()
+    s.get_ct()
+
+    rfs = s.data_access.fs
+    lfs = s.data_access.local_fs
+    tmp_dir = s.data_access.tmp_folder(1)
+
+    s.print_scenario_info()
+    s.create_scenario()
+
+    scenario_list = s.get_scenario_table()
+    assert 1 == scenario_list.shape[0]
+
+    for fs in (lfs, rfs):
+        assert fs.exists("data/input/1_ct.pkl")
+
+    # hack to use truncated profiles so the test runs quickly
+    s.info["grid_model"] = "test_usa_tamu"
+    s.prepare_simulation_input()
+
+    tmp_files = rfs.listdir(tmp_dir)
+    assert len(tmp_files) > 0
+
+    s.change(Delete)
+    s.delete_scenario(confirm=False)
+
+    for fs in (rfs, lfs):
+        assert not fs.exists(tmp_dir)
+        assert len(fs.listdir("data/input")) == 0
+
+    scenario_list = Scenario().get_scenario_table()
+    assert 0 == scenario_list.shape[0]
diff --git a/powersimdata/tests/mock_context.py b/powersimdata/tests/mock_context.py
new file mode 100644
index 000000000..46f2a1840
--- /dev/null
+++ b/powersimdata/tests/mock_context.py
@@ -0,0 +1,21 @@
+import os
+
+from powersimdata.data_access.data_access import TempDataAccess, get_blob_fs
+from powersimdata.utility import templates
+
+
+class MockContext:
+    def __init__(self):
+        self.data_access = self._setup()
+
+    def get_data_access(self, ignored=None):
+        return self.data_access
+
+    def _setup(self):
+        tda = TempDataAccess()
+        tda.fs.add_fs("profile_fs", get_blob_fs("profiles"), priority=2)
+        for path in ("ExecuteList.csv", "ScenarioList.csv"):
+            orig = os.path.join(templates.__path__[0], path)
+            with open(orig, "rb") as f:
+                tda.fs.upload(path, f)
+        return tda
diff --git a/tox.ini b/tox.ini
index b0ba5640a..2bc6fae60 100644
--- a/tox.ini
+++ b/tox.ini
@@ -15,7 +15,7 @@ deps =
 commands =
     pytest: pipenv sync --dev
     local: pytest -m 'not integration' {posargs}
-    integration: pytest
+    integration: pytest {posargs}
     format: black .
     format: isort .
     checkformatting: black . --check --diff
@@ -29,7 +29,7 @@ ignore = E501,W503,E741,E203,W605
 profile = black

 [pytest]
-addopts = --cov=powersimdata --ignore-glob=**/sql/*
+addopts = --ignore-glob=**/sql/*
 markers =
     integration: marks tests that require external dependencies (deselect with '-m "not integration"')
     db: marks tests that connect to a local database