Skip to content

Commit

Permalink
refactor: use callback to handle reading various file types
Browse files Browse the repository at this point in the history
  • Loading branch information
Jon Hagg committed Jan 26, 2022
1 parent c553833 commit b136d8f
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 37 deletions.
40 changes: 18 additions & 22 deletions powersimdata/data_access/csv_store.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import functools
import os
from pathlib import Path
from tempfile import mkstemp

import pandas as pd
Expand All @@ -24,6 +23,19 @@ def wrapper(self, *args, **kwargs):
return wrapper


def _parse_csv(file_object):
    """Read a csv file into a data frame of strings indexed by ``id``

    :param str, path object or file-like object file_object: a reference to
        the csv file
    :return: (*pandas.DataFrame*) -- the specified file as a data frame, indexed
        by its ``id`` column, with missing values replaced by empty strings and
        every column cast to ``str``.
    """
    table = pd.read_csv(file_object).set_index("id")
    # Fill on a new frame rather than in place: writing "" into a numeric
    # column in place is a silent dtype upcast, which pandas >= 2.1 deprecates.
    # The missing values must be filled before the cast, otherwise they would
    # be rendered as the string "nan".
    return table.fillna("").astype(str)


class CsvStore:
"""Base class for common functionality used to manage scenario and execute
list stored as csv files on the server
Expand All @@ -42,30 +54,14 @@ def get_table(self):
:return: (*pandas.DataFrame*) -- the specified table as a data frame.
"""
filename = self._FILE_NAME
local_path = Path(server_setup.LOCAL_DIR, filename)

try:
self.data_access.copy_from(filename)
return self._get_table(filename)
except: # noqa
print(f"Failed to download {filename} from server")
print("Falling back to local cache...")

if local_path.is_file():
return self._parse_csv(local_path)
else:
raise FileNotFoundError(f"{filename} does not exist locally.")
return self._get_table(filename + ".2")

def _parse_csv(self, file_object):
    """Read a csv file into a data frame of strings indexed by ``id``

    :param str, path object or file-like object file_object: a reference to
        the csv file
    :return: (*pandas.DataFrame*) -- the specified file as a data frame, indexed
        by its ``id`` column, with missing values replaced by empty strings and
        every column cast to ``str``.
    """
    table = pd.read_csv(file_object).set_index("id")
    # Fill on a new frame rather than in place: writing "" into a numeric
    # column in place is a silent dtype upcast, which pandas >= 2.1 deprecates.
    # The missing values must be filled before the cast, otherwise they would
    # be rendered as the string "nan".
    return table.fillna("").astype(str)
def _get_table(self, filename):
    """Copy a csv table through the data access object and parse it

    :param str filename: name of the csv file to fetch and read
    :return: the value returned by ``data_access.read`` for the parsing
        callback -- presumably the parsed *pandas.DataFrame*; confirm against
        the data access implementation.
    """

    def _as_table(stream, _path):
        # The callback is handed the open file and its path; only the file
        # object is needed to parse the csv.
        return _parse_csv(stream)

    self.data_access.copy_from(filename)
    return self.data_access.read(filename, callback=_as_table)

def commit(self, table, checksum):
"""Save to local directory and upload if needed
Expand Down
34 changes: 19 additions & 15 deletions powersimdata/data_access/data_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def __init__(self, root):
self.join = fs.path.join
self.local_fs = None

def read(self, filepath):
def read(self, filepath, callback=None):
"""Reads data from data store.
:param str filepath: path to file, with extension either 'pkl', 'csv', or 'mat'.
Expand All @@ -66,21 +66,24 @@ def read(self, filepath):
print(f"{filepath} not found on local machine")
from_dir, filename = dirname(filepath), basename(filepath)
self.copy_from(filename, from_dir)
return self._read(self.local_fs, filepath)

def _read(self, fs, filepath):
if callback is None:
callback = self._read
with self.local_fs.openbin(filepath) as f:
return callback(f, filepath)

def _read(self, f, filepath):
ext = os.path.basename(filepath).split(".")[-1]
with fs.open(filepath, mode="rb") as f:
if ext == "pkl":
data = pd.read_pickle(f)
elif ext == "csv":
data = pd.read_csv(f, index_col=0, parse_dates=True)
data.columns = data.columns.astype(int)
elif ext == "mat":
# get fully qualified local path to matfile
data = self.local_fs.getsyspath(filepath)
else:
raise ValueError("Unknown extension! %s" % ext)
if ext == "pkl":
data = pd.read_pickle(f)
elif ext == "csv":
data = pd.read_csv(f, index_col=0, parse_dates=True)
data.columns = data.columns.astype(int)
elif ext == "mat":
# get fully qualified local path to matfile
data = self.local_fs.getsyspath(filepath)
else:
raise ValueError("Unknown extension! %s" % ext)

return data

Expand Down Expand Up @@ -181,7 +184,8 @@ def _check_file_exists(self, path, should_exist=True):
location, _ = self.fs.which(path)
exists = location is not None
if should_exist and not exists:
raise OSError(f"{path} not found on {location}")
remotes = [f[0] for f in self.fs.iterate_fs()]
raise OSError(f"{path} not found on any of {remotes}")
if not should_exist and exists:
raise OSError(f"{path} already exists on {location}")

Expand Down

0 comments on commit b136d8f

Please sign in to comment.