diff --git a/config/base.toml b/config/base.toml
index da41aa4..a8b8357 100644
--- a/config/base.toml
+++ b/config/base.toml
@@ -20,7 +20,7 @@ nts_regions = [
 ]
 # nts day of the week to use
 # 1: Monday, 2: Tuesday, 3: Wednesday, 4: Thursday, 5: Friday, 6: Saturday, 7: Sunday
-nts_day_of_week = 3
+nts_days_of_week = [3]
 # what crs do we want the output to be in? (just add the number, e.g. 3857)
 output_crs = 3857
@@ -37,6 +37,13 @@ optional_columns = [
 ]
 n_matches = 10 # What is the maximum number of NTS matches we want for each SPC household?
+[feasible_assignment]
+# detour factor when converting euclidean distance to actual travel distance
+detour_factor = 1.56
+# decay rate when converting euclidean distance to travel distance (0.0001 is a good value)
+# actual_distance = distance * (1 + ((detour_factor - 1) * np.exp(-decay_rate * distance)))
+decay_rate = 0.0001
+
 [work_assignment]
 commute_level = "OA"
 use_percentages = true # if true, optimization problem will try to minimize percentage difference at OD level (not absolute numbers). Recommended to set it to true
@@ -48,3 +55,15 @@ max_zones = 8 # maximum number of feasible zones to include in the opti
 [postprocessing]
 pam_jitter = 30
 pam_min_duration = 10
+# for get_pt_subscription: everyone above this age has a subscription (pensioners get free travel)
+# TODO: more sophisticated approach
+pt_subscription_age = 66
+# to define if a person is a student:
+# everyone below this age is a student
+student_age_base = 16
+# everyone below this age that has at least one "education" activity is a student
+student_age_upper = 30
+# everyone who uses one of the modes below is classified as a passenger (isPassenger = True)
+modes_passenger = ['car_passenger', 'taxi']
+# yearly state pension: for getting hhlIncome of pensioners
+state_pension = 11502
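
The [feasible_assignment] block added above documents its own formula. A minimal sketch of what it computes, assuming distances in metres (the helper name `adjust_distance` is illustrative, not part of acbm's API):

    import numpy as np

    def adjust_distance(distance_m: float, detour_factor: float = 1.56, decay_rate: float = 0.0001) -> float:
        # Scale a euclidean distance up by a detour factor that decays with distance.
        return distance_m * (1 + (detour_factor - 1) * np.exp(-decay_rate * distance_m))

    print(adjust_distance(1_000))   # ~1507 m: short trips get close to the full 1.56 factor
    print(adjust_distance(10_000))  # ~12060 m: at 10 km the effective factor is ~1.21

Short trips are inflated by nearly the full detour factor, while the adjustment fades towards the straight-line distance as trips get longer.
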
diff --git a/config/base_all_msoa.toml b/config/base_all_msoa.toml
new file mode 100644
index 0000000..4593485
--- /dev/null
+++ b/config/base_all_msoa.toml
@@ -0,0 +1,78 @@
+[parameters]
+seed = 0
+# this is used to query poi data from osm and to load in SPC data
+region = "leeds"
+# how many people from the SPC do we want to run the model for? Comment out if you want to run the analysis on the entire SPC population
+number_of_households = 25000
+# "OA21CD": OA level, "MSOA21CD": MSOA level
+zone_id = "MSOA21CD"
+# Only set to true if you have travel time matrix at the level specified in boundary_geography
+travel_times = false
+boundary_geography = "MSOA"
+# NTS years to use
+nts_years = [2019, 2021, 2022]
+# NTS regions to use
+nts_regions = [
+    'Yorkshire and the Humber',
+    'North West',
+    'North East',
+    'East Midlands',
+    'West Midlands',
+    'East of England',
+    'South East',
+    'South West']
+# nts days of the week to use
+# 1: Monday, 2: Tuesday, 3: Wednesday, 4: Thursday, 5: Friday, 6: Saturday, 7: Sunday
+nts_days_of_week = [3]
+# what crs do we want the output to be in? (just add the number, e.g. 3857)
+output_crs = 3857
+
+[matching]
+# for optional and required columns, see the [iterative_match_categorical](https://github.com/Urban-Analytics-Technology-Platform/acbm/blob/ca181c54d7484ebe44706ff4b43c26286b22aceb/src/acbm/matching.py#L110) function
+# Do not add any column not listed below. You can only move a column from optional to required (or vice versa)
+required_columns = [
+    "number_adults",
+    "number_children",
+    "num_pension_age",
+]
+optional_columns = [
+    "number_cars",
+    "rural_urban_2_categories",
+    "employment_status",
+    "tenure_status",
+]
+# What is the maximum number of NTS matches we want for each SPC household?
+n_matches = 10
+
+[feasible_assignment]
+# detour factor when converting euclidean distance to actual travel distance
+detour_factor = 1.56
+# decay rate when converting euclidean distance to travel distance (0.0001 is a good value)
+# actual_distance = distance * (1 + ((detour_factor - 1) * np.exp(-decay_rate * distance)))
+decay_rate = 0.0001
+
+[work_assignment]
+commute_level = "MSOA"
+# if true, optimization problem will try to minimize percentage difference at OD level (not absolute numbers). Recommended to set it to true
+use_percentages = true
+# weights to add for each objective in the optimization problem
+weight_max_dev = 0.2
+weight_total_dev = 0.8
+# maximum number of feasible zones to include in the optimization problem (fewer zones make the problem smaller - so faster, but at the cost of solution quality)
+max_zones = 10
+
+[postprocessing]
+pam_jitter = 30
+pam_min_duration = 10
+# for get_pt_subscription: everyone above this age has a subscription (pensioners get free travel)
+# TODO: more sophisticated approach
+pt_subscription_age = 66
+# to define if a person is a student:
+# everyone below this age is a student
+student_age_base = 16
+# everyone below this age that has at least one "education" activity is a student
+student_age_upper = 30
+# everyone who uses one of the modes below is classified as a passenger (isPassenger = True)
+modes_passenger = ['car_passenger', 'taxi']
+# yearly state pension: for getting hhlIncome of pensioners
+state_pension = 11502
diff --git a/scripts/2_match_households_and_individuals.py b/scripts/2_match_households_and_individuals.py
index b86ca93..546977d 100644
--- a/scripts/2_match_households_and_individuals.py
+++ b/scripts/2_match_households_and_individuals.py
@@ -7,7 +7,7 @@
 from acbm.assigning.utils import cols_for_assignment_all
 from acbm.cli import acbm_cli
 from acbm.config import load_and_setup_config
-from acbm.matching import MatcherExact, match_individuals
+from acbm.matching import MatcherExact, match_individuals, match_remaining_individuals
 from acbm.preprocessing import (
     count_per_group,
     nts_filter_by_region,
@@ -16,6 +16,10 @@
     transform_by_group,
     truncate_values,
 )
+from acbm.utils import (
+    households_with_common_travel_days,
+    households_with_travel_days_in_nts_weeks,
+)


 @acbm_cli
@@ -222,14 +226,17 @@ def get_interim_path(
     logger.info("Filtering NTS data by specified year(s)")
+    logger.info(f"Total NTS households: {nts_households.shape[0]:,.0f}")
     years = config.parameters.nts_years
     nts_individuals = nts_filter_by_year(nts_individuals, psu, years)
     nts_households = nts_filter_by_year(nts_households, psu, years)
     nts_trips = nts_filter_by_year(nts_trips, psu, years)
+    logger.info(
+        f"Total NTS households (after year filtering): {nts_households.shape[0]:,.0f}"
+    )

     # #### Filter by geography
-
     #
     regions = config.parameters.nts_regions
@@ -237,8 +244,30 @@ def get_interim_path(
     nts_households = nts_filter_by_region(nts_households, psu, regions)
     nts_trips = nts_filter_by_region(nts_trips, psu, regions)
-    # Create dictionaries of key value pairs
+    logger.info(
+        f"Total NTS households (after region filtering): {nts_households.shape[0]:,.0f}"
+    )
+
+    # Ensure that the households have at least one day 
in `nts_days_of_week` that + # all household members have trips for + if config.parameters.common_household_day: + hids = households_with_common_travel_days( + nts_trips, config.parameters.nts_days_of_week + ) + else: + hids = households_with_travel_days_in_nts_weeks( + nts_trips, config.parameters.nts_days_of_week + ) + + # Subset individuals and households given filtering of trips + nts_trips = nts_trips[ + nts_trips["HouseholdID"].isin(hids) + & nts_trips["TravDay"].isin(config.parameters.nts_days_of_week) + ] + nts_individuals = nts_individuals[nts_individuals["HouseholdID"].isin(hids)] + nts_households = nts_households[nts_households["HouseholdID"].isin(hids)] + # Create dictionaries of key value pairs """ guide to the dictionaries: @@ -924,6 +953,19 @@ def get_interim_path( show_progress=True, ) + # match remaining individuals + remaining_ids = spc_edited.loc[ + ~spc_edited.index.isin(matches_ind.keys()), "id" + ].to_list() + matches_remaining_ind = match_remaining_individuals( + df1=spc_edited, + df2=nts_individuals, + matching_columns=["age_group", "sex"], + remaining_ids=remaining_ids, + show_progress=True, + ) + matches_ind.update(matches_remaining_ind) + # save random sample with open( get_interim_path("matches_ind_level_categorical_random_sample.pkl"), "wb" diff --git a/scripts/3.1_assign_primary_feasible_zones.py b/scripts/3.1_assign_primary_feasible_zones.py index 613e06f..6da9d26 100644 --- a/scripts/3.1_assign_primary_feasible_zones.py +++ b/scripts/3.1_assign_primary_feasible_zones.py @@ -7,6 +7,7 @@ from acbm.assigning.utils import ( activity_chains_for_assignment, get_activities_per_zone, + get_chosen_day, intrazone_time, replace_intrazonal_travel_time, zones_to_time_matrix, @@ -28,11 +29,15 @@ def main(config_file): activity_chains = activity_chains_for_assignment(config) logger.info("Activity chains loaded") - # Filter to a specific day of the week logger.info("Filtering activity chains to a specific day of the week") - activity_chains = activity_chains[ - activity_chains["TravDay"] == config.parameters.nts_day_of_week - ] + + # Generate random sample of days by household + get_chosen_day(config).to_parquet( + config.output_path / "interim" / "assigning" / "chosen_trav_day.parquet" + ) + + # Filter to chosen day + activity_chains = activity_chains_for_assignment(config, subset_to_chosen_day=True) # --- Study area boundaries @@ -74,7 +79,11 @@ def main(config_file): logger.info("Creating estimated travel times matrix") # Create a new travel time matrix based on distances between zones travel_time_estimates = zones_to_time_matrix( - zones=boundaries, id_col=config.zone_id, time_units="m" + zones=boundaries, + id_col=config.zone_id, + time_units="m", + detour_factor=config.feasible_assignment.detour_factor, + decay_rate=config.feasible_assignment.decay_rate, ) logger.info("Travel time estimates created") @@ -203,6 +212,8 @@ def main(config_file): time_tolerance=config.parameters.tolerance_edu if config.parameters.tolerance_edu is not None else 0.3, + detour_factor=config.feasible_assignment.detour_factor, + decay_rate=config.feasible_assignment.decay_rate, ) logger.info("Saving feasible zones for education activities") @@ -230,6 +241,8 @@ def main(config_file): time_tolerance=config.parameters.tolerance_work if config.parameters.tolerance_work is not None else 0.3, + detour_factor=config.feasible_assignment.detour_factor, + decay_rate=config.feasible_assignment.decay_rate, ) logger.info("Saving feasible zones for work activities") diff --git 
a/scripts/3.2.1_assign_primary_zone_edu.py b/scripts/3.2.1_assign_primary_zone_edu.py index 183a46f..d540eec 100644 --- a/scripts/3.2.1_assign_primary_zone_edu.py +++ b/scripts/3.2.1_assign_primary_zone_edu.py @@ -51,11 +51,8 @@ def main(config_file): logger.info("Loading activity chains") activity_chains = activity_chains_for_assignment( - config, columns=cols_for_assignment_edu() + config, columns=cols_for_assignment_edu(), subset_to_chosen_day=True ) - activity_chains = activity_chains[ - activity_chains["TravDay"] == config.parameters.nts_day_of_week - ] logger.info("Filtering activity chains for trip purpose: education") activity_chains_edu = activity_chains[activity_chains["dact"] == "education"] diff --git a/scripts/3.2.2_assign_primary_zone_work.py b/scripts/3.2.2_assign_primary_zone_work.py index 1c5e98b..fbf667f 100644 --- a/scripts/3.2.2_assign_primary_zone_work.py +++ b/scripts/3.2.2_assign_primary_zone_work.py @@ -42,11 +42,9 @@ def main(config_file): # --- Activity chains logger.info("Loading activity chains") - - activity_chains = activity_chains_for_assignment(config, cols_for_assignment_work()) - activity_chains = activity_chains[ - activity_chains["TravDay"] == config.parameters.nts_day_of_week - ] + activity_chains = activity_chains_for_assignment( + config, cols_for_assignment_work(), subset_to_chosen_day=True + ) logger.info("Filtering activity chains for trip purpose: work") activity_chains_work = activity_chains[activity_chains["dact"] == "work"] @@ -100,7 +98,7 @@ def main(config_file): logger.info("Step 4: Filtering rows and dropping unnecessary columns") travel_demand_clipped = travel_demand[ - travel_demand["Place of work indicator (4 categories) code"].isin([1, 3]) + travel_demand["Place of work indicator (4 categories) code"].isin([3]) ] travel_demand_clipped = travel_demand_clipped.drop( columns=[ @@ -141,7 +139,7 @@ def main(config_file): logger.info("Step 2: Filtering rows and dropping unnecessary columns") travel_demand_clipped = travel_demand[ - travel_demand["Place of work indicator (4 categories) code"].isin([1, 3]) + travel_demand["Place of work indicator (4 categories) code"].isin([3]) ] travel_demand_clipped = travel_demand_clipped.drop( columns=[ @@ -202,7 +200,9 @@ def main(config_file): #### ASSIGN TO ZONE FROM FEASIBLE ZONES #### zone_assignment = WorkZoneAssignment( - activities_to_assign=possible_zones_work, actual_flows=travel_demand_dict_nomode + activities_to_assign=possible_zones_work, + actual_flows=travel_demand_dict_nomode, + scaling=config.parameters.part_time_work_prob, ) assignments_df = zone_assignment.select_work_zone_optimization( diff --git a/scripts/3.2.3_assign_secondary_zone.py b/scripts/3.2.3_assign_secondary_zone.py index a87ea10..0f8b045 100644 --- a/scripts/3.2.3_assign_secondary_zone.py +++ b/scripts/3.2.3_assign_secondary_zone.py @@ -38,10 +38,7 @@ def main(config_file): # --- Load in the data logger.info("Loading: activity chains") - activity_chains = activity_chains_for_assignment(config) - activity_chains = activity_chains[ - activity_chains["TravDay"] == config.parameters.nts_day_of_week - ] + activity_chains = activity_chains_for_assignment(config, subset_to_chosen_day=True) # TODO: remove obsolete comment # --- Add OA21CD to the data diff --git a/scripts/3.3_assign_facility_all.py b/scripts/3.3_assign_facility_all.py index 2099b9b..1d197c3 100644 --- a/scripts/3.3_assign_facility_all.py +++ b/scripts/3.3_assign_facility_all.py @@ -307,7 +307,10 @@ def main(config_file): x_col="TripDisIncSW", y_col="length", 
x_label="Reported Travel Distance (km)", - y_label="Actual Distance - Euclidian (km)", + y_label="Actual Distance - Estimated (km)", + detour_factor=config.feasible_assignment.detour_factor, + decay_rate=config.feasible_assignment.decay_rate, + x_axis_max=50, crs=f"EPSG:{config.output_crs}", title_prefix=f"Scatter plot of TripDisIncSW vs. Length for {activity_type}", save_dir=config.output_path / "plots/assigning/", @@ -330,8 +333,11 @@ def main(config_file): activity_type_col="destination activity", x_col="TripTotalTime", y_col="length", - x_label="Reported Travel TIme (min)", - y_label="Actual Distance - Euclidian (km)", + x_label="Reported Travel Time (min)", + y_label="Actual Distance - Estimated (km)", + detour_factor=config.feasible_assignment.detour_factor, + decay_rate=config.feasible_assignment.decay_rate, + x_axis_max=180, crs=f"EPSG:{config.output_crs}", title_prefix="Scatter plot of TripTotalTime vs. Length", save_dir=config.output_path / "plots/assigning/", @@ -355,6 +361,9 @@ def main(config_file): y_col="time", x_label="Reported Travel TIme (min)", y_label="Modelled time (min)", + detour_factor=config.feasible_assignment.detour_factor, + decay_rate=config.feasible_assignment.decay_rate, + x_axis_max=180, crs=f"EPSG:{config.output_crs}", title_prefix="Scatter plot of TripTotalTime vs. Modelled time", save_dir=config.output_path / "plots/assigning/", diff --git a/scripts/4_validation.py b/scripts/4_validation.py index ab469cf..ab64025 100644 --- a/scripts/4_validation.py +++ b/scripts/4_validation.py @@ -29,7 +29,7 @@ def main(config_file): # NTS data legs_nts = pd.read_parquet(config.output_path / "nts_trips.parquet") - legs_nts = legs_nts[legs_nts["TravDay"] == config.parameters.nts_day_of_week] + legs_nts = legs_nts[legs_nts["TravDay"].isin(config.parameters.nts_days_of_week)] # Model outputs legs_acbm = pd.read_csv(config.output_path / "legs.csv") @@ -217,6 +217,8 @@ def main(config_file): start_wkt_col="start_location_geometry_wkt", end_wkt_col="end_location_geometry_wkt", crs_epsg=config.output_crs, + detour_factor=1.56, + decay_rate=0.0001, ) # Plot: Aggregate diff --git a/src/acbm/assigning/feasible_zones_primary.py b/src/acbm/assigning/feasible_zones_primary.py index 9c29807..9ef9f26 100644 --- a/src/acbm/assigning/feasible_zones_primary.py +++ b/src/acbm/assigning/feasible_zones_primary.py @@ -77,6 +77,8 @@ def get_possible_zones( travel_times: Optional[pd.DataFrame] = None, filter_by_activity: bool = False, time_tolerance: float = 0.2, + detour_factor: float = 1.56, + decay_rate: float = 0.0001, ) -> dict: """ Get possible zones for all activity chains in the dataset. This function loops over the travel_times dataframe and filters by mode, time of day and weekday/weekend. @@ -106,6 +108,13 @@ def get_possible_zones( activity chain's travel time (which is stored in "TripTotalTime"). Allowable travel_times are those that fall in the range of: travel_time_reported * (1 - time_tolerance) <= travel_time_reported <= travel_time_reported * (1 + time_tolerance) Default = 0.2 + detour_factor: float + The detour factor used to calculate travel distances between zones from euclidian distances. + Default = 1.56 + decay_rate: float + The decay rate used to calculate travel times between zones from travel distances. Detours make up a smaller portion of + longer distance trips. 
diff --git a/src/acbm/assigning/feasible_zones_primary.py b/src/acbm/assigning/feasible_zones_primary.py
index 9c29807..9ef9f26 100644
--- a/src/acbm/assigning/feasible_zones_primary.py
+++ b/src/acbm/assigning/feasible_zones_primary.py
@@ -77,6 +77,8 @@ def get_possible_zones(
     travel_times: Optional[pd.DataFrame] = None,
     filter_by_activity: bool = False,
     time_tolerance: float = 0.2,
+    detour_factor: float = 1.56,
+    decay_rate: float = 0.0001,
 ) -> dict:
     """
     Get possible zones for all activity chains in the dataset. This function loops over the travel_times dataframe and filters by mode, time of day and weekday/weekend.
@@ -106,6 +108,13 @@
         activity chain's travel time (which is stored in "TripTotalTime"). Allowable travel_times are those that fall in the range of: travel_time_reported * (1 - time_tolerance) <= travel_time_reported <= travel_time_reported * (1 + time_tolerance) Default = 0.2
+    detour_factor: float
+        The detour factor used to calculate travel distances between zones from euclidean distances.
+        Default = 1.56
+    decay_rate: float
+        The decay rate used when converting euclidean distances to travel distances; detours make up a smaller portion of
+        longer distance trips.
+        Default = 0.0001

     Returns
     -------
@@ -137,7 +146,11 @@
     if travel_times is None:
         logger.info("Travel time matrix not provided: Creating travel times estimates")
         travel_times = zones_to_time_matrix(
-            zones=boundaries, id_col=zone_id, time_units="m"
+            zones=boundaries,
+            id_col=zone_id,
+            time_units="m",
+            detour_factor=detour_factor,
+            decay_rate=decay_rate,
         )

     list_of_modes = activity_chains["mode"].unique()
@@ -224,7 +237,7 @@
                 travel_times_filtered_mode_time_day = travel_times_filtered_mode
             # if mode = car_passenger, we compare to the travel time for car (we don't
             # have travel times for car_passenger)
-            if mode == "car_passenger":
+            if mode in ("car_passenger", "taxi"):
                 activity_chains_filtered = activity_chains[
                     (activity_chains["mode"] == "car")
                 ]
diff --git a/src/acbm/assigning/plots.py b/src/acbm/assigning/plots.py
index 80a7bd1..c401d2b 100644
--- a/src/acbm/assigning/plots.py
+++ b/src/acbm/assigning/plots.py
@@ -11,6 +11,8 @@
 from matplotlib import patches as mpatches
 from shapely.geometry import LineString

+from acbm.assigning.utils import _adjust_distance
+

 def plot_workzone_assignment_line(
     assignment_results: pd.DataFrame,
@@ -387,7 +389,10 @@ def plot_scatter_actual_reported(
     title_prefix: str,
     activity_type: str,
     activity_type_col: str,
+    detour_factor: float,
+    decay_rate: float,
     crs: str,
+    x_axis_max: float,
     save_dir: str | Path | None = None,
     display: bool = False,
     y_scale: float = 1 / 1000,
@@ -405,7 +410,10 @@
     - title_prefix: Prefix for the plot titles.
     - activity_type: Type of activity to plot.
     - activity_type_col: Column name for the activity type.
+    - detour_factor: Detour factor for the distance adjustment (see `_adjust_distance`).
+    - decay_rate: Decay rate for the distance adjustment.
     - crs: Coordinate reference system (CRS) of the activities data.
+    - x_axis_max: Maximum value for the x-axis (used to exclude very high reported distances / times).
     - save_dir: Directory to save the plots. If None, plots are not saved.
     - display: Whether to display plots by calling `plt.show()`.
     """
@@ -434,6 +442,13 @@
     # calculate the length of the line_geometry in meters
     activity_chains_plot["length"] = activity_chains_plot["line_geometry"].length

+    # Adjust the euclidean length to an estimated travel distance
+    activity_chains_plot["length"] = activity_chains_plot["length"].apply(
+        lambda d: _adjust_distance(
+            d, detour_factor=detour_factor, decay_rate=decay_rate
+        )
+    )
+
     # Calculate the number of rows and columns for the subplots. 
It is a function of the number of modes nrows = math.ceil(len(activity_chains_plot["mode"].unique()) / 2) ncols = 2 @@ -460,6 +475,10 @@ def plot_scatter_actual_reported( p = np.poly1d(z) ax.plot(subset_mode[x_col], p(subset_mode[x_col]), "r--") + # Set x-axis limit + if x_axis_max: + ax.set_xlim(0, x_axis_max) + ax.set_title(f"{title_prefix} for mode: {mode}") ax.set_xlabel(x_label) ax.set_ylabel(y_label) diff --git a/src/acbm/assigning/select_facility.py b/src/acbm/assigning/select_facility.py index 28a9e0d..485544e 100644 --- a/src/acbm/assigning/select_facility.py +++ b/src/acbm/assigning/select_facility.py @@ -1,5 +1,5 @@ import logging -from multiprocessing import Pool +from multiprocessing import Pool, cpu_count from typing import Optional, Tuple import geopandas as gpd @@ -204,9 +204,9 @@ def select_facility( dict[str, Tuple[str, Point ] | Tuple[float, float]]: Unique ID column as keys with selected facility ID and facility ID's geometry, or (np.nan, np.nan) """ - # TODO: update this to be configurable, `None` is os.process_cpu_count() # TODO: check if this is deterministic for a given seed (or pass seed to pool) - with Pool(None) as p: + # TODO: update to be configurable + with Pool(max(int(cpu_count() * 0.75), 1)) as p: # Set to a large enough chunk size so that each process # has a sufficiently large amount of processing to do. chunk_size = 16_000 diff --git a/src/acbm/assigning/select_zone_work.py b/src/acbm/assigning/select_zone_work.py index 5005842..cb4c7c0 100644 --- a/src/acbm/assigning/select_zone_work.py +++ b/src/acbm/assigning/select_zone_work.py @@ -51,6 +51,7 @@ class WorkZoneAssignment: remaining_flows: Dict[Tuple[str, str], int] = field(init=False) total_flows: Dict[str, int] = field(init=False) percentages: Dict[Tuple[str, str], float] = field(init=False) + scaling: float = field(init=True) def __post_init__(self): """ @@ -75,9 +76,9 @@ def _calculate_total_flows(self) -> Dict[str, int]: total_flows = {} for (from_zone, _), flow in self.actual_flows.items(): if from_zone in total_flows: - total_flows[from_zone] += flow + total_flows[from_zone] += int(flow * self.scaling) else: - total_flows[from_zone] = flow + total_flows[from_zone] = int(flow * self.scaling) return total_flows def _calculate_percentages(self) -> Dict[Tuple[str, str], float]: diff --git a/src/acbm/assigning/utils.py b/src/acbm/assigning/utils.py index c829cc1..614612e 100644 --- a/src/acbm/assigning/utils.py +++ b/src/acbm/assigning/utils.py @@ -3,6 +3,7 @@ import geopandas as gpd import numpy as np import pandas as pd +import polars as pl from acbm.config import Config @@ -11,25 +12,26 @@ def cols_for_assignment_all() -> list[str]: """Gets activity chains with subset of columns required for assignment.""" return [ *cols_for_assignment_edu(), - "household", "oact", "nts_ind_id", "nts_hh_id", "age_years", "TripDisIncSW", "tet", + "DayID", ] def cols_for_assignment_edu() -> list[str]: """Gets activity chains with subset of columns required for assignment.""" return [ + "id", + "household", "TravDay", "OA11CD", "dact", "mode", "tst", - "id", "seq", "TripTotalTime", "education_type", @@ -42,16 +44,26 @@ def cols_for_assignment_work() -> list[str]: def activity_chains_for_assignment( - config: Config, columns: list[str] | None = None + config: Config, columns: list[str] | None = None, subset_to_chosen_day: bool = False ) -> pd.DataFrame: """Gets activity chains with subset of columns required for assignment.""" if columns is None: columns = cols_for_assignment_all() - return pd.read_parquet( + 
activity_chains = pd.read_parquet(
         config.spc_with_nts_trips_filepath,
         columns=columns,
     )
+    if not subset_to_chosen_day:
+        return activity_chains
+
+    return activity_chains.merge(
+        pd.read_parquet(
+            config.output_path / "interim" / "assigning" / "chosen_trav_day.parquet"
+        ),
+        on=["id", "TravDay"],
+        how="inner",
+    )


 def _map_time_to_day_part(
@@ -286,10 +298,39 @@ def _get_activities_per_zone_df(activities_per_zone: dict) -> pd.DataFrame:
     return pd.concat(activities_per_zone.values())


+def _adjust_distance(
+    distance: float,
+    detour_factor: float,
+    decay_rate: float,
+) -> float:
+    """
+    Adjusts euclidean distances by applying a detour factor that decays with
+    distance, since a detour makes up a smaller proportion of the total
+    distance as the distance increases.
+
+    Parameters
+    ----------
+    distance : float
+        The original distance.
+    detour_factor : float
+        The detour factor to be applied.
+    decay_rate : float
+        The decay rate to be applied.
+
+    Returns
+    -------
+    float
+        The adjusted distance.
+    """
+    return distance * (1 + ((detour_factor - 1) * np.exp(-decay_rate * distance)))
+
+
 def zones_to_time_matrix(
     zones: gpd.GeoDataFrame,
     time_units: str,
     id_col: Optional[str] = None,
+    detour_factor: float = 1.56,
+    decay_rate: float = 0.0001,
 ) -> pd.DataFrame:
     """
     Calculates the distance matrix between the centroids of the given zones and returns it as a DataFrame. The matrix also adds
@@ -307,6 +348,10 @@
         The name of the column in the zones GeoDataFrame to use as the ID. If None, the index values are used. Default is None.
     time_units: str, optional
         The units to use for the travel time. Options are 's' for seconds and 'm' for minutes.
+    detour_factor: float, optional
+        The detour factor to apply to the distance. Default is 1.56.
+    decay_rate: float, optional
+        The decay rate used when converting distances to travel distances. Default is 0.0001.
     Returns
     -------
     pd.DataFrame
@@ -323,6 +366,11 @@
     distances = distances.stack().reset_index()
     distances.columns = [f"{id_col}_from", f"{id_col}_to", "distance"]

+    # adjust the distance column by applying the detour factor
+    distances["distance"] = distances["distance"].apply(
+        lambda d: _adjust_distance(d, detour_factor, decay_rate)
+    )
+
     # replace the index values with the id_col values if id_col is specified
     if id_col is not None:
         # create a mapping from index to id_col values
@@ -449,7 +497,8 @@ def intrazone_time(zones: gpd.GeoDataFrame, key_column: str) -> dict:
     # Calculate the area of each zone
     zones["area"] = zones["geometry"].area
     # Calculate the average distance within each zone
-    zones["average_dist"] = np.sqrt(zones["area"]) / 2
+    # sqrt(area) / 2 would approximate the zone radius; dividing by 1.5 gives a slightly larger average intrazonal distance
+    zones["average_dist"] = np.sqrt(zones["area"]) / 1.5

     # Mode speeds in m/s
     mode_speeds_mps = {
@@ -525,3 +574,115 @@ def replace_intrazonal_travel_time(
     # Return the modified DataFrame
     return travel_times_copy
+
+
+def get_chosen_day(config: Config) -> pd.DataFrame:
+    """Gets the chosen travel day for the population given the config."""
+    acs = pl.DataFrame(activity_chains_for_assignment(config))
+
+    if config.parameters.common_household_day:
+        return (
+            acs.join(
+                acs.group_by("household")
+                .agg(pl.col("TravDay").unique().sample(1, with_replacement=True))
+                .explode("TravDay"),
+                on=["household", "TravDay"],
+                how="inner",
+            )
+            .select(["id", "TravDay"])
+            .unique()
+            .sort("id")
+            .to_pandas()
+        )
+
+    # Otherwise, sample a travel day per individual (any TravDay), e.g. when modelling increased numbers of households
+    work_days = (
+        acs.filter(pl.col("dact").eq("work"))
+        .group_by("id")
+        .agg(pl.col("TravDay").unique())
+        .select(["id", pl.col("TravDay").list.drop_nulls().list.sample(n=1)])
+        .explode("TravDay")
+        .rename({"TravDay": "TravDayWork"})
+    )
+    non_work_days = (
+        acs.filter(~pl.col("dact").eq("work"))
+        .group_by("id")
+        .agg(pl.col("TravDay").unique())
+        .select(["id", pl.col("TravDay").list.drop_nulls().list.sample(n=1)])
+        .explode("TravDay")
+        .rename({"TravDay": "TravDayNonWork"})
+    )
+
+    any_days = (
+        acs.group_by("id")
+        .agg(pl.col("TravDay").unique())
+        .select(["id", pl.col("TravDay").list.drop_nulls()])
+        .select(
+            [
+                "id",
+                pl.when(pl.col("TravDay").list.len() > 0)
+                # Note: this has to be set to with_replacement despite the non-empty check
+                .then(pl.col("TravDay").list.sample(n=1, with_replacement=True))
+                .otherwise(None),
+            ]
+        )
+        .explode("TravDay")
+        .rename({"TravDay": "TravDayAny"})
+    ).sort("id")
+
+    # Combine day choices for different conditions
+    acs_combine = (
+        acs.join(work_days, on="id", how="left", coalesce=True)
+        .join(non_work_days, on="id", how="left", coalesce=True)
+        .join(any_days, on="id", how="left", coalesce=True)
+        .join(
+            pl.scan_parquet(config.spc_combined_filepath)
+            .select(["id", "pwkstat"])
+            .collect(),
+            on="id",
+        )
+    )
+
+    # Choose a day given pwkstat
+    acs_combine = acs_combine.with_columns(
+        [
+            # If pwkstat = 1 (full time)
+            # and a work travel day is available
+            pl.when(pl.col("pwkstat").eq(1) & pl.col("TravDayWork").is_not_null())
+            .then(pl.col("TravDayWork"))
+            .otherwise(
+                # If pwkstat = 1 (full time)
+                # and a work travel day is NOT available
+                pl.when(pl.col("pwkstat").eq(1) & pl.col("TravDayWork").is_null())
+                .then(pl.col("TravDayAny"))
+                .otherwise(
+                    # If pwkstat = 2 (part time)
+                    # and a work travel day is available
+                    # and a non-work travel day is available
+                    pl.when(
+                        pl.col("pwkstat").eq(2)
+                        & pl.col("TravDayWork").is_not_null()
+                        & pl.col("TravDayNonWork").is_not_null()
+                    )
+                    .then(
+                        # Sample either TravDayWork or TravDayNonWork
+                        # stochastically given config
+                        pl.col("TravDayWork")
+                        # TODO: update from config
+                        if np.random.random() < 1
+                        else pl.col("TravDayNonWork")
+                    )
+                    .otherwise(pl.col("TravDayAny"))
+                )
+            )
+            .alias("ChosenTravDay")
+        ]
+    )
+
+    return (
+        acs_combine.select(["id", "ChosenTravDay"])
+        .unique()
+        .rename({"ChosenTravDay": "TravDay"})
+        .sort("id")
+        .to_pandas()
+    )
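
The pwkstat branching in get_chosen_day above is easier to follow as plain Python. A hypothetical restatement (pwkstat 1 = full time, 2 = part time; the 0.7 default mirrors part_time_work_prob in the config, which the polars version does not wire up yet, per its TODO):

    import random

    def choose_day(pwkstat, work_day, non_work_day, any_day, p_work=0.7):
        # Full-time workers get their sampled work day if one exists.
        if pwkstat == 1 and work_day is not None:
            return work_day
        # Part-time workers with both options pick a work day with probability p_work.
        if pwkstat == 2 and work_day is not None and non_work_day is not None:
            return work_day if random.random() < p_work else non_work_day
        # Everyone else falls back to any sampled travel day.
        return any_day
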
diff --git a/src/acbm/config.py b/src/acbm/config.py
index 1bb1e4e..ed3daf4 100644
--- a/src/acbm/config.py
+++ b/src/acbm/config.py
@@ -9,6 +9,7 @@
 import geopandas as gpd
 import jcs
 import numpy as np
+import polars as pl
 import tomlkit
 from pydantic import BaseModel, Field, field_serializer, field_validator
@@ -26,10 +27,12 @@ class Parameters(BaseModel):
     boundary_geography: str
     nts_years: list[int]
     nts_regions: list[str]
-    nts_day_of_week: int
+    nts_days_of_week: list[int]
     output_crs: int
     tolerance_work: float | None = None
     tolerance_edu: float | None = None
+    common_household_day: bool = True
+    part_time_work_prob: float = 0.7


 @dataclass(frozen=True)
@@ -40,6 +43,12 @@ class MatchingParams(BaseModel):
     chunk_size: int = 50_000


+@dataclass(frozen=True)
+class FeasibleAssignmentParams(BaseModel):
+    detour_factor: float
+    decay_rate: float
+
+
 @dataclass(frozen=True)
 class WorkAssignmentParams(BaseModel):
     use_percentages: bool
@@ -84,10 +93,13 @@ def serialize_filepath(self, filepath: Path | str | None) -> str | None:
 class Config(BaseModel):
     parameters: Parameters = Field(description="Config: parameters.")
+    matching: MatchingParams = Field(description="Config: parameters for matching.")
+    feasible_assignment: FeasibleAssignmentParams = Field(
+        description="Config: parameters for assignment of feasible zones."
+    )
     work_assignment: WorkAssignmentParams = Field(
         description="Config: parameters for work assignment."
     )
-    matching: MatchingParams = Field(description="Config: parameters for matching.")
     postprocessing: Postprocessing = Field(
         description="Config: parameters for postprocessing."
     )
@@ -350,6 +362,8 @@ def init_rng(self):
         try:
             np.random.seed(self.seed)
             random.seed(self.seed)
+            pl.set_random_seed(self.seed)
+
         except Exception as err:
             msg = f"config does not provide a rng seed with err: {err}"
             raise ValueError(msg) from err
diff --git a/src/acbm/matching.py b/src/acbm/matching.py
index 4c0a5b1..595fda6 100644
--- a/src/acbm/matching.py
+++ b/src/acbm/matching.py
@@ -294,3 +294,62 @@ def match_individuals(
         matches.update(match)

     return matches
+
+
+def match_remaining_individuals(
+    df1: pd.DataFrame,
+    df2: pd.DataFrame,
+    matching_columns: list,
+    remaining_ids: list[int],
+    show_progress: bool = False,
+) -> dict:
+    """
+    Match the individuals in df1 that were left unmatched by match_individuals.
+    df1 is filtered to remaining_ids and matched against df2 in chunks using
+    propensity score matching on the matching columns.
+
+    Parameters
+    ----------
+    df1: pandas DataFrame
+        The first DataFrame to be matched on
+    df2: pandas DataFrame
+        The second DataFrame to be matched with
+    matching_columns: list
+        The columns to be used for the matching
+    remaining_ids: list[int]
+        The ids of the individuals in df1 that are still unmatched
+    show_progress: bool
+        Whether to log the progress of the matching
+
+    Returns
+    -------
+    matches: dict
+        A dictionary with the matched row indices from the two DataFrames {df1: df2}
+
+    """
+    # Initialize an empty dict to store the matches
+    matches = {}
+
+    # loop over the remaining individuals in chunks
+    # note: for large populations, matching in chunks is more efficient than
+    # matching all remaining individuals in a single call
+    df1_remaining = df1.loc[df1["id"].isin(remaining_ids)]
+    chunk_size = 1000
+    for i, rows_df1 in df1_remaining.groupby(
+        np.arange(len(df1_remaining)) // chunk_size
+    ):
+        rows_df2 = df2
+        if show_progress:
+            # Log the iteration number and the number of individuals processed so far
+            logger.info(
+                f"Matching remaining individuals, {i * chunk_size} out of: {len(remaining_ids)}"
+            )
+
+        # apply the matching
+        match = match_psm(rows_df1, rows_df2, matching_columns)
+
+        # append the results to the main dict
+        matches.update(match)
+
+    return matches
diff --git a/src/acbm/preprocessing.py b/src/acbm/preprocessing.py
index 7601659..6b0a0ab 100644
--- a/src/acbm/preprocessing.py
+++ b/src/acbm/preprocessing.py
@@ -70,8 +70,6 @@ def nts_filter_by_year(
     data: pandas DataFrame
         The NTS data to be filtered
-    psu: pandas DataFrame
-        The Primary Sampling Unit table in the NTS. It has the year
     years: list
         The chosen year(s)
     """
@@ -111,24 +109,49 @@ def nts_filter_by_region(
     # 1. Create a column in the PSU table with the region names

     # Dictionary of the regions in the NTS and how they are coded
+    # PSUGOR_B02ID mapping, which does not have values for 2021 and 2022
+    # region_dict = {
+    #     -10.0: "DEAD",
+    #     -9.0: "DNA",
+    #     -8.0: "NA",
+    #     1.0: "North East",
+    #     2.0: "North West",
+    #     3.0: "Yorkshire and the Humber",
+    #     4.0: "East Midlands",
+    #     5.0: "West Midlands",
+    #     6.0: "East of England",
+    #     7.0: "London",
+    #     8.0: "South East",
+    #     9.0: "South West",
+    #     10.0: "Wales",
+    #     11.0: "Scotland",
+    # }
+
+    # PSUStatsReg_B01ID mapping, used instead of PSUGOR_B02ID (which lacks values for 2021 and 2022)
     region_dict = {
         -10.0: "DEAD",
         -9.0: "DNA",
         -8.0: "NA",
-        1.0: "North East",
-        2.0: "North West",
-        3.0: "Yorkshire and the Humber",
-        4.0: "East Midlands",
-        5.0: "West Midlands",
-        6.0: "East of England",
-        7.0: "London",
-        8.0: "South East",
+        1.0: "Northern, Metropolitan",
+        2.0: "Northern, Non-metropolitan",
+        3.0: "Yorkshire / Humberside, Metropolitan",
+        4.0: "Yorkshire / Humberside, Non-metropolitan",
+        5.0: "East Midlands",
+        6.0: "East Anglia",
+        7.0: "South East (excluding London Boroughs)",
+        8.0: "London Boroughs",
         9.0: "South West",
-        10.0: "Wales",
-        11.0: "Scotland",
+        10.0: "West Midlands, Metropolitan",
+        11.0: "West Midlands, Non-metropolitan",
+        12.0: "North West, Metropolitan",
+        13.0: "North West, Non-metropolitan",
+        14.0: "Wales",
+        15.0: "Scotland",
     }
+
     # In the PSU table, create a column with the region names
-    psu["region_name"] = psu["PSUGOR_B02ID"].map(region_dict)
+    # psu["region_name"] = psu["PSUGOR_B02ID"].map(region_dict)
+    psu["region_name"] = psu["PSUStatsReg_B01ID"].map(region_dict)

     # 2. 
Check that all values of 'years' exist in the 'SurveyYear' column of 'psu' diff --git a/src/acbm/utils.py b/src/acbm/utils.py index 41fbed7..5703840 100644 --- a/src/acbm/utils.py +++ b/src/acbm/utils.py @@ -1,5 +1,6 @@ import numpy as np import pandas as pd +import polars as pl from sklearn.metrics import mean_squared_error from acbm.config import Config @@ -39,3 +40,65 @@ def get_travel_times(config: Config, use_estimates: bool = False) -> pd.DataFram if config.parameters.travel_times and not use_estimates: return pd.read_parquet(config.travel_times_filepath) return pd.read_parquet(config.travel_times_estimates_filepath) + + +def households_with_common_travel_days( + nts_trips: pd.DataFrame, days: list[int], hid="HouseholdID", pid="IndividualID" +) -> list[int]: + return ( + nts_trips.groupby([hid, pid])["TravDay"] + .apply(list) + .map(set) + .to_frame() + .groupby([hid])["TravDay"] + .apply( + lambda sets_of_days: set.intersection(*sets_of_days) + if set.intersection(*sets_of_days) + else None + ) + .to_frame()["TravDay"] + .apply( + lambda common_days: [day for day in common_days if day in days] + if common_days is not None + and common_days != {pd.NA} + and common_days != {np.nan} + else [] + ) + .apply(lambda common_days: common_days if common_days else pd.NA) + .dropna() + .reset_index()[hid] + .to_list() + ) + + +def households_with_travel_days_in_nts_weeks( + nts_trips: pd.DataFrame, days: list[int], hid="HouseholdID", pid="IndividualID" +) -> list[int]: + return ( + pl.DataFrame(nts_trips) + .group_by([hid, pid]) + .agg(pl.col("TravDay").unique()) + .select( + [ + hid, + pid, + pl.col("TravDay").list.drop_nulls().list.set_intersection(pl.lit(days)), + ] + ) + .select( + [ + hid, + pid, + pl.when(pl.col("TravDay").list.len().eq(0)) + .then(None) + .otherwise(pl.col("TravDay")) + .alias("TravDay"), + ] + ) + .group_by(hid) + .agg(pl.col("TravDay").list.len().ne(0).all()) + .filter(pl.col("TravDay").eq(True)) + .get_column(hid) + .sort() + .to_list() + ) diff --git a/src/acbm/validating/utils.py b/src/acbm/validating/utils.py index 3ea2b40..dfc999f 100644 --- a/src/acbm/validating/utils.py +++ b/src/acbm/validating/utils.py @@ -2,6 +2,8 @@ import pandas as pd from shapely import wkt +from acbm.assigning.utils import _adjust_distance + def process_sequences( df: pd.DataFrame, @@ -76,6 +78,8 @@ def calculate_od_distances( end_wkt_col: str, crs_epsg: int, projected_epsg: int = 3857, + detour_factor: float = 1.56, + decay_rate: float = 0.0001, ) -> pd.DataFrame: """ Calculate distances between start and end geometries in a DataFrame. @@ -94,6 +98,11 @@ def calculate_od_distances( projected_epsg: int EPSG code for the projected CRS (default is 3857). We need a metric crs to calculte distances in meters. + detour_factor: float + Factor to adjust the estimated distance. + decay_rate: float + Decay rate for the distance adjustment. Detours are a smaller proportion of + the direct distance for longer trips. 
Returns ------- @@ -120,7 +129,17 @@ def calculate_od_distances( gdf = gdf.to_crs(epsg=projected_epsg) end_gdf = end_gdf.to_crs(epsg=projected_epsg) - # Calculate the distance between start and end geometries (in km) - gdf["distance"] = round(gdf.geometry.distance(end_gdf.geometry) / 1000, 1) + # Calculate the distance between start and end geometries (in m) + gdf["distance"] = gdf.geometry.distance(end_gdf.geometry) + + # Estimate actual travel distance + gdf["distance"] = gdf["distance"].apply( + lambda d: _adjust_distance( + d, detour_factor=detour_factor, decay_rate=decay_rate + ) + ) + + # convert distance to km + gdf["distance"] = round(gdf["distance"] / 1000, 1) return gdf diff --git a/tests/test_config.py b/tests/test_config.py index 2bf69d6..7f12069 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -11,4 +11,4 @@ def config(): def test_id(config): - assert config.id == "a89b65de35" + assert config.id == "21e42c9d68" diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000..5da7ef9 --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,79 @@ +import pandas as pd +import pytest + +from acbm.utils import ( + households_with_common_travel_days, + households_with_travel_days_in_nts_weeks, +) + + +@pytest.fixture +def nts_trips(): + return pd.DataFrame.from_dict( + { + "IndividualID": [ + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 10, + 11, + 12, + 13, + 14, + 15, + 16, + 17, + 18, + 19, + 20, + ], + "HouseholdID": [1, 1, 1, 2, 2, 2, 3, 3, 3, 3, 4, 4, 4, 5, 5, 5, 6, 6, 7, 7], + "TravDay": [ + 1, + 1, + 1, + 2, + 3, + 2, + 3, + 3, + 3, + 3, + pd.NA, + pd.NA, + pd.NA, + pd.NA, + pd.NA, + 4, + 4, + 5, + 5, + pd.NA, + ], + } + ) + + +def test_households_with_common_travel_days(nts_trips): + assert households_with_common_travel_days(nts_trips, [1]) == [1] + assert households_with_common_travel_days(nts_trips, [1, 2]) == [1] + assert households_with_common_travel_days(nts_trips, [1, 3]) == [1, 3] + assert households_with_common_travel_days(nts_trips, [1, 3, 4]) == [1, 3] + + +def test_households_with_travel_days_in_nts_weeks(nts_trips): + assert households_with_travel_days_in_nts_weeks(nts_trips, [1]) == [1] + assert households_with_travel_days_in_nts_weeks(nts_trips, [1, 2]) == [1] + assert households_with_travel_days_in_nts_weeks(nts_trips, [1, 3]) == [1, 3] + assert households_with_travel_days_in_nts_weeks(nts_trips, [1, 3, 4]) == [1, 3] + assert households_with_travel_days_in_nts_weeks(nts_trips, [1, 3, 4, 5]) == [ + 1, + 3, + 6, + ]
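
One behavioural consequence of this patch worth noting: WorkZoneAssignment now scales the census OD flows by its new `scaling` field (set to part_time_work_prob, default 0.7, in scripts/3.2.2) before building per-zone totals, presumably because not every worker commutes on the chosen travel day. A small sketch of the effect (`scale_total_flows` is a hypothetical restatement of _calculate_total_flows; the zone codes are illustrative):

    from typing import Dict, Tuple

    def scale_total_flows(actual_flows: Dict[Tuple[str, str], int], scaling: float) -> Dict[str, int]:
        totals: Dict[str, int] = {}
        for (from_zone, _), flow in actual_flows.items():
            # int() truncates per OD pair, matching the diff
            totals[from_zone] = totals.get(from_zone, 0) + int(flow * scaling)
        return totals

    flows = {("E02001", "E02002"): 100, ("E02001", "E02003"): 50}
    print(scale_total_flows(flows, 0.7))  # {'E02001': 105} rather than the unscaled 150
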