# Source code for petab_select.candidate_space

"""Classes and methods related to candidate spaces."""
import abc
import bisect
import copy
import csv
import logging
import warnings
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Sequence, Type, Union

import numpy as np
from more_itertools import one

from .constants import (
    ESTIMATE,
    METHOD,
    METHOD_SCHEME,
    MODELS,
    NEXT_METHOD,
    PREDECESSOR_MODEL,
    PREVIOUS_METHODS,
    TYPE_PATH,
    VIRTUAL_INITIAL_MODEL,
    VIRTUAL_INITIAL_MODEL_METHODS,
    Criterion,
    Method,
)
from .handlers import TYPE_LIMIT, LimitHandler
from .model import Model, default_compare

# Public API of this module. Candidate spaces are usually obtained via
# `method_to_candidate_space_class`.
__all__ = [
    'BackwardCandidateSpace',
    'BruteForceCandidateSpace',
    'CandidateSpace',
    'FamosCandidateSpace',
    'ForwardCandidateSpace',
    'LateralCandidateSpace',
]


class CandidateSpace(abc.ABC):
    """A base class for collecting candidate models.

    The intended use of subclasses is to identify suitable models in a model
    space, that will be provided to a model selection method for selection.

    Attributes:
        distances:
            The distances of all candidate models from the initial model.
            FIXME(dilpath) change list to int? Is storage of more than one
            value useful?
        predecessor_model:
            The model used for comparison, e.g. for stepwise methods.
        previous_predecessor_model:
            The previous predecessor model.
        models:
            The current set of candidate models.
        exclusions:
            A list of model hashes. Models that match a hash in ``exclusions``
            will not be accepted into the candidate space. The hashes of
            models that are accepted are added to ``exclusions``.
        limit:
            A handler to limit the number of accepted models.
        method:
            The model selection method of the candidate space.
        governing_method:
            Used to store the search method that governs the choice of method
            during a search. In some cases, this is always the same as the
            method attribute. An example of a difference is in the
            bidirectional method, where ``governing_method`` stores the
            bidirectional method, whereas `method` may also store the forward
            or backward methods.
        summary_tsv:
            A string or :class:`pathlib.Path`. A summary of the model
            selection progress will be written to this file.

    FIXME(dilpath)
    #limited:
    #    A descriptor that handles the limit on the number of accepted models.
    #limit:
    #    Models will fail `self.consider` if `len(self.models) >= limit`.
    """
    def __init__(
        self,
        method: Method,
        # TODO add MODEL_TYPE = Union[str, Model], str for VIRTUAL_INITIAL_MODEL
        predecessor_model: Optional[Model] = None,
        exclusions: Optional[List[Any]] = None,
        limit: TYPE_LIMIT = np.inf,
        summary_tsv: TYPE_PATH = None,
        previous_predecessor_model: Optional[Model] = None,
    ):
        """Construct a candidate space.

        Args:
            method:
                The model selection method of the candidate space.
            predecessor_model:
                The model used for comparison, e.g. for stepwise methods.
            exclusions:
                Hashes of models that will not be accepted.
            limit:
                The upper limit on the number of accepted models.
            summary_tsv:
                Path of a TSV file to which a summary of the model selection
                progress will be written.
            previous_predecessor_model:
                The previous predecessor model. Defaults to
                ``predecessor_model``.
        """
        self.method = method

        # The limit handler calls `self.n_accepted` to query the current
        # number of accepted models.
        self.limit = LimitHandler(
            current=self.n_accepted,
            limit=limit,
        )
        # Initializes `predecessor_model`, `models`, `distances` and
        # `exclusions`.
        self.reset(predecessor_model=predecessor_model, exclusions=exclusions)

        self.summary_tsv = summary_tsv
        if self.summary_tsv is not None:
            self.summary_tsv = Path(self.summary_tsv)
            self._setup_summary_tsv()

        self.previous_predecessor_model = previous_predecessor_model
        if self.previous_predecessor_model is None:
            self.previous_predecessor_model = self.predecessor_model
    def write_summary_tsv(self, row):
        """Append a single row to the summary TSV file, if one is configured.

        Args:
            row:
                Either a full list of column values, or a single value that
                will be padded with empty strings to form a valid row.
        """
        if self.summary_tsv is None:
            return

        # Format single values to be valid rows
        if not isinstance(row, list):
            row = [
                row,
                *([''] * 5),
            ]

        with open(self.summary_tsv, 'a', encoding="utf-8") as f:
            writer = csv.writer(f, delimiter='\t')
            writer.writerow(row)

    def _setup_summary_tsv(self):
        """Create the summary TSV file with its header row, if missing."""
        self.summary_tsv.resolve().parent.mkdir(parents=True, exist_ok=True)

        if not self.summary_tsv.exists():
            self.write_summary_tsv(
                [
                    'method',
                    '# candidates',
                    'predecessor change',
                    'current model criterion',
                    'current model',
                    'candidate changes',
                ]
            )

    @classmethod
    def read_arguments_from_yaml_dict(cls, yaml_dict):
        """Parse constructor arguments from a YAML-derived dictionary.

        The predecessor model, if specified, is loaded from its own YAML file.

        Returns:
            The arguments, as a dictionary.
        """
        kwargs = copy.deepcopy(yaml_dict)

        predecessor_model = None
        if (
            predecessor_model_yaml := kwargs.pop(PREDECESSOR_MODEL, None)
        ) is not None:
            predecessor_model = Model.from_yaml(predecessor_model_yaml)

        return {
            **kwargs,
            PREDECESSOR_MODEL: predecessor_model,
        }
[docs] def is_plausible(self, model: Model) -> bool: """Determine whether a candidate model is plausible. A plausible model is one that could possibly be chosen by model selection method, in the absence of information about other models in the model space. For example, given a forward selection method that starts with an initial model ``self.predecessor_model`` that has no estimated parameters, then only models with one or more estimated parameters are plausible. Args: model: The candidate model. Returns: ``True`` if ``model`` is plausible, else ``False``. """ return True
[docs] def distance(self, model: Model) -> Union[None, float, int]: """Compute the distance between two models that are neighbors. Args: model: The candidate model. predecessor_model: The initial model. Returns: The distance from ``predecessor_model`` to ``model``, or ``None`` if the distance should not be computed. """ return None
    def accept(
        self,
        model: Model,
        distance: Union[None, float, int],
        # keep_others: bool = True,
    ) -> None:
        """Add a candidate model to the candidate space.

        Args:
            model:
                The model that will be added.
            distance:
                The distance of the model from the predecessor model.

        FIXME(dilpath)
        #keep_others:
        #    Whether to keep other models that were previously added to the
        #    candidate space.
        """
        # The predecessor may be the virtual initial model (a plain string),
        # which has no hash; store it as-is in that case.
        model.predecessor_model_hash = (
            self.predecessor_model.get_hash()
            if isinstance(self.predecessor_model, Model)
            else self.predecessor_model
        )
        self.models.append(model)
        self.distances.append(distance)
        # Accepted models are excluded from future consideration.
        self.exclude(model)
[docs] def n_accepted(self) -> TYPE_LIMIT: """Get the current number of accepted models.""" return len(self.models)
[docs] def exclude( self, model: Union[Model, List[Model]], ) -> None: """Exclude model(s) from future consideration. Args: model: The model(s) that will be excluded. """ if isinstance(model, list): for _model in model: self.exclusions.append(_model.get_hash()) else: self.exclusions.append(model.get_hash())
[docs] def exclude_hashes(self, hashes: Sequence[str]) -> None: """Exclude models from future consideration, by hash. Args: hashes: The model hashes that will be excluded """ self.exclusions.extend(hashes)
[docs] def excluded( self, model: Model, ) -> bool: """Whether a model is excluded.""" return model.get_hash() in self.exclusions
    @abc.abstractmethod
    def _consider_method(self, model) -> bool:
        """Consider whether a model should be accepted, according to a method.

        Args:
            model:
                The candidate model.

        Returns:
            Whether a model should be accepted.
        """
        return True
    def consider(self, model: Union[Model, None]) -> bool:
        """Add a candidate model, if it should be added.

        Args:
            model:
                The candidate model. This value may be ``None`` if the
                :class:`ModelSubspace` decided to exclude the model that would
                have been sent.

        Returns:
            Whether it is OK to send additional models to the candidate space.
            For example, if the limit of the number of accepted models has
            been reached, then no further models should be sent.
            FIXME(dilpath)
            TODO change to return whether the model was accepted, and instead
            add `self.continue` to determine whether additional models should
            be sent.
        """
        # Model was excluded by the `ModelSubspace` that called this method, so can be
        # skipped.
        if model is None:
            # TODO use a different code than `True`?
            return True
        if self.limit.reached():
            return False
        if self.excluded(model):
            warnings.warn(
                f'Model has been previously excluded from the candidate space so is skipped here. Model subspace ID: {model.model_subspace_id}. Parameterization: {model.parameters}',
                RuntimeWarning,
            )
            return True
        # Implausible and method-rejected models are skipped, but more models
        # may still be sent.
        if not self.is_plausible(model):
            return True
        if not self._consider_method(model):
            return True
        self.accept(model, distance=self.distance(model))
        return not self.limit.reached()
[docs] def reset_accepted(self) -> None: """Reset the accepted models.""" self.models = [] self.distances = []
    def set_predecessor_model(
        self, predecessor_model: Union[Model, str, None]
    ):
        """Set the predecessor model.

        Raises:
            ValueError:
                If the virtual initial model is requested for a method that
                does not support it.
        """
        self.predecessor_model = predecessor_model
        if (
            self.predecessor_model == VIRTUAL_INITIAL_MODEL
            and self.method not in VIRTUAL_INITIAL_MODEL_METHODS
        ):
            raise ValueError(
                f'A virtual initial model was requested for a method ({self.method}) that does not support them.'
            )

    def get_predecessor_model(self):
        """Get the predecessor model."""
        return self.predecessor_model

    def set_exclusions(self, exclusions: Union[List[str], None]):
        """Set the model exclusions (``None`` means no exclusions)."""
        # TODO change to List[str] for hashes?
        self.exclusions = exclusions
        if self.exclusions is None:
            self.exclusions = []

    def get_exclusions(self):
        """Get the model exclusions (a list of model hashes)."""
        return self.exclusions

    def set_limit(self, limit: TYPE_LIMIT = None):
        """Set the limit on the number of accepted models, if not ``None``."""
        if limit is not None:
            self.limit.set_limit(limit)

    def get_limit(self):
        """Get the limit on the number of accepted models."""
        return self.limit.get_limit()
[docs] def wrap_search_subspaces(self, search_subspaces: Callable[[], None]): """Decorate the subspace searches of a model space. Used by candidate spaces to perform changes that alter the search. See :class:`BidirectionalCandidateSpace` for an example, where it's used to switch directions. Args: search_subspaces: The method that searches the subspaces of a model space. """ def wrapper(): search_subspaces() return wrapper
    def reset(
        self,
        predecessor_model: Optional[Union[Model, str, None]] = None,
        # FIXME change `Any` to some `TYPE_MODEL_HASH` (e.g. union of str/int/float)
        exclusions: Optional[Union[List[str], None]] = None,
        limit: TYPE_LIMIT = None,
    ) -> None:
        """Reset the candidate models, optionally reinitialize with a model.

        Args:
            predecessor_model:
                The initial model.
            exclusions:
                Whether to reset model exclusions.
            limit:
                The new upper limit of the number of models in this candidate
                space.
        """
        self.set_predecessor_model(predecessor_model)
        self.reset_accepted()
        self.set_exclusions(exclusions)
        self.set_limit(limit)
    def distances_in_estimated_parameters(
        self,
        model: Model,
        predecessor_model: Optional[Model] = None,
    ) -> Dict[str, Union[float, int]]:
        """Distance between two models in model space, using different metrics.

        All metrics are in terms of estimated parameters.

        Metrics:
            l1:
                The L_1 distance between two models.
            size:
                The difference in the number of estimated parameters between
                two models.

        Args:
            model:
                The candidate model.
            predecessor_model:
                The initial model. Defaults to ``self.predecessor_model``.

        Returns:
            The distances between the models, as a dictionary, where a key is
            the name of the metric, and the value is the corresponding
            distance.

        Raises:
            NotImplementedError:
                If the models are from different PEtab problems, or if the
                method does not support the virtual initial model.
        """
        model0 = predecessor_model
        if model0 is None:
            model0 = self.predecessor_model
        model1 = model

        if model0 != VIRTUAL_INITIAL_MODEL and not model1.petab_yaml.samefile(
            model0.petab_yaml
        ):
            raise NotImplementedError(
                'Computation of distances between different PEtab problems is '
                'currently not supported. This error is also raised if the same '
                'PEtab problem is read from YAML files in different locations.'
            )

        # All parameters from the PEtab problem are used in the computation.
        if model0 == VIRTUAL_INITIAL_MODEL:
            parameter_ids = list(model1.petab_parameters)
            # FIXME need to take superset of all parameters amongst all PEtab problems
            # in all model subspaces to get an accurate comparable distance. Currently
            # only reasonable when working with a single PEtab problem for all models
            # in all subspaces.
            if self.method == Method.FORWARD:
                # The virtual initial model of a forward search estimates
                # nothing.
                parameters0 = np.array([0 for _ in parameter_ids])
            elif self.method == Method.BACKWARD:
                # The virtual initial model of a backward search estimates
                # everything.
                parameters0 = np.array([ESTIMATE for _ in parameter_ids])
            else:
                raise NotImplementedError(
                    'Distances for the virtual initial model have not yet been '
                    f'implemented for the method "{self.method}". Please notify the'
                    'developers.'
                )
        else:
            parameter_ids = list(model0.petab_parameters)
            parameters0 = np.array(
                model0.get_parameter_values(parameter_ids=parameter_ids)
            )
        parameters1 = np.array(
            model1.get_parameter_values(parameter_ids=parameter_ids)
        )

        # TODO change to some numpy elementwise operation
        estimated0 = np.array([p == ESTIMATE for p in parameters0]).astype(int)
        estimated1 = np.array([p == ESTIMATE for p in parameters1]).astype(int)

        difference = estimated1 - estimated0

        # Changes to(/from) estimated from(/to) not estimated
        l1 = np.abs(difference).sum()

        # Change in the number of estimated parameters.
        size = np.sum(difference)

        # TODO constants?
        distances = {
            'l1': l1,
            'size': size,
        }

        return distances
[docs] def update_after_calibration( self, *args, **kwargs, ): """Do work in the candidate space after calibration. For example, this is used by the :class:`FamosCandidateSpace` to switch methods. Different candidate spaces require different arguments. All arguments are here, to ensure candidate spaces can be switched easily and still receive sufficient arguments. """ pass
class ForwardCandidateSpace(CandidateSpace):
    """The forward method class.

    Attributes:
        direction:
            ``1`` for the forward method, ``-1`` for the backward method.
        max_steps:
            Maximum number of steps forward in a single iteration of forward
            selection. Defaults to no maximum (``None``).
    """

    # Overridden by subclasses (e.g. `BackwardCandidateSpace`) to search in
    # the opposite direction.
    direction = 1
    def __init__(
        self,
        *args,
        predecessor_model: Optional[Union[Model, str]] = None,
        max_steps: Optional[int] = None,
        **kwargs,
    ):
        """Construct the candidate space.

        Args:
            predecessor_model:
                The initial model. Defaults to the virtual initial model.
            max_steps:
                Maximum number of steps in a single iteration.
        """
        # Although `VIRTUAL_INITIAL_MODEL` is `str` and can be used as a default
        # argument, `None` may be passed by other packages, so the default value
        # is handled here instead.
        self.max_steps = max_steps
        if predecessor_model is None:
            predecessor_model = VIRTUAL_INITIAL_MODEL
        super().__init__(
            method=Method.FORWARD if self.direction == 1 else Method.BACKWARD,
            *args,
            predecessor_model=predecessor_model,
            **kwargs,
        )
    def is_plausible(self, model: Model) -> bool:
        """See :meth:`CandidateSpace.is_plausible`.

        Raises:
            StopIteration:
                If accepting ``model`` would exceed ``self.max_steps``.
        """
        distances = self.distances_in_estimated_parameters(model)
        # `size` is negative for backward moves; normalize by direction so
        # that a valid move has a positive step count.
        n_steps = self.direction * distances['size']

        if self.max_steps is not None and n_steps > self.max_steps:
            raise StopIteration(
                f"Maximal number of steps for method {self.method} exceeded. Stop sending candidate models."
            )

        # A model is plausible if the number of estimated parameters strictly
        # increases (or decreases, if `self.direction == -1`), and no
        # previously estimated parameters become fixed.
        if self.predecessor_model == VIRTUAL_INITIAL_MODEL or (
            n_steps > 0 and distances['l1'] == n_steps
        ):
            return True
        return False
    def distance(self, model: Model) -> int:
        """See :meth:`CandidateSpace.distance`.

        The distance is the L1 distance in estimated parameters.
        """
        # TODO calculated here and `is_plausible`. Rewrite to only calculate
        # once?
        distances = self.distances_in_estimated_parameters(model)
        return distances['l1']
def _consider_method(self, model) -> bool: """See :meth:`CandidateSpace._consider_method`.""" distance = self.distance(model) # Get the distance of the current "best" plausible model(s) distance0 = np.inf if self.distances: distance0 = one(set(self.distances)) # TODO store each or just one? # distance0 = one(self.distances) # breakpoint() # Only keep the best model(s). if distance > distance0: return False if distance < distance0: self.reset_accepted() return True
class BackwardCandidateSpace(ForwardCandidateSpace):
    """The backward method class."""

    # Search by removing estimated parameters.
    direction = -1
class FamosCandidateSpace(CandidateSpace):
    """The FAMoS method class.

    This candidate space implements and extends the original FAMoS algorithm
    (doi: 10.1371/journal.pcbi.1007230).

    Attributes:
        critical_parameter_sets:
            A list of lists, where each inner list contains parameter IDs.
            All models must estimate at least 1 parameter from each critical
            parameter set.
        swap_parameter_sets:
            A list of lists, where each inner list contains parameter IDs.
            The lateral moves in FAMoS are constrained to be between
            parameters that exist in the same swap parameter set.
        method_scheme:
            A dictionary that specifies how to switch between methods when
            the current method doesn't produce a better model. Keys are
            `n`-tuples that described a pattern of length `n` methods. Values
            are methods. If the previous methods match the pattern in the
            key, then the method in the value will be used next. The order of
            the dictionary is important: only the first matched pattern will
            be used. Defaults to the method scheme described in the original
            FAMoS publication.
        n_reattempts:
            Integer. The total number of times that a jump-to-most-distance
            action will be performed, triggered whenever the model selection
            would normally terminate. Defaults to no reattempts (``0``).
        consecutive_laterals:
            Boolean. If ``True``, the method will continue performing lateral
            moves while they produce better models. Otherwise, the method
            scheme will be applied after one lateral move.
    """

    # The scheme from the original FAMoS publication. The `None` key marks
    # the initial method; a `None` value marks termination.
    default_method_scheme = {
        (Method.BACKWARD, Method.FORWARD): Method.LATERAL,
        (Method.FORWARD, Method.BACKWARD): Method.LATERAL,
        (Method.BACKWARD, Method.LATERAL): None,
        (Method.FORWARD, Method.LATERAL): None,
        (Method.FORWARD,): Method.BACKWARD,
        (Method.BACKWARD,): Method.FORWARD,
        (Method.LATERAL,): Method.FORWARD,
        (Method.MOST_DISTANT,): Method.FORWARD,
        None: Method.FORWARD,
    }
[docs] def __init__( self, *args, predecessor_model: Optional[Union[Model, str, None]] = None, critical_parameter_sets: List = [], swap_parameter_sets: List = [], method_scheme: Dict[tuple, str] = None, n_reattempts: int = 0, consecutive_laterals: bool = False, **kwargs, ): self.critical_parameter_sets = critical_parameter_sets self.swap_parameter_sets = swap_parameter_sets self.method_scheme = method_scheme if method_scheme is None: self.method_scheme = self.default_method_scheme # FIXME remove and use `self.method` everywhere in the constructor # instead -- not required in other methods anymore self.initial_method = self.method_scheme[None] self.method = self.initial_method self.method_history = [self.initial_method] if predecessor_model is None: predecessor_model = VIRTUAL_INITIAL_MODEL if ( predecessor_model == VIRTUAL_INITIAL_MODEL and critical_parameter_sets ) or ( predecessor_model != VIRTUAL_INITIAL_MODEL and not self.check_critical(predecessor_model) ): raise ValueError( f'Provided predecessor model {predecessor_model.parameters} does not contain necessary critical parameters {self.critical_parameter_sets}. Provide a valid predecessor model.' ) if ( predecessor_model == VIRTUAL_INITIAL_MODEL and self.initial_method not in VIRTUAL_INITIAL_MODEL_METHODS ): raise ValueError( f"The initial method {self.initial_method} does not support the `VIRTUAL_INITIAL_MODEL` as its predecessor_model." ) # FIXME remove `None` from the resulting `inner_methods` set? inner_methods = set.union( *[ set( [ *( method_pattern if method_pattern is not None else (None,) ), next_method, ] ) for method_pattern, next_method in self.method_scheme.items() ] ) if Method.LATERAL in inner_methods and not self.swap_parameter_sets: raise ValueError( f"Use of the lateral method with FAMoS requires `swap_parameter_sets`." 
) for method in inner_methods: if method is not None and method not in [ Method.FORWARD, Method.LATERAL, Method.BACKWARD, Method.MOST_DISTANT, ]: raise NotImplementedError( f'Methods FAMoS can swap to are `Method.FORWARD`, `Method.BACKWARD` and `Method.LATERAL`, not {method}. \ Check if the method_scheme scheme provided is correct.' ) self.inner_candidate_spaces = { Method.FORWARD: ForwardCandidateSpace( *args, predecessor_model=predecessor_model, **kwargs, ), Method.BACKWARD: BackwardCandidateSpace( *args, predecessor_model=predecessor_model, **kwargs, ), Method.LATERAL: LateralCandidateSpace( *args, predecessor_model=( predecessor_model if predecessor_model != VIRTUAL_INITIAL_MODEL else None ), max_steps=1, **kwargs, ), } self.inner_candidate_space = self.inner_candidate_spaces[ self.initial_method ] super().__init__( method=self.method, *args, predecessor_model=predecessor_model, **kwargs, ) self.n_reattempts = n_reattempts self.consecutive_laterals = consecutive_laterals if ( not self.consecutive_laterals and (Method.LATERAL,) not in self.method_scheme ): raise ValueError( "Please provide a method to switch to after a lateral search, if not enabling the `consecutive_laterals` option." ) if self.n_reattempts: # TODO make so max_number can be specified? It cannot in original FAMoS. self.most_distant_max_number = 100 else: self.most_distant_max_number = 1 self.best_models = [] self.best_model_of_current_run = predecessor_model self.jumped_to_most_distant = False self.swap_done_successfully = False
    @classmethod
    def read_arguments_from_yaml_dict(cls, yaml_dict) -> dict:
        """See :meth:`CandidateSpace.read_arguments_from_yaml_dict`.

        Additionally converts the method scheme, if specified, from its YAML
        representation (a list of previous-methods/next-method definitions)
        into the dictionary form used by the constructor.
        """
        kwargs = copy.deepcopy(yaml_dict)

        if (method_scheme_raw := kwargs.pop(METHOD_SCHEME, None)) is not None:
            method_scheme = {
                (
                    tuple(
                        [
                            Method(method_str)
                            for method_str in definition[PREVIOUS_METHODS]
                        ]
                    )
                    if definition[PREVIOUS_METHODS] is not None
                    else None
                ): definition[NEXT_METHOD]
                for definition in method_scheme_raw
            }
            kwargs[METHOD_SCHEME] = method_scheme

        return super().read_arguments_from_yaml_dict(yaml_dict=kwargs)
[docs] def update_after_calibration( self, *args, calibrated_models: Dict[str, Model], newly_calibrated_models: Dict[str, Model], criterion: Criterion, **kwargs, ) -> None: """See `CandidateSpace.update_after_calibration`.""" # super().update_after_calibration(*args, **kwargs) # In case we jumped to most distant in the last iteration, # there's no need for an update, so we reset the jumped variable # to False and continue to candidate generation if self.jumped_to_most_distant: self.jumped_to_most_distant = False jumped_to_model = one(newly_calibrated_models.values()) self.set_predecessor_model(jumped_to_model) self.best_model_of_current_run = jumped_to_model return False if self.update_from_newly_calibrated_models( newly_calibrated_models=newly_calibrated_models, criterion=criterion, ): logging.info("Switching method") self.switch_method(calibrated_models=calibrated_models) self.switch_inner_candidate_space( exclusions=list(calibrated_models), ) logging.info( "Method switched to ", self.inner_candidate_space.method ) self.method_history.append(self.method)
    def update_from_newly_calibrated_models(
        self,
        newly_calibrated_models: Dict[str, Model],
        criterion: Criterion,
    ) -> bool:
        """Update ``self.best_models`` with the latest
        ``newly_calibrated_models`` and determine if there was a new best
        model. If so, return ``False``. ``True`` otherwise.

        Args:
            newly_calibrated_models:
                The models calibrated during the latest iteration (by hash).
            criterion:
                The criterion by which models are compared.

        Returns:
            Whether the method-switching logic should run.
        """
        go_into_switch_method = True
        for newly_calibrated_model in newly_calibrated_models.values():
            if (
                self.best_model_of_current_run == VIRTUAL_INITIAL_MODEL
                or default_compare(
                    model0=self.best_model_of_current_run,
                    model1=newly_calibrated_model,
                    criterion=criterion,
                )
            ):
                # A better model was found: stay with the current method.
                go_into_switch_method = False
                self.best_model_of_current_run = newly_calibrated_model

            # Insert the model if the list of best models is not yet full,
            # or if it improves on the worst kept model.
            if len(
                self.best_models
            ) < self.most_distant_max_number or default_compare(
                model0=self.best_models[self.most_distant_max_number - 1],
                model1=newly_calibrated_model,
                criterion=criterion,
            ):
                self.insert_model_into_best_models(
                    model_to_insert=newly_calibrated_model,
                    criterion=criterion,
                )
            # Keep only the `most_distant_max_number` best models.
            self.best_models = self.best_models[: self.most_distant_max_number]

        # When we switch to LATERAL method, we will do only one iteration with this
        # method. So if we do it successfully (i.e. that we found a new best model), we
        # want to switch method. This is why we put go_into_switch_method to True, so
        # we go into the method switching pipeline
        if self.method == Method.LATERAL and not self.consecutive_laterals:
            self.swap_done_successfully = True
            go_into_switch_method = True
        return go_into_switch_method
[docs] def insert_model_into_best_models( self, model_to_insert: Model, criterion: Criterion ) -> None: """Inserts a model into the list of best_models which are sorted w.r.t. the criterion specified.""" insert_index = bisect.bisect_left( [model.get_criterion(criterion) for model in self.best_models], model_to_insert.get_criterion(criterion), ) self.best_models.insert(insert_index, model_to_insert)
    def consider(self, model: Union[Model, None]) -> bool:
        """Re-define ``consider`` of FAMoS to be the ``consider`` method of
        the ``inner_candidate_space``. Update all the attributes changed in
        the ``consider`` method.

        Args:
            model:
                The candidate model.

        Returns:
            Whether it is OK to send additional models to the candidate
            space.
        """
        if self.limit.reached():
            return False

        # Check if model contains necessary critical parameters and, if
        # the current method is swap, check the swap move is contained
        # in a swap set.
        if self.check_critical(model) and (
            not self.method == Method.LATERAL or self.check_swap(model)
        ):
            return_value = self.inner_candidate_space.consider(model)

            # update the attributes
            self.models = self.inner_candidate_space.models
            self.distances = self.inner_candidate_space.distances
            self.exclusions = self.inner_candidate_space.exclusions

            return return_value

        # Rejected model; more models may still be sent.
        return True
def _consider_method(self, model) -> bool: """See :meth:`CandidateSpace._consider_method`.""" return self.inner_candidate_space._consider_method(model)
    def reset_accepted(self) -> None:
        """Changing the reset_accepted to reset the
        ``inner_candidate_space`` as well."""
        super().reset_accepted()
        self.inner_candidate_space.reset_accepted()
    def set_predecessor_model(
        self, predecessor_model: Union[Model, str, None]
    ):
        """Setting the predecessor model for the ``inner_candidate_space``
        as well."""
        super().set_predecessor_model(predecessor_model=predecessor_model)
        self.inner_candidate_space.set_predecessor_model(
            predecessor_model=predecessor_model
        )
[docs] def set_exclusions(self, exclusions: Union[List[str], None]): """Setting the exclusions for the ``inner_candidate_space`` as well.""" self.exclusions = exclusions self.inner_candidate_space.exclusions = exclusions if self.exclusions is None: self.exclusions = [] self.inner_candidate_space.exclusions = []
    def set_limit(self, limit: TYPE_LIMIT = None):
        """Setting the limit for the ``inner_candidate_space`` as well."""
        if limit is not None:
            self.limit.set_limit(limit)
            self.inner_candidate_space.limit.set_limit(limit)
[docs] def is_plausible(self, model: Model) -> bool: if self.check_critical(model): return self.inner_candidate_space.is_plausible(model) return False
[docs] def check_swap(self, model: Model) -> bool: """Check if parameters that are swapped are contained in the same swap parameter set.""" if self.method != Method.LATERAL: return True predecessor_estimated_parameters_ids = set( self.predecessor_model.get_estimated_parameter_ids_all() ) estimated_parameters_ids = set(model.get_estimated_parameter_ids_all()) swapped_parameters_ids = estimated_parameters_ids.symmetric_difference( predecessor_estimated_parameters_ids ) for swap_set in self.swap_parameter_sets: if swapped_parameters_ids.issubset(set(swap_set)): return True return False
[docs] def check_critical(self, model: Model) -> bool: """Check if the model contains all necessary critical parameters""" estimated_parameters_ids = set(model.get_estimated_parameter_ids_all()) for critical_set in self.critical_parameter_sets: if not estimated_parameters_ids.intersection(set(critical_set)): return False return True
[docs] def switch_method( self, calibrated_models: Dict[str, Model], ) -> None: """Switch to the next method with respect to the history of methods used and the switching scheme in ``self.method_scheme``.""" previous_method = self.method next_method = previous_method logging.info("SWITCHING", self.method_history) # If last method was LATERAL and we have made a succesfull swap # (a better model was found) then go back to FORWARD. Else do the # usual method swapping scheme. if self.swap_done_successfully: next_method = self.method_scheme[(Method.LATERAL,)] else: # iterate through the method_scheme dictionary to see which method to switch to for previous_methods in self.method_scheme: if previous_methods is not None and previous_methods == tuple( self.method_history[-len(previous_methods) :] ): next_method = self.method_scheme[previous_methods] # if found a switch just break (choosing first good switch) break # raise error if the method didn't change if next_method == previous_method: raise ValueError( "Method didn't switch when it had to. " "The `method_scheme` provided is not sufficient. " "Please provide a correct method_scheme scheme. " f"Method history: `{self.method_history}`. " ) # Terminate if next method is `None` if next_method is None: if self.n_reattempts: self.jump_to_most_distant(calibrated_models=calibrated_models) return raise StopIteration( f"The next method is {next_method}. The search is terminating." ) # If we try to switch to SWAP method but it's not available (no crit or swap parameter groups) if ( next_method == Method.LATERAL and not [ critical_set for critical_set in self.critical_parameter_sets if len(critical_set) > 1 ] and not self.swap_parameter_sets ): if self.n_reattempts: self.jump_to_most_distant(calibrated_models=calibrated_models) return raise ValueError( f"The next method is {next_method}, but there are no critical or swap parameters sets. Terminating." 
) if previous_method == Method.LATERAL: self.swap_done_successfully = False self.update_method(method=next_method)
[docs] def update_method(self, method: Method): """Update ``self.method`` to ``method``.""" self.method = method
    def switch_inner_candidate_space(
        self,
        exclusions: List[str],
    ):
        """Switch the inner candidate space to match the current method.

        Args:
            exclusions:
                Hashes of excluded models.
        """
        # if self.method != Method.MOST_DISTANT:
        self.inner_candidate_space = self.inner_candidate_spaces[self.method]
        # reset the next inner candidate space with the current history of all
        # calibrated models
        self.inner_candidate_space.reset(
            predecessor_model=self.predecessor_model,
            exclusions=exclusions,
        )
[docs] def jump_to_most_distant( self, calibrated_models: Dict[str, Model], ): """Jump to most distant model with respect to the history of all calibrated models.""" predecessor_model = self.get_most_distant( calibrated_models=calibrated_models, ) logging.info("JUMPING: ", predecessor_model.parameters) # if model not appropriate make it so by adding the first # critical parameter from each critical parameter set if not self.check_critical(predecessor_model): for critical_set in self.critical_parameter_sets: predecessor_model.parameters[critical_set[0]] = ESTIMATE # self.update_method(self.initial_method) self.update_method(Method.MOST_DISTANT) self.n_reattempts -= 1 self.jumped_to_most_distant = True # FIXME rename here `predecessor_model` to `most_distant_model` # self.predecessor_model = None self.set_predecessor_model(None) self.best_model_of_current_run = None self.models = [predecessor_model] self.write_summary_tsv("Jumped to the most distant model.") self.update_method(self.method_scheme[(Method.MOST_DISTANT,)])
# TODO Fix for non-famos model subspaces. FAMOS easy because of only 0;ESTIMATE
    def get_most_distant(
        self,
        calibrated_models: Dict[str, Model],
    ) -> Model:
        """
        Get most distant model to all the checked models. We take models from
        the sorted list of best models (``self.best_models``) and construct
        complements of these models. For all these complements we compute the
        distance in number of different estimated parameters to all models
        from history. For each complement we take the minimum of these
        distances as it's distance to history. Then we choose the complement
        model with the maximal distance to history.

        TODO:
        Next we check if this model is contained in any subspace. If so we
        choose it. If not we choose the model in a subspace that has least
        distance to this complement model.

        Args:
            calibrated_models:
                All calibrated models so far (by hash).

        Returns:
            The most distant model.

        Raises:
            StopIteration:
                If no most distant model is found.
        """
        most_distance = 0
        most_distant_indices = []

        # NOTE(review): assumes all best models share the same PEtab
        # parameters -- see the FIXME in `distances_in_estimated_parameters`.
        parameter_ids = self.best_models[0].petab_parameters

        for model in self.best_models:
            # Binary vector: 1 where a parameter is estimated, else 0.
            model_estimated_parameters = np.array(
                [
                    p == ESTIMATE
                    for p in model.get_parameter_values(
                        parameter_ids=parameter_ids
                    )
                ]
            ).astype(int)
            # The complement flips the estimation status of every parameter.
            complement_parameters = 1 - model_estimated_parameters
            # initialize the least distance to the maximal possible value of it
            complement_least_distance = len(complement_parameters)
            # get the complement least distance
            for calibrated_model in calibrated_models.values():
                calibrated_model_estimated_parameters = np.array(
                    [
                        p == ESTIMATE
                        for p in calibrated_model.get_parameter_values(
                            parameter_ids=parameter_ids
                        )
                    ]
                ).astype(int)
                difference = (
                    calibrated_model_estimated_parameters
                    - complement_parameters
                )
                l1_distance = np.abs(difference).sum()

                if l1_distance < complement_least_distance:
                    complement_least_distance = l1_distance

            # if the complement is further away than the current best one
            # then save it as the new most distant away one
            if complement_least_distance > most_distance:
                most_distance = complement_least_distance
                most_distant_indices = complement_parameters

        if len(most_distant_indices) == 0:
            raise StopIteration("No most_distant model found. Terminating")

        # Convert the binary vector back to parameter values:
        # 1 -> ESTIMATE, 0 -> '0' (fixed).
        most_distant_parameter_values = [
            str(index).replace('1', ESTIMATE)
            for index in most_distant_indices
        ]

        most_distant_parameters = {
            parameter_id: index
            for parameter_id, index in zip(
                parameter_ids, most_distant_parameter_values
            )
        }

        # NOTE(review): `model` here is the last model of the loop above.
        most_distant_model = Model(
            petab_yaml=model.petab_yaml,
            model_subspace_id=model.model_subspace_id,
            model_subspace_indices=most_distant_indices,
            parameters=most_distant_parameters,
        )

        return most_distant_model
[docs] def wrap_search_subspaces(self, search_subspaces): def wrapper(): search_subspaces(only_one_subspace=True) return wrapper
class LateralCandidateSpace(CandidateSpace):
    """Find models with the same number of estimated parameters.

    Candidates differ from the predecessor by swapping which parameters are
    estimated, not by how many.
    """
    def __init__(
        self,
        *args,
        predecessor_model: Union[Model, None],
        max_steps: Optional[int] = None,
        **kwargs,
    ):
        """Construct the candidate space.

        Args:
            predecessor_model:
                The initial model.
            max_steps:
                Maximal allowed number of swap moves. If ``None`` then there
                is no maximum.
        """
        super().__init__(
            method=Method.LATERAL,
            *args,
            predecessor_model=predecessor_model,
            **kwargs,
        )
        self.max_steps = max_steps
    def is_plausible(self, model: Model) -> bool:
        """See :meth:`CandidateSpace.is_plausible`.

        Raises:
            ValueError:
                If no predecessor model has been set.
            StopIteration:
                If accepting ``model`` would exceed ``self.max_steps``.
        """
        if self.predecessor_model is None:
            raise ValueError(
                f"The predecessor_model is still None. Provide an appropriate predecessor_model"
            )
        distances = self.distances_in_estimated_parameters(model)

        # If `max_steps` is set and the number of steps made is larger, then
        # the move is not plausible. Each swap changes two parameters, hence
        # the factor of 2.
        if self.max_steps is not None and distances['l1'] > 2 * self.max_steps:
            raise StopIteration(
                f"Maximal number of steps for method {self.method} exceeded. Stop sending candidate models."
            )

        # A model is plausible if the number of estimated parameters remains
        # the same, but some estimated parameters have become fixed and vice
        # versa.
        if (
            distances['size'] == 0
            and
            # distances['size'] == 0 implies L1 % 2 == 0.
            # FIXME here and elsewhere, deal with models that are equal
            #       except for the values of their fixed parameters.
            distances['l1'] > 0
        ):
            return True
        return False
def _consider_method(self, model): return True
class BruteForceCandidateSpace(CandidateSpace):
    """The brute-force method class.

    Every model in the model space is accepted as a candidate
    (subject to the limits handled by :class:`CandidateSpace`).
    """

    def __init__(self, *args, **kwargs):
        """Construct the candidate space.

        All arguments are forwarded to :class:`CandidateSpace`, with
        the method fixed to brute force.
        """
        # A commented-out warning that rejected extra arguments was
        # removed here: arguments are simply forwarded to the base class.
        super().__init__(
            method=Method.BRUTE_FORCE,
            *args,
            **kwargs,
        )

    def _consider_method(self, model):
        # The brute-force method accepts every model.
        return True
# Registry mapping each supported method to its candidate space class.
candidate_space_classes = {
    Method.FORWARD: ForwardCandidateSpace,
    Method.BACKWARD: BackwardCandidateSpace,
    Method.LATERAL: LateralCandidateSpace,
    Method.BRUTE_FORCE: BruteForceCandidateSpace,
    Method.FAMOS: FamosCandidateSpace,
}


def method_to_candidate_space_class(method: Method) -> Type[CandidateSpace]:
    """Get a candidate space class, given its method name.

    Args:
        method:
            The name of the method corresponding to one of the
            implemented candidate spaces.

    Returns:
        The candidate space class.

    Raises:
        NotImplementedError:
            If ``method`` does not correspond to an implemented
            candidate space.
    """
    candidate_space_class = candidate_space_classes.get(method)
    if candidate_space_class is None:
        raise NotImplementedError(
            f'The provided method `{method}` does not correspond to an '
            'implemented candidate space.'
        )
    return candidate_space_class