adding pkl, fixing edb concat and write
dhensle committed Sep 20, 2024
1 parent 3434c95 commit 914b9ca
Showing 2 changed files with 60 additions and 22 deletions.
35 changes: 27 additions & 8 deletions activitysim/core/estimation.py
@@ -186,7 +186,7 @@ def output_file_path(self, table_name, file_type=None, bundle_directory=False):

         return os.path.join(output_dir, file_name)

-    def write_parquet(self, df, file_path, append=False):
+    def write_parquet(self, df, file_path, index, append=False):
         """Convert DF to be parquet compliant and write to disk"""
         # Ensure column names are strings for parquet
         df.columns = df.columns.astype(str)
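A minimal sketch (not part of the commit) of why the column-name coercion above matters: parquet writers require string column names, so a table whose columns are alternative numbers would otherwise fail to serialize. Assumes a parquet engine such as pyarrow is installed; file and column names are illustrative.

import pandas as pd

df = pd.DataFrame({1: [0.1, 0.2], 2: [0.3, 0.4]})  # integer column names
df.columns = df.columns.astype(str)                # coerce to "1", "2" before writing
df.to_parquet("alts.parquet")                      # would raise without the coercion
print(pd.read_parquet("alts.parquet").columns.tolist())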
@@ -211,9 +211,9 @@ def write_parquet(self, df, file_path, append=False):

         self.debug(f"writing table: {file_path}")
         if append and os.path.isfile(file_path):
-            df.to_parquet(file_path, engine="fastparquet", append=True)
+            df.to_parquet(file_path, engine="fastparquet", append=True, index=index)
         else:
-            df.to_parquet(file_path)
+            df.to_parquet(file_path, index=index)

     def write_table(
         self,
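A minimal sketch (not from the commit) of the append path this change relies on: the fastparquet engine can append row groups to an existing file, but the appended chunk must match the schema already on disk, which is why the index is now written consistently via the new index argument. Requires fastparquet; file, column, and index names below are illustrative.

import pandas as pd

chunk1 = pd.DataFrame({"choice": [1, 2]}, index=pd.Index([101, 102], name="person_id"))
chunk2 = pd.DataFrame({"choice": [3, 4]}, index=pd.Index([103, 104], name="person_id"))

# the first write creates the file; later writes append to it with a matching schema
chunk1.to_parquet("choices.parquet", engine="fastparquet", index=True)
chunk2.to_parquet("choices.parquet", engine="fastparquet", append=True, index=True)

print(pd.read_parquet("choices.parquet"))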
Expand All @@ -235,7 +235,7 @@ def write_table(
append: boolean
bundle_directory: boolean
filetype: str
csv or parquet
csv or parquet or pkl
"""

@@ -264,8 +264,20 @@ def write_table(df, table_name, index, append, bundle_directory, filetype):
                 )
             if filetype == "csv":
                 df.to_csv(file_path, mode="a", index=index, header=(not file_exists))
+            elif filetype == "parquet":
+                self.write_parquet(df, file_path, index, append)
+            elif filetype == "pkl":
+                if append:
+                    # read the previous df and concat
+                    prev_df = pd.read_pickle(file_path)
+                    df = pd.concat([prev_df, df])
+                if index == False:
+                    df.reset_index(drop=True, inplace=True)
+                df.to_pickle(file_path)
             else:
-                self.write_parquet(df, file_path, append)
+                raise RuntimeError(
+                    f"Unsupported filetype: {filetype}, allowed options are csv, parquet, pkl"
+                )

         assert self.estimating

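A minimal sketch (not from the commit) of the new "pkl" branch above: pickle has no native append mode, so appending means reading the existing frame back, concatenating the new rows, and rewriting the whole file. File and column names are illustrative.

import pandas as pd

chunk1 = pd.DataFrame({"util": [0.1, 0.2]}, index=pd.Index([1, 2], name="tour_id"))
chunk2 = pd.DataFrame({"util": [0.3]}, index=pd.Index([3], name="tour_id"))

chunk1.to_pickle("values.pkl")

# "append" by round-tripping through memory
prev = pd.read_pickle("values.pkl")
combined = pd.concat([prev, chunk2])
combined.to_pickle("values.pkl")

print(pd.read_pickle("values.pkl"))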
@@ -317,17 +329,24 @@ def write_omnibus_table(self):
             df.sort_index(ascending=True, inplace=True, kind="mergesort")

             filetype = self.settings.get("EDB_FILETYPE", "csv")
-            assert filetype in ["csv", "parquet"]

             if filetype == "csv":
                 file_path = self.output_file_path(omnibus_table, "csv")
                 assert not os.path.isfile(file_path)

                 self.debug(f"writing table: {file_path}")
                 df.to_csv(file_path, mode="a", index=True, header=True)
-            else:
+
+            elif filetype == "parquet":
                 file_path = self.output_file_path(omnibus_table, "parquet")
-                self.write_parquet(df, file_path, append=False)
+                self.write_parquet(df, file_path, index=True, append=False)
+
+            elif filetype == "pkl":
+                file_path = self.output_file_path(omnibus_table, "pkl")
+                df.to_pickle(file_path)
+
+            else:
+                raise RuntimeError(f"Unsupported filetype: {filetype}")

             self.debug("wrote_omnibus_choosers: %s" % file_path)

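With this change the omnibus table can land on disk as csv, parquet, or pkl depending on the EDB_FILETYPE setting. A small, illustrative read-back helper (not part of ActivitySim; the helper and file names are made up) showing how a consumer might load whichever format was written:

import pandas as pd

def read_omnibus_table(file_path):
    if file_path.endswith(".csv"):
        return pd.read_csv(file_path, index_col=0)
    elif file_path.endswith(".parquet"):
        return pd.read_parquet(file_path)
    elif file_path.endswith(".pkl"):
        return pd.read_pickle(file_path)
    raise RuntimeError(f"Unsupported filetype: {file_path}")

# round-trip a tiny table through the pkl path
pd.DataFrame({"value": [1.0, 2.0]}, index=pd.Index([1, 2], name="person_id")).to_pickle(
    "values_combined.pkl"
)
print(read_omnibus_table("values_combined.pkl"))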
47 changes: 33 additions & 14 deletions activitysim/core/steps/output.py
@@ -242,10 +242,19 @@ def concat_and_write_edb(df_concat_dict, write_dir):
     # concatenate the dataframes and output final file
     for table_name, df_array in df_concat_dict.items():
         df = pd.concat(df_array)
+
+        # sort the dataframe by index
+        if df.index.name is not None:
+            df = df.sort_index()
+        else:
+            df = df.sort_values(by=df.columns[0])
+
         if table_name.endswith(".csv"):
            df.to_csv(os.path.join(write_dir, table_name), index=False)
         elif table_name.endswith(".parquet"):
             df.to_parquet(os.path.join(write_dir, table_name), index=True)
+        elif table_name.endswith(".pkl"):
+            df.to_pickle(os.path.join(write_dir, table_name))
         else:
             raise ValueError(f"Unknown file type {table_name}")

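A minimal sketch (not from the commit) of the new sort step: table chunks arriving from different subprocesses are concatenated and then ordered, by the named index when one exists, otherwise by the first column. Data and column names are illustrative.

import pandas as pd

part_a = pd.DataFrame({"tour_id": [4, 2], "mode": ["WALK", "DRIVE"]})
part_b = pd.DataFrame({"tour_id": [3, 1], "mode": ["BIKE", "WALK"]})

df = pd.concat([part_a, part_b])
if df.index.name is not None:
    df = df.sort_index()
else:
    df = df.sort_values(by=df.columns[0])  # here: ordered by "tour_id"

print(df)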
@@ -255,6 +264,7 @@ def coalesce_estimation_data_bundles(state):
     In estimation mode, estimation data bundles are written to separate subdirectories for each subprocess.
     This model will go through each subdirectory and move the files to the parent directory.
     This will only occur if the lowest level directory contains the multiprocess step names.
+    Only multiprocess step names are used because that's how EDBs are written in estimation mode.
     """

     logger.info("Coalescing Estimation Data Bundles")
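To make the docstring concrete, a hypothetical sketch (not from the commit) of how the lowest-level subprocess directories could be detected; the step names, directory layout, and helper name are assumptions for illustration only.

import os
from pathlib import Path

def find_subprocess_edb_dirs(edb_root, mp_step_names=("mp_households_0", "mp_households_1")):
    edb_dirs = []
    for dirpath, dirnames, filenames in os.walk(edb_root):
        is_lowest_level = len(dirnames) == 0  # no further subdirectories
        if is_lowest_level and any(step in Path(dirpath).name for step in mp_step_names):
            edb_dirs.append(dirpath)
    return sorted(edb_dirs)

print(find_subprocess_edb_dirs("output/estimation_data_bundle"))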
@@ -284,18 +294,32 @@ def coalesce_estimation_data_bundles(state):
         logger.debug(f"Coalescing {dir}")
         # get the parent directory
         cur_edb = Path(dir).parent.absolute()
+        if prev_edb is None:
+            prev_edb = cur_edb

         # check if we have moved onto a new EDB
         is_same_edb = cur_edb == prev_edb

-        for file in os.listdir(dir):
+        # if we have moved onto a new EDB, concatenate the dataframes and write the final files
+        if (
+            (not is_same_edb)
+            and (len(df_concat_dict) > 0)
+            and (len(df_concat_dict[list(df_concat_dict.keys())[0]]) > 1)
+        ):
+            concat_and_write_edb(df_concat_dict, prev_edb)
+
+            # reset edb dir and dictionary
+            prev_edb = cur_edb
+            df_concat_dict = {}
+
+        for i, file in enumerate(os.listdir(dir)):
             # get the file path
             file_path = os.path.join(dir, file)

             # look for files that are duplicated across subprocesses
             is_coefs_file = file.endswith(".csv") and "coef" in file
             is_settings_file = file.endswith(".yaml")
-            is_spec_file = file.endswith(".csv") and "SPEC" in file
+            is_spec_file = file.endswith(".csv") and ("spec" in file.lower())
             is_landuse_file = file.endswith("_landuse.csv")
             is_size_terms_file = file.endswith("_size_terms.csv")
             is_duplicate_file = (
@@ -310,31 +334,26 @@ def coalesce_estimation_data_bundles(state):
                 # copy the file to the parent directory
                 shutil.copy(file_path, os.path.join(cur_edb, file))

-            if (
-                (not is_same_edb)
-                and (len(df_concat_dict) > 0)
-                and (len(df_concat_dict[list(df_concat_dict.keys())[0]]) > 1)
-            ):
-                concat_and_write_edb(df_concat_dict, cur_edb)
-
-                # reset edb dir and dictionary
-                prev_edb = cur_edb
-                df_concat_dict = {}
-
             if not is_duplicate_file:
                 # read file and store in dictionary
                 if file.endswith(".csv"):
                     df = pd.read_csv(file_path, low_memory=False)
                 elif file.endswith(".parquet"):
                     df = pd.read_parquet(file_path)
+                elif file.endswith(".pkl"):
+                    df = pd.read_pickle(file_path)
+                else:
+                    raise ValueError(
+                        f"Unknown file type found {file}, expect csv, parquet, or pkl"
+                    )

                 if file in df_concat_dict.keys():
                     df_concat_dict[file].append(df)
                 else:
                     df_concat_dict[file] = [df]

         # delete the directory now that we have gone through all the files
-        shutil.rmtree(dir)
+        # shutil.rmtree(dir)

     # need to concatenate the last set of dataframes
     concat_and_write_edb(df_concat_dict, cur_edb)