Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions pypesto/objective/amici/amici.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from typing import TYPE_CHECKING, Union

import numpy as np
import pandas as pd

from ...C import (
FVAL,
Expand All @@ -36,6 +37,8 @@
)

if TYPE_CHECKING:
import amici.importers.petab

from ...hierarchical import InnerCalculatorCollector

try:
Expand Down Expand Up @@ -718,3 +721,67 @@ def update_from_problem(
) in condition_mapping.map_preeq_fix.items():
if (val := id_to_val.get(mapped_to_par)) is not None:
condition_mapping.map_preeq_fix[model_par] = val


class AmiciPetabV2Objective(AmiciObjective):
"""An AMICI objective constructed from a PEtab v2 problem."""

def __init__(
self,
petab_importer: amici.importers.petab.PetabImporter,
**kwargs,
) -> None:
from .amici_calculator import AmiciCalculatorPetabV2

self._petab_simulator: amici.petab.petab_importer.PetabSimulator = (
petab_importer.create_simulator()
)
self.petab_problem = petab_importer.petab_problem
amici_model = self._petab_simulator.model
amici_solver = self._petab_simulator.solver
edatas = self._petab_simulator.exp_man.create_edatas()

super().__init__(
amici_model=amici_model,
amici_solver=amici_solver,
edatas=edatas,
calculator=AmiciCalculatorPetabV2(self._petab_simulator),
**kwargs,
)

def __deepcopy__(self, memo=None):
"""Override AmiciObjective.__deepcopy__."""
if memo is None:
memo = {}
cls = self.__class__
result = cls.__new__(cls)
memo[id(self)] = result
for k, v in self.__dict__.items():
setattr(result, k, copy.deepcopy(v, memo))
return result

def __getstate__(self) -> dict:
"""Use Python's default pickling semantics (shallow copy of instance dict)."""
return dict(self.__dict__)

def __setstate__(self, state: dict) -> None:
"""Restore state using the instance dict (default unpickling behaviour)."""
self.__dict__.update(state)

def rdatas_to_simulation_df(
self,
rdatas: Sequence[amici.ReturnData],
) -> pd.DataFrame:
"""
See :meth:`rdatas_to_measurement_df`.

Except a petab simulation dataframe is created, i.e. the measurement
column label is adjusted.
"""
from amici.importers.petab import rdatas_to_simulation_df

return rdatas_to_simulation_df(
rdatas,
self._petab_simulator._model,
self._petab_simulator._petab_problem,
)
157 changes: 157 additions & 0 deletions pypesto/objective/amici/amici_calculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,163 @@ def __call__(
)


class AmiciCalculatorPetabV2(AmiciCalculator):
"""Class to perform the AMICI call and obtain objective function values."""

def __init__(
self,
petab_simulator: amici.petab.petab_importer.PetabSimulator,
**kwargs,
):
super().__init__(**kwargs)
self.petab_simulator = petab_simulator

def __call__(
self,
x_dct: dict,
sensi_orders: tuple[int],
mode: ModeType,
amici_model: AmiciModel,
amici_solver: AmiciSolver,
edatas: list[amici.ExpData],
n_threads: int,
x_ids: Sequence[str],
parameter_mapping: ParameterMapping,
fim_for_hess: bool,
):
"""Perform the actual AMICI call.

Called within the :func:`AmiciObjective.__call__` method.

Parameters
----------
x_dct:
Parameters for which to compute function value and derivatives.
sensi_orders:
Tuple of requested sensitivity orders.
mode:
Call mode (function value or residual based).
amici_model:
The AMICI model.
amici_solver:
The AMICI solver.
edatas:
The experimental data.
n_threads:
Number of threads for AMICI call.
x_ids:
Ids of optimization parameters.
parameter_mapping:
Mapping of optimization to simulation parameters.
fim_for_hess:
Whether to use the FIM (if available) instead of the Hessian (if
requested).
"""
amici_solver = self.petab_simulator._solver

if mode != MODE_FUN:
raise NotImplementedError(
"Only function value mode is currently supported for "
f"PEtab v2. Got mode {mode}."
)

# TODO: -> method
# set order in solver
sensi_order = 0
if sensi_orders:
sensi_order = max(sensi_orders)

if sensi_order == 2 and fim_for_hess:
# we use the FIM
amici_solver.set_sensitivity_order(sensi_order - 1)
else:
amici_solver.set_sensitivity_order(sensi_order)

dim = len(x_ids)

# run amici simulation
result = self.petab_simulator.simulate(x_dct)
rdatas = result.rdatas

# check if the simulation failed
if any(rdata["status"] < 0.0 for rdata in rdatas):
return get_error_output(
amici_model, edatas, rdatas, sensi_orders, mode, dim
)

nllh, snllh, s2nllh, chi2, res, sres = init_return_values(
sensi_orders, mode, dim
)
nllh = -result.llh

if (
not self._known_least_squares_safe
and mode == MODE_RES
and 1 in sensi_orders
):
if not amici_model.get_add_sigma_residuals() and any(
(
(r["ssigmay"] is not None and np.any(r["ssigmay"]))
or (r["ssigmaz"] is not None and np.any(r["ssigmaz"]))
)
for r in rdatas
):
raise RuntimeError(
"Cannot use least squares solver with"
"parameter dependent sigma! Support can be "
"enabled via "
"amici_model.setAddSigmaResiduals()."
)
self._known_least_squares_safe = True # don't check this again

# TODO: compute res, sres

if 1 in sensi_orders:
if result.sllh is None and np.isnan(result["llh"]):
# TODO: to amici -- set sllh even if llh is nan?
snllh = np.full(len(x_ids), np.nan)
else:
try:
# llh to nllh, dict to array
snllh = -np.array(
[
result.sllh[
x_id
] # if x_id in res["sllh"] else 0.0
for x_id in x_ids
if x_id in x_dct.keys()
]
)
except KeyError as e:
# A requested sensitivity is missing.
# Probably the affected parameter is a fixed parameter
# in amici instead of a sensitivity parameter
# (non_estimated_parameters_as_constants=True ?).
# In this case, only max(sensi_orders) == 0 is supported
# unless this parameter is fixed in the pypesto problem.
raise ValueError(
f"Cannot compute gradient, missing entry for "
f"{set(result.sllh) - set(x_dct.keys())}."
) from e
if 2 in sensi_orders:
if result.s2llh is None and np.isnan(result.llh):
# TODO: to amici -- set s2llh even if llh is nan?
s2nllh = np.full((len(x_ids), len(x_ids)), np.nan)
else:
s2nllh = -result.s2llh

ret = {
FVAL: nllh,
GRAD: snllh,
HESS: s2nllh,
RES: res,
SRES: sres,
RDATAS: rdatas,
}

return filter_return_dict(ret)


def calculate_function_values(
rdatas,
sensi_orders: tuple[int, ...],
Expand Down
Loading
Loading