Source code for petab_select.candidate_space

"""Classes and methods related to candidate spaces."""

import abc
import bisect
import copy
import csv
import logging
import warnings
from collections.abc import Callable
from pathlib import Path
from typing import Any

import numpy as np
from more_itertools import one

from .constants import (
    ESTIMATE,
    METHOD_SCHEME,
    NEXT_METHOD,
    PREDECESSOR_MODEL,
    PREVIOUS_METHODS,
    TYPE_PATH,
    VIRTUAL_INITIAL_MODEL_METHODS,
    Criterion,
    Method,
)
from .handlers import TYPE_LIMIT, LimitHandler
from .model import (
    VIRTUAL_INITIAL_MODEL,
    VIRTUAL_INITIAL_MODEL_HASH,
    Model,
    ModelHash,
    default_compare,
)
from .models import Models
from .petab import get_petab_parameters

__all__ = [
    "BackwardCandidateSpace",
    "BruteForceCandidateSpace",
    "CandidateSpace",
    "FamosCandidateSpace",
    "ForwardCandidateSpace",
    "LateralCandidateSpace",
]


[docs] class CandidateSpace(abc.ABC): """A base class for collecting candidate models. The intended use of subclasses is to identify suitable models in a model space, that will be provided to a model selection method for selection. Attributes: criterion: The criterion by which models are compared. distances: The distances of all candidate models from the initial model. predecessor_model: The model used for comparison, e.g. for stepwise methods. previous_predecessor_model: The previous predecessor model. excluded_hashes: A list of model hashes that will not be accepted into the candidate space. The hashes of accepted models are added to :attr:``excluded_hashes``. governing_method: Used to store the search method that governs the choice of method during a search. In some cases, this is always the same as the method attribute. An example of a difference is in the bidirectional method, where ``governing_method`` stores the bidirectional method, whereas `method` may also store the forward or backward methods. iteration: The iteration of model selection. limit: A handler to limit the number of accepted models. models: The current set of candidate models. method: The model selection method of the candidate space. predecessor_model: The model used for comparison, e.g. for stepwise methods. previous_predecessor_model: The previous predecessor model. retry_model_space_search_if_no_models: Whether a search with a candidate space should be repeated upon failure. Useful for the :class:`BidirectionalCandidateSpace`, which switches directions upon failure. summary_tsv: A string or :class:`pathlib.Path`. A summary of the model selection progress will be written to this file. calibrated_models: All models that have been calibrated across all iterations of model selection. latest_iteration_calibrated_models: The calibrated models from the most recent iteration. """ """ FIXME(dilpath) - rename previous_predecessor_model to latest_predecessor_model - distances: change list to int? Is storage of more than one value useful? """
[docs] def __init__( self, method: Method, criterion: Criterion, # TODO add MODEL_TYPE = Union[str, Model], str for VIRTUAL_INITIAL_MODEL predecessor_model: Model | None = None, excluded_hashes: list[ModelHash] | None = None, limit: TYPE_LIMIT = np.inf, summary_tsv: TYPE_PATH = None, previous_predecessor_model: Model | None = None, calibrated_models: Models | None = None, iteration: int = 0, ): """See class attributes for arguments.""" self.method = method self.limit = LimitHandler( current=self.n_accepted, limit=limit, ) self.reset( predecessor_model=predecessor_model, excluded_hashes=excluded_hashes, ) self.summary_tsv = summary_tsv if self.summary_tsv is not None: self.summary_tsv = Path(self.summary_tsv) self._setup_summary_tsv() self.previous_predecessor_model = previous_predecessor_model if self.previous_predecessor_model is None: self.previous_predecessor_model = self.predecessor_model self.set_iteration_user_calibrated_models(Models()) self.criterion = criterion self.calibrated_models = calibrated_models or Models() self.latest_iteration_calibrated_models = Models() self.iteration = iteration
[docs] def set_iteration_user_calibrated_models( self, user_calibrated_models: Models | None ) -> None: """Hide previously-calibrated models from the calibration tool. This allows previously-calibrated model results, e.g. from a previous model selection job, to be re-used in this job. Calibrated models are stored here between model selection iterations, while the calibration tool calibrates the uncalibrated models of the iteration. The models are then combined as the full model calibration results for the iteration, with :func:`get_iteration_calibrated_models`. Args: user_calibrated_models: The previously-calibrated models. """ if not user_calibrated_models: self.iteration_user_calibrated_models = Models() return iteration_uncalibrated_models = Models() iteration_user_calibrated_models = Models() for model in self.models: if ( (user_model := user_calibrated_models.get(model.hash, None)) is not None ) and ( user_model.get_criterion( self.criterion, raise_on_failure=False ) is not None ): logging.info(f"Using user-supplied result for: {model.hash}") user_model_copy = copy.deepcopy(user_model) user_model_copy.predecessor_model_hash = ( self.predecessor_model.hash ) iteration_user_calibrated_models[user_model_copy.hash] = ( user_model_copy ) else: iteration_uncalibrated_models.append(model) self.iteration_user_calibrated_models = ( iteration_user_calibrated_models ) self.models = iteration_uncalibrated_models
[docs] def get_iteration_calibrated_models( self, calibrated_models: Models, reset: bool = False, ) -> Models: """Get all calibrated models for the current iteration. The full list of models identified for calibration in an iteration of model selection may include models for which calibration results are already available. This combines the calibration results of the uncalibrated models, with the models that were already calibrated, to produce the full list of models that were identified for calibration in the current iteration. Args: calibrated_models: The calibration results for the uncalibrated models of this iteration. Keys are model hashes, values are models. reset: Whether to remove the previously calibrated models from the candidate space, after they are used to produce the full list of calibrated models. iteration: If provided, the iteration attribute of each model will be set to this. Returns: All calibrated models for the current iteration. """ combined_calibrated_models = ( self.iteration_user_calibrated_models + calibrated_models ) if reset: self.set_iteration_user_calibrated_models( user_calibrated_models=Models() ) for model in combined_calibrated_models: model.iteration = self.iteration return combined_calibrated_models
[docs] def write_summary_tsv(self, row: list[Any]): """Write the summary of the last iteration to a TSV file. The destination is defined in ``self.summary_tsv``. Args: row: The row that will be written to disk. """ if self.summary_tsv is None: return # Format single values to be valid rows if not isinstance(row, list): row = [ row, *([""] * 5), ] with open(self.summary_tsv, "a", encoding="utf-8") as f: writer = csv.writer(f, delimiter="\t") writer.writerow(row)
def _setup_summary_tsv(self): """Setup the summary TSV file columns.""" self.summary_tsv.resolve().parent.mkdir(parents=True, exist_ok=True) if not self.summary_tsv.exists(): self.write_summary_tsv( [ "method", "# candidates", "predecessor change", "current model criterion", "current model", "candidate changes", ] )
[docs] @classmethod def read_arguments_from_yaml_dict( cls, yaml_dict: dict[str, str], ) -> dict[str, str | Model]: """Parse settings that were stored in YAML. Args: yaml_dict: The information that was read from the YAML file. Keys are class attributes, values are the corresponding values. Returns: The settings, parsed into PEtab Select objects where possible. """ kwargs = copy.deepcopy(yaml_dict) predecessor_model = None if ( predecessor_model_yaml := kwargs.pop(PREDECESSOR_MODEL, None) ) is not None: predecessor_model = Model.from_yaml(predecessor_model_yaml) return { **kwargs, PREDECESSOR_MODEL: predecessor_model, }
[docs] def is_plausible(self, model: Model) -> bool: """Determine whether a candidate model is plausible. A plausible model is one that could possibly be chosen by model selection method, in the absence of information about other models in the model space. For example, given a forward selection method that starts with an initial model ``self.predecessor_model`` that has no estimated parameters, then only models with one or more estimated parameters are plausible. Args: model: The candidate model. Returns: ``True`` if ``model`` is plausible, else ``False``. """ return True
[docs] def distance(self, model: Model) -> None | float | int: """Compute the distance between two models that are neighbors. Args: model: The candidate model. Returns: The distance from ``self.predecessor_model`` to ``model``, or ``None`` if the distance should not be computed. """ return None
[docs] def accept( self, model: Model, distance: None | float | int, ) -> None: """Add a candidate model to the candidate space. Args: model: The model that will be added. distance: The distance of the model from the predecessor model. """ model.predecessor_model_hash = self.predecessor_model.hash self.models.append(model) self.distances.append(distance) self.set_excluded_hashes(model, extend=True)
[docs] def n_accepted(self) -> int: """Get the current number of accepted models. Returns: The number of models. """ return len(self.models)
[docs] def excluded( self, model_hash: Model | ModelHash, ) -> bool: """Check whether a model is excluded. Args: model: The model. Returns: ``True`` if the ``model`` is excluded, otherwise ``False``. """ if isinstance(model_hash, Model): model_hash = model_hash.hash return model_hash in self.get_excluded_hashes()
@abc.abstractmethod def _consider_method(self, model: Model) -> bool: """Consider whether a model should be accepted, according to a method. Args: model: The candidate model. Returns: Whether a model should be accepted. """ return True
[docs] def consider(self, model: Model | None) -> bool: """Add a candidate model, if it should be added. Args: model: The candidate model. This value may be ``None`` if the :class:`ModelSubspace` decided to exclude the model that would have been sent. Returns: Whether it is OK to send additional models to the candidate space. For example, if the limit of the number of accepted models has been reached, then no further models should be sent. """ # FIXME(dilpath) # TODO change to return whether the model was accepted, and instead add # `self.continue` to determine whether additional models should be sent. # Model was excluded by the `ModelSubspace` that called this method, so can be # skipped. if model is None: # TODO use a different code than `True`? return True if self.limit.reached(): return False if self.excluded(model): warnings.warn( f"Model `{model.hash}` has been previously excluded " "from the candidate space so is skipped here.", RuntimeWarning, stacklevel=2, ) return True if not self.is_plausible(model): return True if not self._consider_method(model): return True self.accept(model, distance=self.distance(model)) return not self.limit.reached()
[docs] def reset_accepted(self) -> None: """Reset the accepted models.""" self.models = Models() self.distances = []
[docs] def set_predecessor_model(self, predecessor_model: Model | None): """Set the predecessor model. See class attributes for arguments. """ if predecessor_model is None: predecessor_model = VIRTUAL_INITIAL_MODEL self.predecessor_model = predecessor_model
[docs] def get_predecessor_model(self) -> str | Model: """Get the predecessor model.""" return self.predecessor_model
[docs] def set_excluded_hashes( self, hashes: Model | ModelHash | list[Model | ModelHash], extend: bool = False, ) -> None: """Set the excluded hashes. Args: hashes: The model hashes that will be excluded. extend: Whether to replace or extend the current excluded hashes. """ # FIXME refactor to use `Models` and rename `set_excluded_models`? if isinstance(hashes, Model | ModelHash): hashes = [hashes] excluded_hashes = set() for potential_hash in hashes: if isinstance(potential_hash, Model): potential_hash = potential_hash.hash excluded_hashes.add(potential_hash) if extend: self.excluded_hashes = self.get_excluded_hashes() | excluded_hashes else: self.excluded_hashes = set(excluded_hashes)
[docs] def get_excluded_hashes(self) -> set[ModelHash]: """Get the excluded hashes. Returns: The hashes of excluded models. """ try: return self.excluded_hashes except AttributeError: self.excluded_hashes = set() return self.get_excluded_hashes()
[docs] def set_limit(self, limit: TYPE_LIMIT = None) -> None: """Set the limit on the number of accepted models. Args: limit: The limit. """ if limit is not None: self.limit.set_limit(limit)
[docs] def get_limit(self) -> TYPE_LIMIT: """Get the limit on the number of accepted models.""" return self.limit.get_limit()
[docs] def wrap_search_subspaces( self, search_subspaces: Callable[[], None], ) -> Callable: """Decorate the subspace searches of a model space. Used by candidate spaces to perform changes that alter the search. See :class:`BidirectionalCandidateSpace` for an example, where it's used to switch directions. Args: search_subspaces: The method that searches the subspaces of a model space. Returns: The wrapped ``search_subspaces``. """ def wrapper(): search_subspaces() return wrapper
[docs] def reset( self, predecessor_model: Model | None = None, # FIXME change `Any` to some `TYPE_MODEL_HASH` (e.g. union of str/int/float) excluded_hashes: list[ModelHash] | None = None, limit: TYPE_LIMIT = None, ) -> None: """Reset the candidate models, optionally reinitialize with a model. Args: predecessor_model: The initial model. excluded_hashes: These hashes will extend the current excluded hashes. limit: The new upper limit of the number of models in this candidate space. Defaults to the previous limit. """ if excluded_hashes is None: excluded_hashes = self.get_excluded_hashes() if limit is None: limit = self.limit.get_limit() self.set_predecessor_model(predecessor_model) self.reset_accepted() self.set_excluded_hashes( excluded_hashes, extend=False, # FIXME True? ) self.set_limit(limit)
[docs] def distances_in_estimated_parameters( self, model: Model, predecessor_model: Model | None = None, ) -> dict[str, float | int]: """Distance between two models in model space, using different metrics. All metrics are in terms of estimated parameters. Metrics: l1: The L_1 distance between two models. size: The difference in the number of estimated parameters between two models. Args: model: The candidate model. predecessor_model: See class attributes. Returns: The distances between the models, as a dictionary, where a key is the name of the metric, and the value is the corresponding distance. """ model0 = predecessor_model if model0 is None: model0 = self.predecessor_model model1 = model if ( model0.hash != VIRTUAL_INITIAL_MODEL_HASH and not model1.model_subspace_petab_yaml.samefile( model0.model_subspace_petab_yaml ) ): # FIXME raise NotImplementedError( "Computing distances between models that have different " "model subspace PEtab problems is currently not supported. " "This check is based on the PEtab YAML file location." ) # All parameters from the PEtab problem are used in the computation. if model0.hash == VIRTUAL_INITIAL_MODEL_HASH: parameter_ids = list( get_petab_parameters(model1._model_subspace_petab_problem) ) if self.method == Method.FORWARD: parameters0 = np.array([0 for _ in parameter_ids]) elif self.method == Method.BACKWARD: parameters0 = np.array([ESTIMATE for _ in parameter_ids]) else: raise NotImplementedError( "Distances for the virtual initial model have not yet been " f'implemented for the method "{self.method}". Please notify the' "developers." ) else: parameter_ids = list( get_petab_parameters(model0._model_subspace_petab_problem) ) parameters0 = np.array( model0.get_parameter_values(parameter_ids=parameter_ids) ) parameters1 = np.array( model1.get_parameter_values(parameter_ids=parameter_ids) ) # TODO change to some numpy elementwise operation estimated0 = np.array([p == ESTIMATE for p in parameters0]).astype(int) estimated1 = np.array([p == ESTIMATE for p in parameters1]).astype(int) difference = estimated1 - estimated0 # Changes to(/from) estimated from(/to) not estimated l1 = np.abs(difference).sum() # Change in the number of estimated parameters. size = np.sum(difference) # TODO constants? e.g. Distance.L1 and Distance.Size distances = { "l1": l1, "size": size, } return distances
[docs] def update_after_calibration( self, *args, iteration_calibrated_models: Models, **kwargs, ): """Do work in the candidate space after calibration. For example, this is used by the :class:`FamosCandidateSpace` to switch methods. Different candidate spaces require different arguments. All arguments are here, to ensure candidate spaces can be switched easily and still receive sufficient arguments. """ self.calibrated_models += iteration_calibrated_models self.latest_iteration_calibrated_models = iteration_calibrated_models self.set_excluded_hashes( self.latest_iteration_calibrated_models, extend=False, # FIXME True? )
[docs] class ForwardCandidateSpace(CandidateSpace): """The forward method class. Attributes: direction: ``1`` for the forward method, ``-1`` for the backward method. max_steps: Maximum number of steps forward in a single iteration of forward selection. Defaults to no maximum (``None``). """ direction = 1
[docs] def __init__( self, *args, predecessor_model: Model | str | None = None, max_steps: int = None, **kwargs, ): # Although `VIRTUAL_INITIAL_MODEL` is `str` and can be used as a default # argument, `None` may be passed by other packages, so the default value # is handled here instead. self.max_steps = max_steps if predecessor_model is None: predecessor_model = VIRTUAL_INITIAL_MODEL super().__init__( *args, method=Method.FORWARD if self.direction == 1 else Method.BACKWARD, predecessor_model=predecessor_model, **kwargs, )
[docs] def is_plausible(self, model: Model) -> bool: distances = self.distances_in_estimated_parameters(model) n_steps = self.direction * distances["size"] if self.max_steps is not None and n_steps > self.max_steps: raise StopIteration( f"Maximal number of steps for method {self.method} exceeded. Stop sending candidate models." ) # A model is plausible if the number of estimated parameters strictly # increases (or decreases, if `self.direction == -1`), and no # previously estimated parameters become fixed. if self.predecessor_model.hash == VIRTUAL_INITIAL_MODEL.hash or ( n_steps > 0 and distances["l1"] == n_steps ): return True return False
[docs] def distance(self, model: Model) -> int: # TODO calculated here and `is_plausible`. Rewrite to only calculate # once? distances = self.distances_in_estimated_parameters(model) return distances["l1"]
def _consider_method(self, model) -> bool: """See :meth:`CandidateSpace._consider_method`.""" distance = self.distance(model) # Get the distance of the current "best" plausible model(s) distance0 = np.inf if self.distances: distance0 = one(set(self.distances)) # TODO store each or just one? # distance0 = one(self.distances) # breakpoint() # Only keep the best model(s). if distance > distance0: return False if distance < distance0: self.reset_accepted() return True
[docs] class BackwardCandidateSpace(ForwardCandidateSpace): """The backward method class.""" direction = -1
def forward_super_and_inner( candidate_space: CandidateSpace, method_name: str ) -> None: """Decorator to call the method of both the `super()` and `inner` space. Useful in the :class:`FamosCandidateSpace`, where e.g. excluded hashes should be applied to both the governing `FamosCandidateSpace`, and the active `FamosCandidateSpace.inner_candidate_space`. If the method is an instance member, `candidate_space` will be used instead of `super()`. Args: candidate_space: A candidate space that contains an `inner_candidate_space`. method_name: The method of the `candidate_space` to decorate. Returns: A tuple with the output from the method called with 1. `super()` if possible else `candidate_space`, and 2. `candidate_space.inner_candidate_space`. """ # TODO check if docs look OK for wrapped methods; try functools.wraps # @wraps(getattr(candidate_space, method_name)) def wrapped_method(*args, **kwargs): try: super_object = super(type(candidate_space), candidate_space) getattr(super_object, method_name) except AttributeError: super_object = candidate_space inner_object = candidate_space.inner_candidate_space return ( getattr(super_object, method_name)(*args, **kwargs), getattr(inner_object, method_name)(*args, **kwargs), ) return wrapped_method def forward_inner(candidate_space: CandidateSpace, method_name: str) -> None: """Decorator to call the method of the `inner` space. See :func:`super_and_inner` for more details. Returns: The output from the method called with `candidate_space.inner_candidate_space`. """ # TODO check if docs look OK for wrapped methods; try functools.wraps # @wraps(getattr(candidate_space, method_name)) def wrapped_method(*args, **kwargs): inner_object = candidate_space.inner_candidate_space return getattr(inner_object, method_name)(*args, **kwargs) return wrapped_method
[docs] class FamosCandidateSpace(CandidateSpace): """The FAMoS method class. This candidate space implements and extends the original FAMoS algorithm (doi: 10.1371/journal.pcbi.1007230). Attributes: critical_parameter_sets: A list of lists, where each inner list contains parameter IDs. All models must estimate at least 1 parameter from each critical parameter set. swap_parameter_sets: A list of lists, where each inner list contains parameter IDs. The lateral moves in FAMoS are constrained to be between parameters that exist in the same swap parameter set. method_scheme: A dictionary that specifies how to switch between methods when the current method doesn't produce a better model. Keys are `n`-tuples that described a pattern of length `n` methods. Values are methods. If the previous methods match the pattern in the key, then the method in the value will be used next. The order of the dictionary is important: only the first matched pattern will be used. Defaults to the method scheme described in the original FAMoS publication. n_reattempts: Integer. The total number of times that a jump-to-most-distance action will be performed, triggered whenever the model selection would normally terminate. Defaults to no reattempts (``0``). consecutive_laterals: Boolean. If ``True``, the method will continue performing lateral moves while they produce better models. Otherwise, the method scheme will be applied after one lateral move. """ default_method_scheme = { (Method.BACKWARD, Method.FORWARD): Method.LATERAL, (Method.FORWARD, Method.BACKWARD): Method.LATERAL, (Method.BACKWARD, Method.LATERAL): None, (Method.FORWARD, Method.LATERAL): None, (Method.FORWARD,): Method.BACKWARD, (Method.BACKWARD,): Method.FORWARD, (Method.LATERAL,): Method.FORWARD, (Method.MOST_DISTANT,): Method.FORWARD, None: Method.FORWARD, } forwarded_inner = [ "_consider_method", ] _consider_method = None forwarded_super_and_inner = [ "reset_accepted", "set_predecessor_model", "set_excluded_hashes", "set_limit", ]
[docs] def __init__( self, *args, predecessor_model: Model | None = None, critical_parameter_sets: list = [], swap_parameter_sets: list = [], method_scheme: dict[tuple, str] = None, n_reattempts: int = 0, consecutive_laterals: bool = False, **kwargs, ): for method_name in FamosCandidateSpace.forwarded_inner: setattr(self, method_name, forward_inner(self, method_name)) for method_name in FamosCandidateSpace.forwarded_super_and_inner: setattr( self, method_name, forward_super_and_inner(self, method_name) ) self.critical_parameter_sets = critical_parameter_sets self.swap_parameter_sets = swap_parameter_sets self.method_scheme = method_scheme if method_scheme is None: self.method_scheme = FamosCandidateSpace.default_method_scheme # FIXME remove and use `self.method` everywhere in the constructor # instead -- not required in other methods anymore self.initial_method = self.method_scheme[None] self.method = self.initial_method self.method_history = [self.initial_method] if predecessor_model is None: predecessor_model = VIRTUAL_INITIAL_MODEL if ( predecessor_model.hash == VIRTUAL_INITIAL_MODEL.hash and critical_parameter_sets ) or ( predecessor_model.hash != VIRTUAL_INITIAL_MODEL.hash and not self.check_critical(predecessor_model) ): raise ValueError( f"Provided predecessor model {predecessor_model.parameters} does not contain necessary critical parameters {self.critical_parameter_sets}. Provide a valid predecessor model." ) if ( predecessor_model.hash == VIRTUAL_INITIAL_MODEL.hash and self.initial_method not in VIRTUAL_INITIAL_MODEL_METHODS ): raise ValueError( f"The initial method {self.initial_method} does not support the `VIRTUAL_INITIAL_MODEL` as its predecessor_model." ) # FIXME remove `None` from the resulting `inner_methods` set? inner_methods = set.union( *[ { *( method_pattern if method_pattern is not None else (None,) ), next_method, } for method_pattern, next_method in self.method_scheme.items() ] ) if Method.LATERAL in inner_methods and not self.swap_parameter_sets: raise ValueError( "Use of the lateral method with FAMoS requires `swap_parameter_sets`." ) for method in inner_methods: if method is not None and method not in [ Method.FORWARD, Method.LATERAL, Method.BACKWARD, Method.MOST_DISTANT, ]: raise NotImplementedError( f"Methods FAMoS can swap to are `Method.FORWARD`, `Method.BACKWARD` and `Method.LATERAL`, not {method}. \ Check if the method_scheme scheme provided is correct." ) self.inner_candidate_spaces = { Method.FORWARD: ForwardCandidateSpace( *args, predecessor_model=predecessor_model, **kwargs, ), Method.BACKWARD: BackwardCandidateSpace( *args, predecessor_model=predecessor_model, **kwargs, ), Method.LATERAL: LateralCandidateSpace( *args, predecessor_model=predecessor_model, max_steps=1, **kwargs, ), } self.inner_candidate_space = self.inner_candidate_spaces[ self.initial_method ] super().__init__( *args, method=self.method, predecessor_model=predecessor_model, **kwargs, ) self.n_reattempts = n_reattempts self.consecutive_laterals = consecutive_laterals if ( not self.consecutive_laterals and (Method.LATERAL,) not in self.method_scheme ): raise ValueError( "Please provide a method to switch to after a lateral search, " "if not enabling the `consecutive_laterals` option." ) if self.n_reattempts: # TODO make so max_number can be specified? It cannot in original FAMoS. self.most_distant_max_number = 100 else: self.most_distant_max_number = 1 # TODO update to new `Models` type. Currently problematic because the # order of `best_models` matters. This would be fine for `Models` to # handle, except that `Models._update` will use a pre-existing index # if there is a model with the same hash. FIXME regenerate the expected # FAMoS models when `best_models` never contains duplicate models... # Also add a `sort` method to `Models` to sort by criterion. self.best_models = [] self.best_model_of_current_run = predecessor_model self.jumped_to_most_distant = False self.swap_done_successfully = False
[docs] @classmethod def read_arguments_from_yaml_dict(cls, yaml_dict) -> dict: kwargs = copy.deepcopy(yaml_dict) if (method_scheme_raw := kwargs.pop(METHOD_SCHEME, None)) is not None: method_scheme = { ( tuple( [ Method(method_str) for method_str in definition[PREVIOUS_METHODS] ] ) if definition[PREVIOUS_METHODS] is not None else None ): definition[NEXT_METHOD] for definition in method_scheme_raw } kwargs[METHOD_SCHEME] = method_scheme return super().read_arguments_from_yaml_dict(yaml_dict=kwargs)
[docs] def update_after_calibration( self, *args, iteration_calibrated_models: Models, **kwargs, ) -> None: """See `CandidateSpace.update_after_calibration`.""" super().update_after_calibration( *args, iteration_calibrated_models=iteration_calibrated_models, **kwargs, ) # In case we jumped to most distant in the last iteration, # there's no need for an update, so we reset the jumped variable # to False and continue to candidate generation if self.jumped_to_most_distant: self.jumped_to_most_distant = False jumped_to_model = one(iteration_calibrated_models) self.set_predecessor_model(jumped_to_model) self.previous_predecessor_model = jumped_to_model self.best_model_of_current_run = jumped_to_model return False if self.update_from_iteration_calibrated_models( iteration_calibrated_models=iteration_calibrated_models, ): logging.info("Switching method") self.switch_method() self.switch_inner_candidate_space( excluded_hashes=self.calibrated_models, ) logging.info( "Method switched to ", self.inner_candidate_space.method ) self.method_history.append(self.method)
[docs] def update_from_iteration_calibrated_models( self, iteration_calibrated_models: Models, ) -> bool: """Update ``self.best_models`` with the latest ``iteration_calibrated_models`` and determine if there was a new best model. If so, return ``False``. ``True`` otherwise. """ go_into_switch_method = True for model in iteration_calibrated_models: if ( self.best_model_of_current_run.hash == VIRTUAL_INITIAL_MODEL_HASH or default_compare( model0=self.best_model_of_current_run, model1=model, criterion=self.criterion, ) ): go_into_switch_method = False self.best_model_of_current_run = model if len( self.best_models ) < self.most_distant_max_number or default_compare( model0=self.best_models[self.most_distant_max_number - 1], model1=model, criterion=self.criterion, ): self.insert_model_into_best_models( model_to_insert=model, ) self.best_models = self.best_models[: self.most_distant_max_number] # When we switch to LATERAL method, we will do only one iteration with this # method. So if we do it successfully (i.e. that we found a new best model), we # want to switch method. This is why we put go_into_switch_method to True, so # we go into the method switching pipeline if self.method == Method.LATERAL and not self.consecutive_laterals: self.swap_done_successfully = True go_into_switch_method = True return go_into_switch_method
[docs] def insert_model_into_best_models(self, model_to_insert: Model) -> None: """Inserts a model into the list of best_models which are sorted w.r.t. the criterion specified. """ insert_index = bisect.bisect_left( [ model.get_criterion(self.criterion) for model in self.best_models ], model_to_insert.get_criterion(self.criterion), ) self.best_models.insert(insert_index, model_to_insert)
[docs] def consider(self, model: Model | None) -> bool: """Re-define ``consider`` of FAMoS to be the ``consider`` method of the ``inner_candidate_space``. Update all the attributes changed in the ``consider`` method. """ if self.limit.reached(): return False # Check if model contains necessary critical parameters and, if # the current method is swap, check the swap move is contained # in a swap set. if self.check_critical(model) and ( not self.method == Method.LATERAL or self.check_swap(model) ): return_value = self.inner_candidate_space.consider(model) # update the attributes self.models = self.inner_candidate_space.models self.distances = self.inner_candidate_space.distances self.set_excluded_hashes( self.inner_candidate_space.get_excluded_hashes() ) return return_value return True
[docs] def is_plausible(self, model: Model) -> bool: if self.check_critical(model): return self.inner_candidate_space.is_plausible(model) return False
[docs] def check_swap(self, model: Model) -> bool: """Check if parameters that are swapped are contained in the same swap parameter set. """ if self.method != Method.LATERAL: return True predecessor_estimated_parameters_ids = set( self.predecessor_model.get_estimated_parameter_ids() ) estimated_parameters_ids = set(model.get_estimated_parameter_ids()) swapped_parameters_ids = estimated_parameters_ids.symmetric_difference( predecessor_estimated_parameters_ids ) for swap_set in self.swap_parameter_sets: if swapped_parameters_ids.issubset(set(swap_set)): return True return False
[docs] def check_critical(self, model: Model) -> bool: """Check if the model contains all necessary critical parameters""" estimated_parameters_ids = set(model.get_estimated_parameter_ids()) for critical_set in self.critical_parameter_sets: if not estimated_parameters_ids.intersection(set(critical_set)): return False return True
[docs] def switch_method( self, ) -> None: """Switch to the next method with respect to the history of methods used and the switching scheme in ``self.method_scheme``. """ previous_method = self.method next_method = previous_method logging.info("SWITCHING", self.method_history) # If last method was LATERAL and we have made a succesfull swap # (a better model was found) then go back to FORWARD. Else do the # usual method swapping scheme. if self.swap_done_successfully: next_method = self.method_scheme[(Method.LATERAL,)] else: # iterate through the method_scheme dictionary to see which method to switch to for previous_methods in self.method_scheme: if previous_methods is not None and previous_methods == tuple( self.method_history[-len(previous_methods) :] ): next_method = self.method_scheme[previous_methods] # if found a switch just break (choosing first good switch) break # raise error if the method didn't change if next_method == previous_method: raise ValueError( "Method didn't switch when it had to. " "The `method_scheme` provided is not sufficient. " "Please provide a correct method_scheme scheme. " f"Method history: `{self.method_history}`. " ) # Terminate if next method is `None` if next_method is None: if self.n_reattempts: self.jump_to_most_distant() return raise StopIteration( f"The next method is {next_method}. The search is terminating." ) # If we try to switch to SWAP method but it's not available (no crit or swap parameter groups) if ( next_method == Method.LATERAL and not [ critical_set for critical_set in self.critical_parameter_sets if len(critical_set) > 1 ] and not self.swap_parameter_sets ): if self.n_reattempts: self.jump_to_most_distant() return raise ValueError( f"The next method is {next_method}, but there are no critical or swap parameters sets. Terminating." ) if previous_method == Method.LATERAL: self.swap_done_successfully = False self.update_method(method=next_method)
[docs] def update_method(self, method: Method): """Update ``self.method`` to ``method``.""" self.method = method
[docs] def switch_inner_candidate_space( self, excluded_hashes: list[ModelHash], ): """Switch the inner candidate space to match the current method. Args: excluded_hashes: Hashes of excluded models. """ # if self.method != Method.MOST_DISTANT: self.inner_candidate_space = self.inner_candidate_spaces[self.method] # reset the next inner candidate space with the current history of all # calibrated models self.inner_candidate_space.reset( predecessor_model=self.predecessor_model, excluded_hashes=excluded_hashes, )
[docs] def jump_to_most_distant( self, ): """Jump to most distant model with respect to the history of all calibrated models. """ predecessor_model = self.get_most_distant() logging.info("JUMPING: ", predecessor_model.parameters) # if model not appropriate make it so by adding the first # critical parameter from each critical parameter set if not self.check_critical(predecessor_model): for critical_set in self.critical_parameter_sets: # FIXME is this a good idea? probably better to request # the model from the model subspace, rather than editing # the parameters... predecessor_model.parameters[critical_set[0]] = ESTIMATE # self.update_method(self.initial_method) self.update_method(Method.MOST_DISTANT) self.n_reattempts -= 1 self.jumped_to_most_distant = True # FIXME rename here `predecessor_model` to `most_distant_model` # self.predecessor_model = None self.set_predecessor_model(None) self.best_model_of_current_run = None self.models = Models([predecessor_model]) self.write_summary_tsv("Jumped to the most distant model.") self.update_method(self.method_scheme[(Method.MOST_DISTANT,)])
# TODO Fix for non-famos model subspaces. FAMOS easy because of only 0;ESTIMATE
[docs] def get_most_distant( self, ) -> Model: """ Get most distant model to all the checked models. We take models from the sorted list of best models (``self.best_models``) and construct complements of these models. For all these complements we compute the distance in number of different estimated parameters to all models from history. For each complement we take the minimum of these distances as it's distance to history. Then we choose the complement model with the maximal distance to history. TODO: Next we check if this model is contained in any subspace. If so we choose it. If not we choose the model in a subspace that has least distance to this complement model. """ most_distance = 0 most_distant_indices = [] # FIXME for multiple PEtab problems? parameter_ids = list( get_petab_parameters( self.best_models[0]._model_subspace_petab_problem ) ) for model in self.best_models: model_estimated_parameters = np.array( [ p == ESTIMATE for p in model.get_parameter_values( parameter_ids=parameter_ids ) ] ).astype(int) complement_parameters = 1 - model_estimated_parameters # initialize the least distance to the maximal possible value of it complement_least_distance = len(complement_parameters) # get the complement least distance for calibrated_model in self.calibrated_models: calibrated_model_estimated_parameters = np.array( [ p == ESTIMATE for p in calibrated_model.get_parameter_values( parameter_ids=parameter_ids ) ] ).astype(int) difference = ( calibrated_model_estimated_parameters - complement_parameters ) l1_distance = np.abs(difference).sum() if l1_distance < complement_least_distance: complement_least_distance = l1_distance # if the complement is further away than the current best one # then save it as the new most distant away one if complement_least_distance > most_distance: most_distance = complement_least_distance most_distant_indices = complement_parameters if len(most_distant_indices) == 0: raise StopIteration("No most_distant model found. Terminating") most_distant_parameter_values = [ str(index).replace("1", ESTIMATE) for index in most_distant_indices ] most_distant_parameters = dict( zip(parameter_ids, most_distant_parameter_values, strict=True) ) most_distant_model = Model( model_subspace_petab_yaml=model.model_subspace_petab_yaml, model_subspace_id=model.model_subspace_id, model_subspace_indices=most_distant_indices, parameters=most_distant_parameters, ) return most_distant_model
[docs] def wrap_search_subspaces(self, search_subspaces): def wrapper(): search_subspaces(only_one_subspace=True) return wrapper
[docs] class LateralCandidateSpace(CandidateSpace): """Find models with the same number of estimated parameters."""
[docs] def __init__( self, *args, max_steps: int = None, **kwargs, ): """ Additional args: max_number_of_steps: Maximal allowed number of swap moves. If 0 then there is no maximum. """ super().__init__( *args, method=Method.LATERAL, **kwargs, ) self.max_steps = max_steps
[docs] def is_plausible(self, model: Model) -> bool: if self.predecessor_model is None: raise ValueError( "The predecessor_model is still None. Provide an appropriate predecessor_model" ) distances = self.distances_in_estimated_parameters(model) # If max_number_of_steps is non-zero and the number of steps made is # larger then move is not plausible. if self.max_steps is not None and distances["l1"] > 2 * self.max_steps: raise StopIteration( f"Maximal number of steps for method {self.method} exceeded. Stop sending candidate models." ) # A model is plausible if the number of estimated parameters remains # the same, but some estimated parameters have become fixed and vice # versa. if ( distances["size"] == 0 and # distances['size'] == 0 implies L1 % 2 == 0. # FIXME here and elsewhere, deal with models that are equal # except for the values of their fixed parameters. distances["l1"] > 0 ): return True return False
def _consider_method(self, model): return True
[docs] class BruteForceCandidateSpace(CandidateSpace): """The brute-force method class."""
[docs] def __init__(self, *args, **kwargs): # if args or kwargs: # # FIXME remove? # # FIXME at least support limit # warnings.warn( # 'Arguments were provided but will be ignored, because of the ' # 'brute force candidate space.' # ) super().__init__( *args, method=Method.BRUTE_FORCE, **kwargs, )
def _consider_method(self, model): return True
candidate_space_classes = { Method.FORWARD: ForwardCandidateSpace, Method.BACKWARD: BackwardCandidateSpace, Method.LATERAL: LateralCandidateSpace, Method.BRUTE_FORCE: BruteForceCandidateSpace, Method.FAMOS: FamosCandidateSpace, } def method_to_candidate_space_class(method: Method) -> type[CandidateSpace]: """Get a candidate space class, given its method name. Args: method: The name of the method corresponding to one of the implemented candidate spaces. Returns: The candidate space. """ candidate_space_class = candidate_space_classes.get(method, None) if candidate_space_class is None: raise NotImplementedError( f"The provided method `{method}` does not correspond to an " "implemented candidate space." ) return candidate_space_class ''' def distances_in_estimated_parameters( model: Model, predecessor_model: Model, #method: str = None, ) -> Dict[str, Union[float, int]]: """Distance between two models in model space, using different metrics. All metrics are in terms of estimated parameters. Metrics: l1: The L_1 distance between two models. size: The difference in the number of estimated parameters between two models. Args: model: The candidate model. predecessor_model: The initial model. method: The search method in use. Necessary if the predecessor model is the virtual initial model. Returns: The distances between the models, as a dictionary, where a key is the name of the metric, and the value is the corresponding distance. """ model0 = predecessor_model model1 = model if not model1.petab_yaml.samefile(model0.petab_yaml): raise NotImplementedError( 'Computation of distances between different PEtab problems is ' 'currently not supported. This error is also raised if the same ' 'PEtab problem is read from YAML files in different locations.' ) # All parameters from the PEtab problem are used in the computation. parameter_ids = list(model0.petab_parameters) parameters0 = np.array(model0.get_parameter_values(parameter_ids=parameter_ids)) parameters1 = np.array(model1.get_parameter_values(parameter_ids=parameter_ids)) estimated0 = (parameters0 == ESTIMATE).astype(int) estimated1 = (parameters1 == ESTIMATE).astype(int) difference = estimated1 - estimated0 # Changes to(/from) estimated from(/to) not estimated l1 = np.abs(difference).sum() # Change in the number of estimated parameters. size = np.sum(difference) # TODO constants? distances = { 'l1': l1, 'size': size, } return distances '''