Skip to content

Commit

Permalink
PEtab: Fix initial state from condition table
Browse files Browse the repository at this point in the history
  • Loading branch information
dweindl committed Nov 8, 2024
1 parent 1d793ea commit fa52936
Showing 1 changed file with 45 additions and 175 deletions.
220 changes: 45 additions & 175 deletions python/parpe/hdf5_pe_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import pandas as pd
from amici.petab import PREEQ_INDICATOR_ID
from amici.petab.import_helpers import petab_scale_to_amici_scale
from amici.petab.parameter_mapping import create_parameter_mapping
from amici.petab.util import get_states_in_condition_table
from colorama import Fore
from colorama import init as init_colorama
from pandas import DataFrame
Expand Down Expand Up @@ -271,15 +273,27 @@ def _generate_simulation_to_optimization_parameter_mapping(self) -> None:
"""
Create dataset n_parameters_simulation x n_conditions with indices of
respective parameters in parameters_optimization
Creates the following datasets:
* ``/parameters/pscaleSimulation``
* ``/parameters/pscaleOptimization``
* ``/fixedParameters/reinitializationIndices``
* ``/parameters/parameterOverrides``
* ``/parameters/optimizationSimulationMapping``
"""

# get list of tuple of parameters dicts for all conditions
self.parameter_mapping = self.petab_problem \
.get_optimization_to_simulation_parameter_mapping(
warn_unmapped=False, scaled_parameters=False,
self.parameter_mapping = create_parameter_mapping(
petab_problem=self.petab_problem,
amici_model=self.amici_model,
simulation_conditions=self.simulation_conditions,
scaled_parameters=False,
warn_unmapped=False,
fill_fixed_parameters=True,
allow_timepoint_specific_numeric_noise_parameters=True)
allow_timepoint_specific_numeric_noise_parameters=True
)

state_ids = self.amici_model.getStateIds()
variable_par_ids = self.amici_model.getParameterIds()
fixed_par_ids = self.amici_model.getFixedParameterIds()

Expand Down Expand Up @@ -311,22 +325,13 @@ def _generate_simulation_to_optimization_parameter_mapping(self) -> None:
shape=(self.nk, self.num_condition_vectors),
fill_value=np.nan)

# For handling initial states
species_in_condition_table = [
col for col in self.petab_problem.condition_df
if self.petab_problem.sbml_model.getSpecies(col) is not None]

# for reinitialization
state_id_to_idx = {
id: i for i, id in enumerate(self.amici_model.getStateIds())}
state_idxs_reinitialization_all = []

# Merge and preeq and sim parameters, filter fixed parameters
for condition_idx, \
(condition_map_preeq, condition_map_sim,
condition_scale_map_preeq, condition_scale_map_sim) \
in enumerate(self.parameter_mapping):

for condition_idx, ((_, condition), cond_par_map) \
in enumerate(zip(self.simulation_conditions.iterrows(), self.parameter_mapping, strict=True)):
preeq_cond_idx, sim_cond_idx = self.condition_map[condition_idx]
preeq_cond_id = self.condition_ids[preeq_cond_idx] \
if preeq_cond_idx != NO_PREEQ_CONDITION_IDX else None
Expand All @@ -336,160 +341,24 @@ def _generate_simulation_to_optimization_parameter_mapping(self) -> None:
f"sim_cond_idx: {sim_cond_idx}, "
f"sim_cond_id: {sim_cond_id}")

if len(condition_map_preeq) != len(condition_scale_map_preeq) \
or len(condition_map_sim) != len(condition_scale_map_sim):
raise AssertionError(
"Number of parameters and number of parameter "
"scales do not match.")
if len(condition_map_preeq) \
and len(condition_map_preeq) != len(condition_map_sim):
# logger.debug(
# f"Preequilibration parameter map: {condition_map_preeq}")
# logger.debug(f"Simulation parameter map: {condition_map_sim}")
raise AssertionError(
"Number of parameters for preequilibration "
"and simulation do not match.")

# state indices for reinitialization
# for current simulation condition
state_idxs_for_reinitialization_cur = []

# TODO: requires special handling of initial concentrations
if species_in_condition_table:
# set indicator fixed parameter for preeq
# (we expect here, that this parameter was added during AMICI
# model import and that it was not added by the user with a
# different meaning...)
if preeq_cond_idx != NO_PREEQ_CONDITION_IDX:
condition_map_preeq[PREEQ_INDICATOR_ID] = 1.0
condition_scale_map_preeq[PREEQ_INDICATOR_ID] = ptc.LIN

condition_map_sim[PREEQ_INDICATOR_ID] = 0.0
condition_scale_map_sim[PREEQ_INDICATOR_ID] = ptc.LIN

def _set_initial_concentration(condition_id, species_id,
init_par_id,
par_map, scale_map):
value = petab.to_float_if_float(
self.petab_problem.condition_df.loc[
condition_id, species_id])

if isinstance(value, Number):
# numeric initial state
par_map[init_par_id] = value
scale_map[init_par_id] = ptc.LIN
else:
# parametric initial state
try:
# try find in mapping
par_map[init_par_id] = par_map[value]
scale_map[init_par_id] = scale_map[value]
except KeyError:
# otherwise look up in parameter table
if (self.petab_problem.parameter_df.loc[
value, ptc.ESTIMATE] == 0):
par_map[init_par_id] = \
self.petab_problem.parameter_df.loc[
value, ptc.NOMINAL_VALUE]
else:
par_map[init_par_id] = value

if (ptc.PARAMETER_SCALE
not in self.petab_problem.parameter_df
or not self.petab_problem.parameter_df.loc[
value, ptc.PARAMETER_SCALE]):
scale_map[init_par_id] = ptc.LIN
else:
scale_map[init_par_id] = \
self.petab_problem.parameter_df.loc[
value, ptc.PARAMETER_SCALE]

for species_id in species_in_condition_table:
# for preequilibration
init_par_id = f'initial_{species_id}_preeq'

# preeq initial parameter is always added during PEtab
# import, independently of whether preeq is used. Need to
# set dummy value for preeq parameter anyways, as
# it is expected in parameter mapping below
# (set to 0, not nan, because will be multiplied with
# indicator variable in initial assignment)
condition_map_sim[init_par_id] = 0.0
condition_scale_map_sim[init_par_id] = ptc.LIN

if preeq_cond_idx != NO_PREEQ_CONDITION_IDX:
_set_initial_concentration(
preeq_cond_id, species_id, init_par_id,
condition_map_preeq,
condition_scale_map_preeq)

# Check if reinitialization is requested
value_sim = petab.to_float_if_float(
self.petab_problem.condition_df.loc[
sim_cond_id, species_id])

if not isinstance(value_sim, Number) \
or not np.isnan(value_sim):
# mark for reinitialization,
# unless the value is nan
species_idx = state_id_to_idx[species_id]
state_idxs_for_reinitialization_cur.append(
species_idx)

# Set the preequilibration value also for simulation.
# Either it will be overwritten in the next step,
# or it will not be used anywhere.
# Setting it to the same value as for
# preequilibration avoids issues with AMICI,
# where we cannot provide different values for
# dynamic parameter for preequilibration and
# simulation.
condition_map_sim[init_par_id] = \
condition_map_preeq[init_par_id]
condition_scale_map_sim[init_par_id] = \
condition_scale_map_preeq[init_par_id]

# for simulation
init_par_id = f'initial_{species_id}_sim'
_set_initial_concentration(
sim_cond_id, species_id, init_par_id,
condition_map_sim,
condition_scale_map_sim)

state_idxs_reinitialization_all.append(state_idxs_for_reinitialization_cur)
logger.debug(f"condition_map_preeq: {condition_map_preeq}, "
f"condition_map_sim: {condition_map_sim}")

# split into fixed and variable parameters:
condition_map_preeq_var = condition_scale_map_preeq_var = None
condition_map_preeq_fix = None
if condition_map_preeq:
condition_map_preeq_var, condition_map_preeq_fix = \
_subset_dict(condition_map_preeq, variable_par_ids,
fixed_par_ids)
condition_scale_map_preeq_var, _ = \
_subset_dict(condition_scale_map_preeq, variable_par_ids,
fixed_par_ids)

condition_map_sim_var, condition_map_sim_fix = \
_subset_dict(condition_map_sim, variable_par_ids, fixed_par_ids)
condition_scale_map_sim_var, condition_scale_map_sim_fix = \
_subset_dict(condition_scale_map_sim, variable_par_ids,
fixed_par_ids)

if condition_map_preeq:
# merge after having removed potentially fixed parameters
# which may differ between preequilibration and simulation
petab.merge_preeq_and_sim_pars_condition(
condition_map_preeq_var, condition_map_sim_var,
condition_scale_map_preeq_var, condition_scale_map_sim_var,
condition_idx)

# mapping for each model parameter
# state indices for reinitialization after pre-equilibration
states_in_condition_table = get_states_in_condition_table(
self.petab_problem, condition=condition
)
state_idx_reinitalization_cur = [
state_ids.index(s)
for s, (v, v_preeq) in states_in_condition_table.items()
if not np.isnan(v)
]
state_idxs_reinitialization_all.append(state_idx_reinitalization_cur)

logger.debug(cond_par_map)

# update mapping matrix
for model_parameter_idx, model_parameter_id \
in enumerate(variable_par_ids):
mapped_parameter = condition_map_sim[model_parameter_id]
mapped_parameter = to_float_if_float(mapped_parameter)
mapped_parameter = cond_par_map.map_sim_var[model_parameter_id]
# mapped_parameter = to_float_if_float(mapped_parameter)
try:
(mapped_idx, override) = self.get_index_mapping_for_par(
mapped_parameter, optimization_parameter_name_to_index)
Expand All @@ -507,22 +376,22 @@ def _set_initial_concentration(condition_id, species_id,

# Set parameter scales for simulation
pscale_matrix[:, condition_idx] = list(
petab_scale_to_amici_scale(condition_scale_map_sim[amici_id])
petab_scale_to_amici_scale(cond_par_map.scale_map_sim_var[amici_id])
for amici_id in variable_par_ids)

fixed_parameter_matrix[:, self.condition_map[condition_idx, 1]] = \
np.array([condition_map_sim_fix[par_id]
np.array([cond_par_map.map_sim_fix[par_id]
for par_id in fixed_par_ids])

if condition_map_preeq:
if cond_par_map.map_preeq_fix:
fixed_parameter_matrix[:,
self.condition_map[condition_idx, 0]] = (
# the parameter might not be present if it is only used
# in the simulation condition. any value should be fine
# in this case. only NaN is problematic, because it will
# propagate through multiplication with the indicator
# variable.
np.array([condition_map_preeq_fix.get(par_id, 0.0)
np.array([cond_par_map.map_preeq_fix.get(par_id, 0.0)
for par_id in fixed_par_ids])
)

Expand Down Expand Up @@ -556,7 +425,7 @@ def _set_initial_concentration(condition_id, species_id,
dtype=data_type)
for i, state_idxs in enumerate(state_idxs_reinitialization_all):
dset[i] = state_idxs
logger.debug(f"Reinitialization [{i}]: {dset[i]}")
logger.debug(f"Reinitialization [{i}]: {dset[i]} {[state_ids[j] for j in dset[i]]}")

self.f.flush()

Expand Down Expand Up @@ -626,8 +495,9 @@ def _generate_simulation_condition_map(self):
respective condition index.
"""
# PEtab condition list
simulations = \
self.petab_problem.get_simulation_conditions_from_measurement_df()
self.simulation_conditions = self.petab_problem.get_simulation_conditions_from_measurement_df()
simulations = self.simulation_conditions

# empty dataset
condition_map = np.full(shape=(len(simulations), 2),
fill_value=self.NO_PREEQ_CONDITION_IDX)
Expand Down

0 comments on commit fa52936

Please sign in to comment.