Source code for chi._problems

#
# This file is part of the chi repository
# (https://github.com/DavAug/chi/) which is released under the
# BSD 3-clause license. See accompanying LICENSE.md for copyright notice and
# full license details.
#
# The InverseProblem class is based on the SingleOutputProblem and
# MultiOutputProblem classes of PINTS (https://github.com/pints-team/pints/),
# which is distributed under the BSD 3-clause license.
#

import copy
from warnings import warn

import myokit
import numpy as np
import pandas as pd
import pints

import chi


[docs] class ProblemModellingController(object): """ A problem modelling controller which simplifies the model building process of a pharmacokinetic and pharmacodynamic problem. The class is instantiated with an instance of a :class:`MechanisticModel` and one instance of an :class:`ErrorModel` for each mechanistic model output. :param mechanistic_model: A mechanistic model for the problem. :type mechanistic_model: MechanisticModel :param error_models: A list of error models. One error model has to be provided for each mechanistic model output. :type error_models: list[ErrorModel] :param outputs: A list of mechanistic model output names, which can be used to map the error models to mechanistic model outputs. If ``None``, the error models are assumed to be ordered in the same order as :meth:`MechanisticModel.outputs`. :type outputs: list[str], optional """ def __init__(self, mechanistic_model, error_models, outputs=None): super(ProblemModellingController, self).__init__() # Check inputs if not isinstance(mechanistic_model, chi.MechanisticModel): raise TypeError( 'The mechanistic model has to be an instance of a ' 'chi.MechanisticModel.') if not isinstance(error_models, list): error_models = [error_models] for error_model in error_models: if not isinstance(error_model, chi.ErrorModel): raise TypeError( 'Error models have to be instances of a ' 'chi.ErrorModel.') # Copy mechanistic model mechanistic_model = mechanistic_model.copy() # Set outputs if outputs is not None: mechanistic_model.set_outputs(outputs) # Get number of outputs n_outputs = mechanistic_model.n_outputs() if len(error_models) != n_outputs: raise ValueError( 'Wrong number of error models. One error model has to be ' 'provided for each mechanistic error model.') # Copy error models error_models = [copy.copy(error_model) for error_model in error_models] # Remember models self._mechanistic_model = mechanistic_model self._error_models = error_models # Set defaults self._population_model = None self._log_prior = None self._data = None self._covariates = None self._dosing_regimens = None self._set_error_model_parameter_names() def _check_covariate_dict( self, covariate_dict, covariate_names, observables): """ Makes sure that for each covariate name (from model) an observable (from dataframe) exists. """ # Check that model needs covariates if len(covariate_names) == 0: return None # If no mapping is provided, construct default mapping if covariate_dict is None: # Assume trivial map covariate_dict = {cov: cov for cov in covariate_names} # Check that covariate name map is valid for cov in covariate_names: if cov not in list(covariate_dict.keys()): raise ValueError( 'The covariate <' + str(cov) + '> could not be identified ' 'in the covariate name map.') mapped_name = covariate_dict[cov] if mapped_name not in observables: raise ValueError( 'The covariate <' + str(mapped_name) + '> could not be ' 'identified in the dataframe.') return covariate_dict def _check_covariate_values(self, covariate_names): """ Makes sure that covariates can be reshaped in to an array of shape (n, c). In other words, checks whether for each covariate_name there exists exactly one non-NaN value for each ID. """ # Check that model needs covariates if len(covariate_names) == 0: return None for name in covariate_names: # Mask covariate values mask = self._data[self._obs_key] == self._covariate_dict[name] temp = self._data[mask] for _id in self._ids: # Mask values for individual mask = temp[self._id_key] == _id temp2 = temp.loc[mask, self._value_key].dropna() if len(temp2) != 1: covariate = self._covariate_dict[name] raise ValueError( 'There are either 0 or more than 1 value of the ' 'covariate %s for ID %s. ' 'Exactly one covariate value ' 'has to be provided for each ID.' % (covariate, _id) ) def _check_output_observable_dict( self, output_observable_dict, outputs, observables): """ Makes sure that the mechanistic model outputs are correctly mapped to the observables in the dataframe. """ if output_observable_dict is None: if (len(outputs) == 1) and (len(observables) == 1): # Create map of single output to single observable output_observable_dict = {outputs[0]: observables[0]} else: # Assume trivial map output_observable_dict = {output: output for output in outputs} # Check that output-observable map is valid for output in outputs: if output not in list(output_observable_dict.keys()): raise ValueError( 'The output <' + str(output) + '> could not be identified ' 'in the output-observable map.') observable = output_observable_dict[output] if observable not in observables: raise ValueError( 'The observable <' + str(observable) + '> could not be ' 'identified in the dataframe.') return output_observable_dict def _clean_data(self, dose_key, dose_duration_key): """ Makes sure that the data is formated properly. 1. ids are strings 2. time are numerics or NaN 3. observables are strings 4. values are numerics or NaN 5. observable types are 'Modelled' or 'Covariate' 6. dose are numerics or NaN 7. duration are numerics or NaN """ # Create container for data columns = [ self._id_key, self._time_key, self._obs_key, self._value_key] if dose_key is not None: columns += [dose_key] if dose_duration_key is not None: columns += [dose_duration_key] data = pd.DataFrame(columns=columns) # Convert IDs to strings data[self._id_key] = self._data[self._id_key].astype( "string") # Convert times to numerics data[self._time_key] = pd.to_numeric(self._data[self._time_key]) # Convert observables to strings data[self._obs_key] = self._data[self._obs_key].astype( "string") # Convert values to numerics data[self._value_key] = pd.to_numeric(self._data[self._value_key]) # Convert dose to numerics if dose_key is not None: data[dose_key] = pd.to_numeric( self._data[dose_key]) # Convert duration to numerics if dose_duration_key is not None: data[dose_duration_key] = pd.to_numeric( self._data[dose_duration_key]) self._data = data def _create_hierarchical_log_likelihood(self, log_likelihoods): """ Returns an instance of a chi.HierarchicalLoglikelihood based on the provided list of log-likelihoods and the population models. """ # Get covariates from the dataset if any are needed covariate_names = self.get_covariate_names() covariates = None if len(covariate_names) > 0: covariates = self._extract_covariates(covariate_names) log_likelihood = chi.HierarchicalLogLikelihood( log_likelihoods, self._population_model, covariates) return log_likelihood def _create_log_likelihoods(self, ids): """ Returns a list of log-likelihoods, one for each individual in the dataset. If individual is not None, only the likelihood of this individual is returned. """ # Create a likelihood for each individual log_likelihoods = [] for individual in ids: # Set dosing regimen if self._dosing_regimens: self._mechanistic_model.set_dosing_regimen( self._dosing_regimens[individual]) log_likelihood = self._create_log_likelihood(individual) if log_likelihood is not None: # If data exists for this individual, append to log-likelihoods log_likelihoods.append(log_likelihood) if len(ids) == 1: return log_likelihoods[0] return log_likelihoods def _create_log_likelihood(self, individual): """ Gets the relevant data for the individual and returns the resulting chi.LogLikelihood. """ # Get individuals data times = [] observations = [] mask = self._data[self._id_key] == individual data = self._data[mask][ [self._time_key, self._obs_key, self._value_key]] for output in self._mechanistic_model.outputs(): # Mask data for observable observable = self._output_observable_dict[output] mask = data[self._obs_key] == observable temp_df = data[mask] # Filter times and observations for non-NaN entries mask = temp_df[self._value_key].notnull() temp_df = temp_df[[self._time_key, self._value_key]][mask] mask = temp_df[self._time_key].notnull() temp_df = temp_df[mask] # Collect data for output times.append(temp_df[self._time_key].to_numpy()) observations.append(temp_df[self._value_key].to_numpy()) # # Count outputs that were measured # # TODO: copy mechanistic model and update model outputs. # # (Useful for e.g. control group and dose group training) # n_measured_outputs = 0 # for output_measurements in observations: # if len(output_measurements) > 0: # n_measured_outputs += 1 # Create log-likelihood and set ID to individual log_likelihood = chi.LogLikelihood( self._mechanistic_model, self._error_models, observations, times) log_likelihood.set_id(individual) return log_likelihood def _extract_covariates(self, covariate_names): """ Extracts covariates from the pandas.DataFrame and formats them as a np.ndarray of shape (n, c). """ # Format covariates to array of shape (n, c) c = len(covariate_names) n = len(self._ids) covariates = np.empty(shape=(n, c)) for idc, name in enumerate(covariate_names): mask = self._data[self._obs_key] == self._covariate_dict[name] temp = self._data[mask] for idn, _id in enumerate(self._ids): mask = temp[self._id_key] == _id covariates[idn, idc] = \ temp.loc[mask, self._value_key].dropna().values.squeeze() return covariates def _extract_dosing_regimens(self, dose_key, duration_key): """ Converts the dosing regimens defined by the pandas.DataFrame into myokit.Protocols, and returns them as a dictionary with individual IDs as keys, and regimens as values. For each dose entry in the dataframe a dose event is added to the myokit.Protocol. If the duration of the dose is not provided a bolus dose of duration 0.01 time units is assumed. """ # Create duration column if it doesn't exist and set it to default # bolus duration of 0.01 if duration_key is None: duration_key = 'Duration in base time unit' self._data[duration_key] = 0.01 # Extract regimen from dataset regimens = dict() for label in self._ids: # Filter times and dose events for non-NaN entries mask = self._data[self._id_key] == label data = self._data[ [self._time_key, dose_key, duration_key]][mask] mask = data[dose_key].notnull() data = data[mask] mask = data[self._time_key].notnull() data = data[mask] # Add dose events to dosing regimen regimen = myokit.Protocol() for _, row in data.iterrows(): # Set duration duration = row[duration_key] if np.isnan(duration): # If duration is not provided, we assume a bolus dose # which we approximate by 0.01 time_units. duration = 0.01 # Compute dose rate and set regimen dose_rate = row[dose_key] / duration time = row[self._time_key] regimen.add(myokit.ProtocolEvent(dose_rate, time, duration)) regimens[label] = regimen return regimens def _set_error_model_parameter_names(self): """ Resets the error model parameter names and prepends the output name if more than one output exists. """ # Reset error model parameter names to defaults for error_model in self._error_models: error_model.set_parameter_names(None) # Rename error model parameters, if more than one output n_outputs = self._mechanistic_model.n_outputs() if n_outputs > 1: # Get output names outputs = self._mechanistic_model.outputs() for output_id, error_model in enumerate(self._error_models): # Get original parameter names names = error_model.get_parameter_names() # Prepend output name output = outputs[output_id] names = [output + ' ' + name for name in names] # Set new parameter names error_model.set_parameter_names(names)
[docs] def fix_parameters(self, name_value_dict): """ Fixes the value of model parameters, and effectively removes them as a parameter from the model. Fixing the value of a parameter to ``None``, sets the parameter free again. .. note:: 1. Fixing model parameters resets the log-prior to ``None``. 2. Once a population model is set, only population model parameters can be fixed. 3. Setting data resets all population parameters as the number of parameters may change with the number of modelled individuals. :param name_value_dict: A dictionary with model parameters as keys, and the value to be fixed at as values. :type name_value_dict: dict """ # Check type of dictionanry try: name_value_dict = dict(name_value_dict) except (TypeError, ValueError): raise ValueError( 'The name-value dictionary has to be convertable to a python ' 'dictionary.') # Convert models to reduced models models = [] if self._population_model is not None: # If a population model is set, fix only population parameters population_model = self._population_model if not isinstance(population_model, chi.ReducedPopulationModel): population_model = chi.ReducedPopulationModel( population_model) models.append(population_model) else: mechanistic_model = self._mechanistic_model error_models = self._error_models if not isinstance(mechanistic_model, chi.ReducedMechanisticModel): mechanistic_model = chi.ReducedMechanisticModel( mechanistic_model) for idm, error_model in enumerate(error_models): if not isinstance(error_model, chi.ReducedErrorModel): error_model = chi.ReducedErrorModel(error_model) error_models[idm] = error_model models += [mechanistic_model] + error_models # Fix parameters for model in models: model.fix_parameters(name_value_dict) # Safe models # (if no parameters are fixed, convert back to non-reduced models) if self._population_model is not None: population_model = models[0] if population_model.n_fixed_parameters() == 0: population_model = population_model.get_population_model() self._population_model = population_model else: mechanistic_model = models[0] error_models = models[1:] if mechanistic_model.n_fixed_parameters() == 0: mechanistic_model = mechanistic_model.mechanistic_model() for idm, error_model in enumerate(error_models): if error_model.n_fixed_parameters() == 0: error_models[idm] = error_model.get_error_model() self._mechanistic_model = mechanistic_model self._error_models = error_models # Reset priors self._log_prior = None
[docs] def get_dosing_regimens(self): """ Returns a dictionary of dosing regimens in form of :class:`myokit.Protocol` instances. The dosing regimens are extracted from the dataset if a dose key is provided. If no dose key is provided ``None`` is returned. """ return self._dosing_regimens
[docs] def get_log_prior(self): """ Returns the :class:`LogPrior` for the model parameters. If no log-prior is set, ``None`` is returned. """ return self._log_prior
[docs] def get_log_posterior(self, individual=None): r""" Returns the :class:`LogPosterior` defined by the measurements of the modelled observables, the administered dosing regimen, the mechanistic model, the error model, the log-prior, and optionally the population model, covariates and the fixed model parameters. If measurements of multiple individuals exist in the dataset, the indiviudals ID can be passed to return the log-posterior associated to that individual. If no ID is selected and no population model has been set, a list of log-posteriors is returned corresponding to each of the individuals. This method raises an error if the data or the log-prior have not been set. See :meth:`set_data` and :meth:`set_log_prior`. :param individual: The ID of an individual. If ``None`` the log-posteriors for all individuals is returned. Input is ignored if a population model is set. :type individual: str, optional """ # Check prerequesites if self._data is None: raise ValueError( 'The data has not been set.') if self._log_prior is None: raise ValueError( 'The log-prior has not been set.') # Check that individual is in ids _id = individual if self._population_model is None else None if (_id is not None) and (_id not in self._ids): raise ValueError( 'The individual cannot be found in the ID column of the ' 'dataset.') # Set ID to all IDs if population model is set, and to first ID # if not set but None if self._population_model is not None: ids = self._ids elif _id is None: ids = [self._ids[0]] else: ids = [_id] # Create log-likelihoods log_likelihood = self._create_log_likelihoods(ids) if self._population_model is not None: # Compose HierarchicalLogLikelihoods log_likelihood = self._create_hierarchical_log_likelihood( log_likelihood) # Compose the log-posterior if isinstance(log_likelihood, chi.LogLikelihood): log_posterior = chi.LogPosterior( log_likelihood, self._log_prior) else: log_posterior = chi.HierarchicalLogPosterior( log_likelihood, self._log_prior) return log_posterior
[docs] def get_n_parameters(self, exclude_pop_model=False): """ Returns the number of the model parameters, i.e. the combined number of parameters from the mechanistic model and the error model when no population model has been set, or the number of population model parameters when a population model has been set. Any parameters that have been fixed to a constant value will not be included in the number of model parameters. :param exclude_pop_model: A boolean flag to indicate whether the population model should be ignored (if set). :type exclude_pop_model: bool, optional """ if (self._population_model is None) or exclude_pop_model: n_parameters = self._mechanistic_model.n_parameters() for error_model in self._error_models: n_parameters += error_model.n_parameters() return n_parameters return self._population_model.n_parameters()
[docs] def get_covariate_names(self): """ Returns the names of the covariates. """ if self._population_model is None: return [] return self._population_model.get_covariate_names()
[docs] def get_parameter_names(self, exclude_pop_model=False): """ Returns the names of the model parameters, i.e. the combined names of the mechanistic model parameters and the error model parameters when no population model has been set, or the names of population model parameters when a population model has been set. Any parameters that have been fixed to a constant value will not be included. :param exclude_pop_model: A boolean flag to indicate whether the population model should be ignored (if set). :type exclude_pop_model: bool, optional """ if (self._population_model is None) or exclude_pop_model: names = self._mechanistic_model.parameters() for error_model in self._error_models: names += error_model.get_parameter_names() return names return self._population_model.get_parameter_names()
[docs] def get_predictive_model(self): """ Returns a :class:`PredictiveModel` defined by the mechanistic model, the error model, and optionally the population model and the fixed model parameters. """ # Create predictive model predictive_model = chi.PredictiveModel( self._mechanistic_model, self._error_models) if (self._population_model is not None): predictive_model = chi.PopulationPredictiveModel( predictive_model, self._population_model) return predictive_model
[docs] def set_data( self, data, output_observable_dict=None, covariate_dict=None, id_key='ID', time_key='Time', obs_key='Observable', value_key='Value', dose_key='Dose', dose_duration_key='Duration'): """ Sets the data of the modelling problem. The data contains information about the measurement time points, the measured values of the observables, the observable name, IDs to identify the corresponding individuals, and optionally information on the administered dose amount and duration. The data is expected to be in form of a :class:`pandas.DataFrame` with the columns ID | Time | Observable | Value | Dose | Duration. If no information exists, the corresponding column keys can be set to ``None``. .. note:: Time-dependent covariates are currently not supported. Thus, the Time column of observables that are used as covariates is ignored. :param data: A dataframe with an ID, time, observable, value and optionally a dose and duration column. :type data: pandas.DataFrame :param output_observable_dict: A dictionary with mechanistic model output names as keys and dataframe observable names as values. If ``None`` the model outputs and observables are assumed to have the same names. :type output_observable_dict: dict, optional :param covariate_dict: A dictionary with population model covariate names as keys and dataframe observables as values. If ``None`` the model covariates and observables are assumed to have the same names. :type covariate_dict: dict, optional :param id_key: The key of the ID column in the :class:`pandas.DataFrame`. Default is ``'ID'``. :type id_key: str, optional :param time_key: The key of the time column in the :class:`pandas.DataFrame`. Default is ``'Time'``. :type time_key: str, optional :param obs_key: The key of the observable column in the :class:`pandas.DataFrame`. Default is ``'Observable'``. :type obs_key: str, optional :param value_key: The key of the value column in the :class:`pandas.DataFrame`. Default is ``'Value'``. :type value_key: str, optional :param dose_key: The key of the dose column in the :class:`pandas.DataFrame`. Default is ``'Dose'``. :type dose_key: str, optional :param dose_duration_key: The key of the duration column in the :class:`pandas.DataFrame`. Default is ``'Duration'``. :type dose_duration_key: str, optional """ # Check input format if not isinstance(data, pd.DataFrame): raise TypeError( 'Data has to be a pandas.DataFrame.') # If model does not support dose administration, set dose keys to None if not self._mechanistic_model.supports_dosing(): dose_key = None dose_duration_key = None keys = [id_key, time_key, obs_key, value_key] if dose_key is not None: keys += [dose_key] if dose_duration_key is not None: keys += [dose_duration_key] for key in keys: if key not in data.keys(): raise ValueError( 'Data does not have the key <' + str(key) + '>.') # Check output observable map outputs = self._mechanistic_model.outputs() observables = data[obs_key].dropna().unique() output_observable_dict = self._check_output_observable_dict( output_observable_dict, outputs, observables) # Check covariate name map covariate_names = self.get_covariate_names() covariate_dict = self._check_covariate_dict( covariate_dict, covariate_names, observables) self._id_key, self._time_key, self._obs_key, self._value_key = [ id_key, time_key, obs_key, value_key] self._data = data[keys] self._output_observable_dict = output_observable_dict self._covariate_dict = covariate_dict # Make sure data is formatted correctly self._clean_data(dose_key, dose_duration_key) self._ids = self._data[self._id_key].unique() # Extract dosing regimens self._dosing_regimens = None if dose_key is not None: self._dosing_regimens = self._extract_dosing_regimens( dose_key, dose_duration_key) # Set number of modelled individuals of population model if isinstance(self._population_model, chi.ReducedPopulationModel): # Unfix model parameters self._population_model = \ self._population_model.get_population_model() if self._population_model is not None: self._population_model.set_n_ids(len(self._ids)) # Check that covariates can be reshaped into (n, c) self._check_covariate_values(covariate_names)
[docs] def set_log_prior(self, log_prior): """ Sets the prior distribution of the model parameters. The log-prior dimensions are assumed to be ordered according to :meth:`get_parameter_names`. .. note:: This method requires that the data has been set, since the number of parameters of an hierarchical model may vary with the number of individuals in the dataset (see e.g. :class:`HeterogeneousModel`). :param log_prior: A :class:`pints.LogPrior` of the length :meth:`get_n_parameters`. :type log_priors: pints.LogPrior """ # Check prerequesites if self._data is None: raise ValueError('The data has not been set.') # Check inputs if not isinstance(log_prior, pints.LogPrior): raise ValueError( 'The log-prior has to be an instance of pints.LogPrior.') if log_prior.n_parameters() != self.get_n_parameters(): raise ValueError( 'The dimension of the log-prior has to be the same as the ' 'number of parameters in the model. There are <' + str(self.get_n_parameters()) + '> model parameters.') self._log_prior = log_prior
[docs] def set_population_model(self, population_model): """ Sets the population model. A population model specifies how model parameters vary across individuals. The dimension of the population model has to match the number of model parameters. .. note:: Setting a population model resets the log-prior to ``None``, because it changes the top-level parameters of the model. :param population_model: A :class:`PopulationModel` whose dimension is the same as the number of bottom-level parameters. :type population_model: PopulationModel """ # Check inputs if not isinstance(population_model, chi.PopulationModel): raise TypeError( 'The population model has to be an instance of ' 'chi.PopulationModel.') # Make sure that dimension of population model is correct n_parameters = self.get_n_parameters(exclude_pop_model=True) if population_model.n_dim() != n_parameters: raise ValueError( 'The dimension of the population model does not match the ' 'number of bottom-level parameters. There are ' '<' + str(n_parameters) + '> bottom-level parameters.') # Remember population model self._population_model = population_model # Set number of modelled individuals, if data has been set already if self._data is not None: self._population_model.set_n_ids(len(self._ids)) # Set dimension names to bottom-level parameters names = self.get_parameter_names(exclude_pop_model=True) self._population_model.set_dim_names(names) # Check that covariates can be found, if data has already been set if self._data is not None: try: # Get covariate names covariate_names = self.get_covariate_names() observables = self._data[self._obs_key].dropna().unique() self._covariate_dict = self._check_covariate_dict( self._covariate_dict, covariate_names, observables) except ValueError: # New population model and data are not compatible, so reset # data self._data = None warn( 'The covariates of the new population model could not ' 'automatically be matched to the observables in the ' 'dataset. The data was therefore reset. Please set the ' 'data again with the `set_data` method and specify the ' 'covariate mapping.', UserWarning) # Set prior to default self._log_prior = None