multiprocess initial commit
dhensle committed Aug 17, 2024
1 parent d81f5f2 commit 44e3c21
Showing 5 changed files with 165 additions and 10 deletions.
10 changes: 7 additions & 3 deletions activitysim/abm/models/non_mandatory_tour_frequency.py
@@ -288,14 +288,18 @@ def non_mandatory_tour_frequency(
         )

         if estimator:
-            estimator.write_spec(model_settings, bundle_directory=True)
+            bundle_directory = True
+            # writing to separate subdirectory for each segment if multiprocessing
+            if state.settings.multiprocess:
+                bundle_directory = False
+            estimator.write_spec(model_settings, bundle_directory=bundle_directory)
             estimator.write_model_settings(
-                model_settings, model_settings_file_name, bundle_directory=True
+                model_settings, model_settings_file_name, bundle_directory=bundle_directory
             )
             # preserving coefficients file name makes bringing back updated coefficients more straightforward
             estimator.write_coefficients(coefficients_df, segment_settings)
             estimator.write_choosers(chooser_segment)
-            estimator.write_alternatives(alternatives, bundle_directory=True)
+            estimator.write_alternatives(alternatives, bundle_directory=bundle_directory)

             # FIXME #interaction_simulate_estimation_requires_chooser_id_in_df_column
             # should we do it here or have interaction_simulate do it?
5 changes: 4 additions & 1 deletion activitysim/abm/models/school_escorting.py
@@ -493,7 +493,10 @@ def school_escorting(
                 coefficients_df, file_name=stage.upper() + "_COEFFICIENTS"
             )
             estimator.write_choosers(choosers)
-            estimator.write_alternatives(alts, bundle_directory=True)
+            if state.settings.multiprocess:
+                estimator.write_alternatives(alts, bundle_directory=False)
+            else:
+                estimator.write_alternatives(alts, bundle_directory=True)

             # FIXME #interaction_simulate_estimation_requires_chooser_id_in_df_column
             # should we do it here or have interaction_simulate do it?
12 changes: 9 additions & 3 deletions activitysim/abm/models/stop_frequency.py
@@ -197,9 +197,15 @@ def stop_frequency(

         if estimator:
             estimator.write_spec(segment_settings, bundle_directory=False)
-            estimator.write_model_settings(
-                model_settings, model_settings_file_name, bundle_directory=True
-            )
+            # writing to separate subdirectory for each segment if multiprocessing
+            if state.settings.multiprocess:
+                estimator.write_model_settings(
+                    model_settings, model_settings_file_name, bundle_directory=False
+                )
+            else:
+                estimator.write_model_settings(
+                    model_settings, model_settings_file_name, bundle_directory=True
+                )
             estimator.write_coefficients(coefficients_df, segment_settings)
             estimator.write_choosers(chooser_segment)

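Note: all three model files above apply the same toggle: shared estimation files are written once at the bundle level in single-process runs, but per segment in multiprocess runs so that each subprocess writes into its own subdirectory. A minimal sketch of the shared pattern (the helper name and standalone form are hypothetical, not part of this commit):

def resolve_bundle_directory(multiprocess: bool) -> bool:
    # single-process: write shared files once at the bundle level
    # multiprocess: write per-segment copies so each subprocess gets its own subdirectory
    return not multiprocess

# e.g. estimator.write_spec(model_settings,
#          bundle_directory=resolve_bundle_directory(state.settings.multiprocess))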
35 changes: 32 additions & 3 deletions activitysim/core/estimation.py
@@ -19,7 +19,13 @@
 ESTIMATION_SETTINGS_FILE_NAME = "estimation.yaml"


-def unlink_files(directory_path, file_types=("csv", "yaml")):
+def unlink_files(directory_path, file_types=("csv", "yaml", "parquet")):
+    """
+    Deletes existing files in directory_path with file_types extensions.
+    """
+    if not os.path.exists(directory_path):
+        return
+
     for file_name in os.listdir(directory_path):
         if file_name.endswith(file_types):
             file_path = os.path.join(directory_path, file_name)
@@ -31,6 +37,16 @@ def unlink_files(directory_path, file_types=("csv", "yaml")):
             print(e)


+def estimation_enabled(state):
+    """
+    Returns True if estimation.yaml exists in the configs directory.
+    """
+    settings = state.filesystem.read_model_settings(
+        ESTIMATION_SETTINGS_FILE_NAME, mandatory=False
+    )
+    return settings is not None
+
+
 class Estimator:
     def __init__(
         self, state: workflow.State, bundle_name, model_name, estimation_table_recipes
@@ -50,12 +66,12 @@ def __init__(
             os.makedirs(output_dir)  # make directory if needed

         # delete estimation files
-        unlink_files(self.output_directory(), file_types=("csv", "yaml"))
+        unlink_files(self.output_directory(), file_types=("csv", "yaml", "parquet"))
         if self.bundle_name != self.model_name:
             # kind of inelegant to always delete these, but ok as they are redundantly recreated for each sub model
             unlink_files(
                 self.output_directory(bundle_directory=True),
-                file_types=("csv", "yaml"),
+                file_types=("csv", "yaml", "parquet"),
             )

         # FIXME - not required?
@@ -139,6 +155,9 @@ def output_directory(self, bundle_directory=False):
         if self.bundle_name != self.model_name and not bundle_directory:
             dir = os.path.join(dir, self.model_name)

+        if self.state.settings.multiprocess:
+            dir = os.path.join(dir, self.state.get_injectable("pipeline_file_prefix"))
+
         return dir

     def output_file_path(self, table_name, file_type=None, bundle_directory=False):
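With the new multiprocess suffix, output_directory() nests each subprocess's bundle files under the value of the pipeline_file_prefix injectable. A sketch of how the path composes, assuming illustrative names (the PTYPE_FULL segment and mp_households_0 prefix are made up for this example):

import os

edb_root = "output/estimation_data_bundle"
bundle_name = "non_mandatory_tour_frequency"
model_name = "non_mandatory_tour_frequency_PTYPE_FULL"  # hypothetical segment model name
pipeline_file_prefix = "mp_households_0"  # hypothetical subprocess prefix

path = os.path.join(edb_root, bundle_name)
if bundle_name != model_name:  # per-model subdirectory, as in output_directory()
    path = os.path.join(path, model_name)
path = os.path.join(path, pipeline_file_prefix)  # new multiprocess suffix
print(path)
# output/estimation_data_bundle/non_mandatory_tour_frequency/non_mandatory_tour_frequency_PTYPE_FULL/mp_households_0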
@@ -546,6 +565,16 @@ def initialize_settings(self, state):
), "Index col '%s' not in survey_table '%s' in file: %s % (index_col, table_name, file_path)"
df.set_index(index_col, inplace=True)

# if multiprocessing then only return the households that are in the pipeline
if state.settings.multiprocess:
pipeline_hh_ids = state.get_table("households").index
if table_name == "households":
df = df[df.index.isin(pipeline_hh_ids)]
assert pipeline_hh_ids.equals(df.index), "household_ids not equal between survey and pipeline"
else:
assert "household_id" in df.columns
df = df[df.household_id.isin(pipeline_hh_ids)]

# add the table df to survey_tables
table_info["df"] = df

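Under multiprocessing, each subprocess pipeline holds only a slice of the households, so the survey tables must be filtered to that slice before the indexes can be compared. A toy pandas illustration of the filtering above, with synthetic ids:

import pandas as pd

survey_households = pd.DataFrame(index=pd.Index([1, 2, 3, 4], name="household_id"))
pipeline_hh_ids = pd.Index([2, 4], name="household_id")  # this subprocess's slice

# households table: filter by index, after which the indexes should match exactly
survey_households = survey_households[survey_households.index.isin(pipeline_hh_ids)]
assert pipeline_hh_ids.equals(survey_households.index)

# other survey tables: filter on their household_id column
survey_tours = pd.DataFrame({"household_id": [1, 2, 2, 3, 4], "tour_id": range(5)})
survey_tours = survey_tours[survey_tours.household_id.isin(pipeline_hh_ids)]
print(survey_tours)  # keeps only the tours for households 2 and 4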
113 changes: 113 additions & 0 deletions activitysim/core/steps/output.py
@@ -4,6 +4,9 @@

 import logging
 import sys
+import os
+import shutil
+from pathlib import Path

 import numpy as np
 import pandas as pd
@@ -13,6 +16,7 @@

 from activitysim.core import configuration, workflow
 from activitysim.core.workflow.checkpoint import CHECKPOINT_NAME
+from activitysim.core.estimation import estimation_enabled

 logger = logging.getLogger(__name__)

@@ -224,6 +228,111 @@ def write_data_dictionary(state: workflow.State) -> None:
print(f"{info}\n", file=output_file)


def find_lowest_level_directories(starting_directory):
lowest_dirs = list()

for root, dirs, files in os.walk(starting_directory):
if not dirs:
lowest_dirs.append(root)

return lowest_dirs


def concat_and_write_edb(df_concat_dict, write_dir):
# concatenate the dataframes and output final file
for table_name, df_array in df_concat_dict.items():
df = pd.concat(df_array)
if table_name.endswith(".csv"):
df.to_csv(os.path.join(write_dir, table_name), index=False)
elif table_name.endswith(".parquet"):
df.to_parquet(os.path.join(write_dir, table_name), index=True)
else:
raise ValueError(f"Unknown file type {table_name}")


def coalesce_estimation_data_bundles(state):
"""
In estimation mode, estimation data bundles are written to separate subdirectories for each subprocess.
This model will go through each subdirectory and move the files to the parent directory.
This will only occur if the lowest level directory contains the multiprocess step names.
"""

logger.info("Coalescing Estimation Data Bundles")

edb_dir = state.filesystem.get_output_dir("estimation_data_bundle")

lowest_dirs = find_lowest_level_directories(edb_dir)

multiprocessing_step_names = [step.name for step in state.settings.multiprocess_steps]
lowest_dirs = [dir for dir in lowest_dirs if any(step in dir for step in multiprocessing_step_names)]

if len(lowest_dirs) == 0:
logger.info("No estimation data bundles to coalesce")
return

prev_edb = None
df_concat_dict = {}

# loop through each lowest level directory
for dir in lowest_dirs:
logger.debug(f"Coalescing {dir}")
# get the parent directory
cur_edb = Path(dir).parent.absolute()

# check if we have moved onto a new EDB
is_same_edb = (cur_edb == prev_edb)

for file in os.listdir(dir):
# get the file path
file_path = os.path.join(dir, file)

# look for files that are duplicated across subprocesses
is_coefs_file = file.endswith(".csv") and "coef" in file
is_settings_file = file.endswith(".yaml")
is_spec_file = file.endswith(".csv") and "SPEC" in file
is_landuse_file = file.endswith("_landuse.csv")
is_size_terms_file = file.endswith("_size_terms.csv")
is_duplicate_file = (
is_coefs_file
or is_spec_file
or is_settings_file
or is_landuse_file
or is_size_terms_file
)

if is_duplicate_file and not is_same_edb:
# copy the file to the parent directory
shutil.copy(file_path, os.path.join(cur_edb, file))

if (not is_same_edb) and (len(df_concat_dict) > 0) and (len(df_concat_dict[list(df_concat_dict.keys())[0]]) > 1):
concat_and_write_edb(df_concat_dict, cur_edb)

# reset edb dir and dictionary
logger.info(f"Coalesced {cur_edb}")
prev_edb = cur_edb
df_concat_dict = {}

if not is_duplicate_file:
# read file and store in dictionary
if file.endswith(".csv"):
df = pd.read_csv(file_path, low_memory=False)
elif file.endswith(".parquet"):
df = pd.read_parquet(file_path)

if file in df_concat_dict.keys():
df_concat_dict[file].append(df)
else:
df_concat_dict[file] = [df]

# delete the directory now that we have gone through all the files
# os.rmdir(dir)

# need to concatenate the last set of dataframes
concat_and_write_edb(df_concat_dict, cur_edb)

return


@workflow.step
def write_tables(state: workflow.State) -> None:
"""
@@ -434,3 +543,7 @@ def map_func(x):
             parquet.write_table(dt, file_path)
         else:
             raise ValueError(f"unknown file_type {file_type}")
+
+    is_estimation = estimation_enabled(state)
+    if state.settings.multiprocess and is_estimation:
+        coalesce_estimation_data_bundles(state)
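Putting the pieces together, a toy run of the coalescing pass shows the directory shape it expects and what it produces; all paths and the mp_households step name here are synthetic:

import os
import tempfile

import pandas as pd

# build a synthetic EDB with two subprocess subdirectories
root = tempfile.mkdtemp()
edb = os.path.join(root, "estimation_data_bundle", "stop_frequency")
for sub in ("mp_households_0", "mp_households_1"):
    os.makedirs(os.path.join(edb, sub))
    pd.DataFrame({"tour_id": [1], "stops": [2]}).to_csv(
        os.path.join(edb, sub, "stop_frequency_choosers.csv"), index=False
    )

# find_lowest_level_directories() returns the two subprocess dirs, and both
# names contain the multiprocess step name, so they qualify for coalescing
lowest = [r for r, dirs, files in os.walk(root) if not dirs]
print(lowest)

# the coalescing step then concatenates the per-subprocess files into a
# single stop_frequency_choosers.csv directly under .../stop_frequency/
combined = pd.concat(
    pd.read_csv(os.path.join(d, "stop_frequency_choosers.csv")) for d in lowest
)
combined.to_csv(os.path.join(edb, "stop_frequency_choosers.csv"), index=False)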
