#
# This file is part of the chi repository
# (https://github.com/DavAug/chi/) which is released under the
# BSD 3-clause license. See accompanying LICENSE.md for copyright notice and
# full license details.
#
import numpy as np
[docs]
class PopulationFilter(object):
r"""
A base class for filters.
A filter estimates the likelihood with which simulated
observations :math:`\tilde{y}_{sj}` come from the same distribution as the
measurements :math:`y_{ij}`, where :math:`s` indexes simulated individuals,
:math:`j` time points and :math:`i` measured individuals.
Formally the log-likelihood of the simulated observations with respect to
the filter is defined as
.. math::
\log p(Y | \tilde{Y}) = \sum _{ij} \log p(y_{ij} | \tilde{Y}_j ),
where :math:`\tilde{Y}_j = \{ \tilde{y}_{sj} \}` are the simulated
observations at time point :math:`t_j`.
The measurements are expected to be arranged into a 3 dimensional numpy
array of shape ``(n_ids, n_observables, n_times)``, where
``n_ids`` is the number of measured individuals at a given time point,
``n_observables`` is the number of unique observables that were
measurement, and ``n_times`` is the number of unique time points.
The filter expects the simulated measurements to be ordered in
the same way, so no record of the measurement times is needed.
If varying numbers of individuals were measured at different time points,
or not all observables were measured for each individual, the missing
values can be filled with ``np.nan`` to be able to shape the observations
into the format ``(n_ids, n_observables, n_times)``. The missing values
will be filtered internally.
:param observations: Measurements.
:type observations: np.ndarray of shape
``(n_ids, n_observables, n_times)``
"""
def __init__(self, observations):
observations = np.asarray(observations)
if observations.ndim != 3:
raise ValueError(
'The observations must be of shape '
'(n_ids, n_observables, n_times).')
# Transform into masked array
mask = np.isnan(observations)
if np.any(mask):
observations = np.ma.array(observations, mask=mask)
self._observations = observations
_, self._n_observables, self._n_times = observations.shape
[docs]
def compute_log_likelihood(self, simulated_obs):
"""
Returns the log-likelihood of the simulated observations with respect
to the data and filter.
:param simulated_obs: Simulated measurements.
:type simulated_obs: np.ndarray of shape
(n_sim, n_observables, n_times)
:rtype: float
"""
raise NotImplementedError
[docs]
def compute_sensitivities(self, simulated_obs):
"""
Returns the log-likelihood of the simulated observations with respect
to the data and filter, and the sensitivities of the log-likelihood
with respect to the simulated observations.
:param simulated_obs: Simulated measurements.
:type simulated_obs: np.ndarray of shape
(n_sim, n_observables, n_times)
:rtype: Tuple[float, np.ndarray] where the array has shape
``(n_sim, n_observables, n_times)``
"""
raise NotImplementedError
[docs]
def n_observables(self):
"""
Returns the number of observables in the dataset.
"""
return self._n_observables
[docs]
def n_times(self):
"""
Returns the number of measurement times in the dataset.
"""
return self._n_times
[docs]
def sort_times(self, order):
"""
Sorts the observations along the time dimension according to the
provided indices.
:param order: An array with indices that orders the observations along
thetime dimension.
:type order: np.ndarray of shape ``(n_times,)``
"""
order = np.asarray(order)
if len(order) != self._n_times:
raise ValueError('Order has to be of length n_times.')
if len(order) != len(np.unique(order)):
raise ValueError('Order has to contain n_times unique elements.')
self._observations = self._observations[..., order]
[docs]
class ComposedPopulationFilter(PopulationFilter):
r"""
A filter composed of multiple filters.
A composed filter takes a list of filters and defines
the log-likelihood of simulated measurements as the sum over the individual
log-likelihoods of the filters
.. math::
\log p(Y | \tilde{Y}) = \sum _{ij} \log p_j(y_{ij} | \tilde{Y}_j ),
where :math:`\tilde{Y}_j = \{ \tilde{y}_{sj} \}` are the simulated
measurements and :math:`p_j(\cdot | \tilde{Y}_j )` is the filter
at time point :math:`t_j`.
The input instances of :class:`chi.PopulationFilter` may model
multiple time points at once. The measurement times are expected to be
ordered according to the concatenated measurement times of the individual
filters.
Extends :class:`chi.PopulationFilter`.
:param population_filters: List of filters.
:type population_filters: List[chi.PopulationFilter]
"""
def __init__(self, population_filters):
# Check inputs
for population_filter in population_filters:
if not isinstance(population_filter, PopulationFilter):
raise TypeError(
'All filters have to be instances '
'of chi.PopulationFilter.')
# TODO: Enforce that all filters model the same number of
# observables. In future PRs we can think about how to relax this
# constraint (might be nice to model different observables at the same
# time with different filters).
n_observables = population_filters[0].n_observables()
for population_filter in population_filters:
if population_filter.n_observables() != n_observables:
raise ValueError(
'All filters need to model the same number of '
'observables.')
self._population_filters = population_filters
self._n_observables = n_observables
# Get properties
self._n_times = np.sum([
pop_filter.n_times() for pop_filter in self._population_filters])
# Defaults
self._time_order = None
self._time_filter_order = None
[docs]
def compute_log_likelihood(self, simulated_obs):
"""
Returns the log-likelihood of the simulated observations with respect
to the data and filter.
:param simulated_obs: Simulated measurements.
:type simulated_obs: np.ndarray of shape
(n_sim, n_observables, n_times)
:rtype: float
"""
# Sort simulated observations
if self._time_order is not None:
simulated_obs = simulated_obs[:, :, self._time_filter_order]
# Compute score
score = 0
current_time_id = 0
for pop_filter in self._population_filters:
end_time_id = current_time_id + pop_filter.n_times()
score += pop_filter.compute_log_likelihood(
simulated_obs[:, :, current_time_id:end_time_id])
current_time_id = end_time_id
return score
[docs]
def compute_sensitivities(self, simulated_obs):
"""
Returns the log-likelihood of the simulated observations with respect
to the data and filter, and the sensitivities of the log-likelihood
with respect to the simulated observations.
:param simulated_obs: Simulated measurements.
:type simulated_obs: np.ndarray of shape
(n_sim, n_observables, n_times)
:rtype: Tuple[float, np.ndarray] where the array has shape
``(n_sim, n_observables, n_times)``
"""
# Sort simulated observations
if self._time_order is not None:
simulated_obs = simulated_obs[:, :, self._time_filter_order]
# Compute score
score = 0
sensitivities = np.zeros(shape=simulated_obs.shape)
current_time_id = 0
for pop_filter in self._population_filters:
end_time_id = current_time_id + pop_filter.n_times()
s, sens = pop_filter.compute_sensitivities(
simulated_obs[:, :, current_time_id:end_time_id])
score += s
sensitivities[:, :, current_time_id:end_time_id] = sens
current_time_id = end_time_id
# Sort sensitivities into input order
if self._time_order is not None:
sensitivities = sensitivities[:, :, self._time_order]
return score, sensitivities
[docs]
def n_observables(self):
"""
Returns the number of observables in the dataset.
"""
return self._n_observables
[docs]
def n_times(self):
"""
Returns the number of measurement times in the dataset.
"""
return self._n_times
[docs]
def sort_times(self, order):
"""
Sorts the observations along the time dimension according to the
provided indices.
:param order: An array with indices that orders the observations along
thetime dimension.
:type order: np.ndarray of shape ``(n_times,)``
"""
order = np.asarray(order)
if len(order) != self._n_times:
raise ValueError('Order has to be of length n_times.')
if len(order) != len(np.unique(order)):
raise ValueError('Order has to contain n_times unique elements.')
# If order doesn't change, ignore input
# (This is purely for efficiency of log-likelihood computation)
if np.all(order == np.arange(self._n_times)):
return None
# Cannot sort observations directly
# (compartmentalised into individual filters)
# So let us remember the order and order simulated measurements
# during inference
self._time_order = np.copy(order)
self._time_filter_order = np.argsort(self._time_order)
[docs]
class GaussianFilter(PopulationFilter):
r"""
Implements a Gaussian filter.
A Gaussian filter approximates the distribution of
measurements at time point :math:`t_j` by a Gaussian distribution
whose mean and variance are estimated from simulated
measurements
.. math::
\log p(\mathcal{D} | \tilde{Y}) =
\sum _{ij} \log \mathcal{N} (y_{ij} | \mu _j, \sigma ^2_j),
where the mean :math:`\mu _j` and the variance :math:`\sigma ^2_j` are
given by empirical estimates from the simulated measurements
.. math::
\mu _j = \frac{1}{n_s} \sum _{s=1}^{n_s} \tilde{y}_{sj}
\quad \text{and} \quad
\sigma ^2 _j = \frac{1}{n_s-1} \sum _{s=1}^{n_s} \left(
\tilde{y}_{sj} - \mu _j \right) ^2.
Here, we use :math:`i` to index measured individuals from the dataset,
:math:`j` to index measurement time points and :math:`s` to index simulated
measurements. :math:`n_s` denotes the number of simulated measurements per
time point.
For multiple measured observables the above expression can be
straightforwardly extended to
.. math::
\log p(\mathcal{D} | \tilde{Y}) =
\sum _{ijr} \log \mathcal{N} (y_{ijr} | \mu _{jr}, \sigma ^2_{jr}),
where :math:`r` indexes observables and :math:`\mu _{jr}` and
:math:`\sigma^2 _{jr}` are the empirical mean and variance over the
simulated measurements of the observable :math:`r` at time point
:math:`t_j`.
Extends :class:`PopulationFilter`
:param observations: Measurements.
:type observations: np.ndarray of shape
``(n_ids, n_observables, n_times)``
"""
def __init__(self, observations):
super().__init__(observations)
def _compute_log_likelihood(self, mu, var):
"""
Returns the log-likelihood.
mu of shape (1, n_observables, n_times)
var of shape (1, n_observables, n_times)
"""
score = -np.sum(
np.log(2*np.pi) + np.log(var)
+ (self._observations - mu)**2 / var) / 2
return score
[docs]
def compute_log_likelihood(self, simulated_obs):
"""
Returns the log-likelihood of the simulated observations with respect
to the data and filter.
:param simulated_obs: Simulated measurements.
:type simulated_obs: np.ndarray of shape
(n_sim, n_observables, n_times)
:rtype: float
"""
mu = np.mean(simulated_obs, axis=0, keepdims=True)
var = np.var(simulated_obs, ddof=1, axis=0, keepdims=True)
score = self._compute_log_likelihood(mu, var)
if np.ma.is_masked(score):
return -np.inf
return score
[docs]
def compute_sensitivities(self, simulated_obs):
"""
Returns the log-likelihood of the simulated observations with respect
to the data and filter, and the sensitivities of the log-likelihood
with respect to the simulated observations.
:param simulated_obs: Simulated measurements.
:type simulated_obs: np.ndarray of shape
(n_sim, n_observables, n_times)
:rtype: Tuple[float, np.ndarray] where the array has shape
``(n_sim, n_observables, n_times)``
"""
mu = np.mean(simulated_obs, axis=0, keepdims=True)
var = np.var(simulated_obs, ddof=1, axis=0, keepdims=True)
score = self._compute_log_likelihood(mu, var)
if np.ma.is_masked(score):
return -np.inf, np.empty(simulated_obs.shape)
# Compute sensitivities
# dscore/dsim = dscore/dmu dmu/dsim + dscore/dvar dvar/dsim
n_sim = len(simulated_obs)
dscore_dsim = \
np.sum((self._observations - mu) / var, axis=0) / n_sim \
+ np.sum(
- 1 / var + (self._observations - mu)**2 / var**2,
axis=0) * (simulated_obs - mu) / (n_sim - 1)
return score, dscore_dsim
[docs]
class GaussianKDEFilter(PopulationFilter):
r"""
Implements a Gaussian kernel density estimation filter.
A Gaussian KDE filter approximates the distribution of
measurements at time point :math:`t_j` by a Gaussian KDE
approximation of the simulated measurements. The Gaussian KDE approximation
is defined by the average over Gaussian probability densities whose means
are equal to the simulated measurements and the standard deviation (or
bandwidth) is a hyperparameter. By default the bandwidth is chosen by
an adapted rule of thumb.
The log-likelihood of the simulated measurements with respect to the
measurements and the filter is defined as
.. math::
\log p(\mathcal{D} | \tilde{Y}) =
\sum _{ij} \log \left( \frac{1}{n_s} \sum _{s=1}^{n_s}
\mathcal{N} (y_{ij} | \tilde{y}_{sj}, \tilde{\sigma} ^2_j) \right).
Here, we use :math:`i` to index measured individuals from the dataset,
:math:`j` to index measurement time points and :math:`s` to index simulated
measurements. :math:`n_s` denotes the number of simulated measurements per
time point.
An adapted rule of thumb is used to estimate an
appropriate bandwidth for each time point :math:`t_j`
.. math::
\tilde{\sigma} _j =
\left( \frac{4}{3n_s}\right) ^ {1/5}
\sqrt{\frac{1}{n_s - 1}
\sum _s (\tilde{y}_{sj} - \tilde{\mu} _j)^2},
where :math:`\tilde{\mu} _j = \sum _s \tilde{y}_{sj} / n_s` is the
empirical mean over the simulated measurements at time
:math:`t_j`.
For multiple measured observables the above expression can be
straightforwardly extended to
.. math::
\log p(\mathcal{D} | \tilde{Y}) =
\sum _{ijr} \log \left( \frac{1}{n_s} \sum _{s=1}^{n_s}
\mathcal{N} (y_{ijr} | \tilde{y}_{sjr}, \tilde{\sigma} ^2_{jr})
\right),
where :math:`r` indexes observables and
:math:`\tilde{\sigma} _{jr}` is the bandwidth for observable :math:`r` at
time point :math:`t_j`.
Extends :class:`PopulationFilter`
:param observations: Measurements.
:type observations: np.ndarray of shape
``(n_ids, n_observables, n_times)``
"""
def __init__(self, observations):
super().__init__(observations)
# Add dummy dimension to observations for later convenience
self._observations = self._observations[np.newaxis, ...]
[docs]
def compute_log_likelihood(self, simulated_obs):
"""
Returns the log-likelihood of the simulated observations with respect
to the data and filter.
:param simulated_obs: Simulated measurements.
:type simulated_obs: np.ndarray of shape
(n_sim, n_observables, n_times)
:rtype: float
"""
n_sim = len(simulated_obs)
simulated_obs = np.asarray(simulated_obs)[:, np.newaxis, :, :]
bw_squared = (4 / 3 / n_sim) ** 0.4 * np.var(
simulated_obs, ddof=1, axis=0, keepdims=True)
score = np.sum(logsumexp(
- (simulated_obs - self._observations)**2
/ bw_squared / 2, axis=0
) - np.log(n_sim) - np.log(2 * np.pi) / 2 - np.log(bw_squared) / 2)
if np.ma.is_masked(score):
return -np.inf
return score
[docs]
def compute_sensitivities(self, simulated_obs):
"""
Returns the log-likelihood of the simulated observations with respect
to the data and filter, and the sensitivities of the log-likelihood
with respect to the simulated observations.
:param simulated_obs: Simulated measurements.
:type simulated_obs: np.ndarray of shape
(n_sim, n_observables, n_times)
:rtype: Tuple[float, np.ndarray] where the array has shape
``(n_sim, n_observables, n_times)``
"""
n_sim = len(simulated_obs)
simulated_obs = np.asarray(simulated_obs)[:, np.newaxis, :, :]
mean = np.mean(simulated_obs, axis=0, keepdims=True)
var = np.var(simulated_obs, ddof=1, axis=0, keepdims=True)
bw_squared = (4 / 3 / n_sim) ** 0.4 * var
# Compute log-likelihood
scores = \
- (simulated_obs - self._observations)**2 / bw_squared / 2
score = np.sum(
logsumexp(scores, axis=0) - np.log(n_sim) - np.log(2 * np.pi) / 2
- np.log(bw_squared) / 2)
if np.ma.is_masked(score):
n_sim, _, n_obs, n_times = simulated_obs.shape
return -np.inf, np.empty((n_sim, n_obs, n_times))
# Compute sensitivities
# score = log mean exp scores - log bw + constant
# dscore/dsim =
# exp(scores) / sum(exp(scores)) * dscores / dsim
# - 1 / bw^2 / 2 * dbw^2 / dsim
dbw_squared_dsim_by_bw_squared = \
2 * (simulated_obs - mean) / (n_sim - 1) / var
dscore_dsim = np.sum(
softmax(scores, axis=0)
* (self._observations - simulated_obs) / bw_squared
- np.sum(softmax(scores, axis=0) * scores, axis=0, keepdims=True)
* dbw_squared_dsim_by_bw_squared
- dbw_squared_dsim_by_bw_squared / 2, axis=1)
return score, dscore_dsim
[docs]
class GaussianMixtureFilter(PopulationFilter):
r"""
Implements a Gaussian mixture filter.
A Gaussian mixture filter approximates the distribution of
measurements at time point :math:`t_j` by a Gaussian mixture distribution
whose kernel means and variances are estimated from simulated
measurements
.. math::
\log p(\mathcal{D} | \tilde{Y}) =
\sum _{ij} \log \sum_m \frac{1}{M}
\mathcal{N} (y_{ij} | \mu _{jm}, \sigma ^2_{jm}),
where the mean :math:`\mu _{jm}` and the variance :math:`\sigma ^2_{jm}`
of the mth Gaussian distribution are given by the empirical estimates from
the mth subset of the simulated measurements
.. math::
\mu _{jm} = \frac{1}{n} \sum _{s=1}^{n} \tilde{y}_{sjm}
\quad \text{and} \quad
\sigma ^2 _{jm} = \frac{1}{n-1} \sum _{s=1}^{n} \left(
\tilde{y}_{sjm} - \mu _{jm} \right) ^2.
Here, we use :math:`i` to index measured individuals from the dataset,
:math:`j` to index measurement time points and :math:`s` to index simulated
measurements. :math:`n` denotes the number of simulated measurements per
time point.
For multiple measured observables the above expression can be
straightforwardly extended to
.. math::
\log p(\mathcal{D} | \tilde{Y}) =
\sum _{ijr} \log \sum_m \frac{1}{M}
\mathcal{N} (y_{ijr} | \mu _{jrm}, \sigma ^2_{jrm}),
where :math:`r` indexes observables and :math:`\mu _{jrm}` and
:math:`\sigma^2 _{jrm}` are the empirical mean and variance over the
mth subset of simulated measurements of the observable :math:`r` at time
point :math:`t_j`.
Extends :class:`PopulationFilter`
:param observations: Measurements.
:type observations: np.ndarray of shape
``(n_ids, n_observables, n_times)``
"""
def __init__(self, observations, n_kernels=2):
super().__init__(observations)
n_kernels = int(n_kernels)
if n_kernels < 2:
raise ValueError(
'Invalid number of kernels. A Gaussian mixture filter expects '
'at least 2 kernels.')
self._n_kernels = n_kernels
# Add dummy dimension to observations for later convenience
self._observations = self._observations[
np.newaxis, np.newaxis, :, :, :]
def _compute_log_likelihood(self, mu, var):
"""
Returns the log-likelihood.
mu of shape (n_kernels, 1, 1, n_observables, n_times)
var of shape (n_kernels, 1, 1, n_observables, n_times)
"""
score = np.sum(logsumexp(
- (mu - self._observations)**2 / var / 2 - np.log(var) / 2, axis=0)
- np.log(self._n_kernels) - np.log(2 * np.pi) / 2)
if np.ma.is_masked(score):
return -np.inf
return score
[docs]
def compute_log_likelihood(self, simulated_obs):
"""
Returns the log-likelihood of the simulated observations with respect
to the data and filter.
:param simulated_obs: Simulated measurements.
:type simulated_obs: np.ndarray of shape
(n_sim, n_observables, n_times)
:rtype: float
"""
if len(simulated_obs) % self._n_kernels > 0:
raise ValueError(
'Invalid simulated_obs. The number of simulated observations '
'needs to be a multiple of the number of kernels.')
# Compute means and variances
n_sim, n_obs, n_times = simulated_obs.shape
n_per_kernel = n_sim // self._n_kernels
simulated_obs = simulated_obs.reshape(
self._n_kernels, n_per_kernel, 1, n_obs, n_times)
mu = np.mean(simulated_obs, axis=1, keepdims=True)
var = np.var(simulated_obs, ddof=1, axis=1, keepdims=True)
score = self._compute_log_likelihood(mu, var)
return score
[docs]
def compute_sensitivities(self, simulated_obs):
"""
Returns the log-likelihood of the simulated observations with respect
to the data and filter, and the sensitivities of the log-likelihood
with respect to the simulated observations.
:param simulated_obs: Simulated measurements.
:type simulated_obs: np.ndarray of shape
(n_sim, n_observables, n_times)
:rtype: Tuple[float, np.ndarray] where the array has shape
``(n_sim, n_observables, n_times)``
"""
if len(simulated_obs) % self._n_kernels > 0:
raise ValueError(
'Invalid simulated_obs. The number of simulated observations '
'needs to be a multiple of the number of kernels.')
# Compute means and variances
n_sim, n_obs, n_times = simulated_obs.shape
n_per_kernel = n_sim // self._n_kernels
simulated_obs = simulated_obs.reshape(
self._n_kernels, n_per_kernel, 1, n_obs, n_times)
mu = np.mean(simulated_obs, axis=1, keepdims=True)
var = np.var(simulated_obs, ddof=1, axis=1, keepdims=True)
# Compute log-likelihood
scores = - (mu - self._observations)**2 / var / 2 - np.log(var) / 2
score = np.sum(
logsumexp(scores, axis=0) - np.log(self._n_kernels)
- np.log(2 * np.pi) / 2)
if np.ma.is_masked(score):
return -np.inf, np.empty((n_sim, n_obs, n_times))
# Compute sensitivities
# score = log mean exp scores + constant
# dscore/dsim =
# exp(scores) / sum(exp(scores)) * dscores / dsim
dscores_dsim = \
(self._observations - mu) / var / n_per_kernel \
+ (- 1 / var + (self._observations - mu)**2 / var**2) \
* (simulated_obs - mu) / (n_per_kernel - 1)
dscore_dsim = np.sum(softmax(scores, axis=0) * dscores_dsim, axis=2)
dscore_dsim = dscore_dsim.reshape(n_sim, n_obs, n_times)
return score, dscore_dsim
[docs]
class LogNormalFilter(PopulationFilter):
r"""
Implements a lognormal filter.
A lognormal filter approximates the distribution of
measurements at time point :math:`t_j` by a lognormal distribution
whose location and scale are estimated from simulated
measurements
.. math::
\log p(\mathcal{D} | \tilde{Y}) =
\sum _{ij} \log \mathrm{LN} (y_{ij} | \mu _j, \sigma _j),
where the location :math:`\mu _j` and the location :math:`\sigma _j` are
given by empirical estimates of the log-mean and the log-standard deviation
of the simulated measurements
.. math::
\mu _j = \frac{1}{n_s} \sum _{s=1}^{n_s} \log \tilde{y}_{sj}
\quad \text{and} \quad
\sigma _j = \sqrt{\frac{1}{n_s-1} \sum _{s=1}^{n_s} \left(
\log \tilde{y}_{sj} - \mu _j \right) ^2}.
Here, we use :math:`i` to index measured individuals from the dataset,
:math:`j` to index measurement time points and :math:`s` to index simulated
measurements. :math:`n_s` denotes the number of simulated measurements per
time point.
For multiple measured observables the above expression can be
straightforwardly extended to
.. math::
\log p(\mathcal{D} | \tilde{Y}) =
\sum _{ijr} \log \mathrm{LN} (y_{ijr} | \mu _{jr}, \sigma _{jr}),
where :math:`r` indexes observables and :math:`\mu _{jr}` and
:math:`\sigma _{jr}` are the empirical mean and variance over the
simulated log-measurements of the observable :math:`r` at time point
:math:`t_j`.
Extends :class:`PopulationFilter`
:param observations: Measurements.
:type observations: np.ndarray of shape
``(n_ids, n_observables, n_times)``
"""
def __init__(self, observations):
super().__init__(observations)
# Log-transform the observations for later convenience
self._observations = np.log(self._observations)
def _compute_log_likelihood(self, mu, var):
"""
Returns the log-likelihood.
mu of shape (1, n_observables, n_times)
var of shape (1, n_observables, n_times)
"""
score = -np.sum(
np.log(2*np.pi) + np.log(var) + self._observations
+ (self._observations - mu)**2 / var) / 2
return score
[docs]
def compute_log_likelihood(self, simulated_obs):
"""
Returns the log-likelihood of the simulated observations with respect
to the data and filter.
:param simulated_obs: Simulated measurements.
:type simulated_obs: np.ndarray of shape
(n_sim, n_observables, n_times)
:rtype: float
"""
simulated_obs = np.log(simulated_obs)
mu = np.mean(simulated_obs, axis=0, keepdims=True)
var = np.var(simulated_obs, ddof=1, axis=0, keepdims=True)
score = self._compute_log_likelihood(mu, var)
if np.ma.is_masked(score):
return -np.inf
return score
[docs]
def compute_sensitivities(self, simulated_obs):
"""
Returns the log-likelihood of the simulated observations with respect
to the data and filter, and the sensitivities of the log-likelihood
with respect to the simulated observations.
:param simulated_obs: Simulated measurements.
:type simulated_obs: np.ndarray of shape
(n_sim, n_observables, n_times)
:rtype: Tuple[float, np.ndarray] where the array has shape
``(n_sim, n_observables, n_times)``
"""
simulated_obs = np.asarray(simulated_obs)
log_simulated_obs = np.log(simulated_obs)
mu = np.mean(log_simulated_obs, axis=0, keepdims=True)
var = np.var(log_simulated_obs, ddof=1, axis=0, keepdims=True)
score = self._compute_log_likelihood(mu, var)
if np.ma.is_masked(score):
return -np.inf, np.empty(simulated_obs.shape)
# Compute sensitivities
# dscore/dsim = dscore/dmu dmu/dsim + dscore/dvar dvar/dsim
n_sim = len(simulated_obs)
dscore_dsim = (
np.sum((self._observations - mu) / var, axis=0) / n_sim
+ np.sum(
(self._observations - mu)**2 / var**2 - 1 / var, axis=0
) / (n_sim - 1) * ((log_simulated_obs - mu) - np.mean(
log_simulated_obs - mu, axis=0))
) / simulated_obs
return score, dscore_dsim
[docs]
class LogNormalKDEFilter(PopulationFilter):
r"""
Implements a lognormal kernel density estimation filter.
A lognormal KDE filter approximates the distribution of
measurements at time point :math:`t_j` by a lognormal KDE
approximation of the simulated measurements. The lognormal KDE
approximation is defined by the average over lognormal probability
densities whose locations are equal to the simulated measurements and the
scale (or bandwidth) is a hyperparameter. By default the bandwidth is
chosen by an adapted rule of thumb.
The log-likelihood of the simulated measurements with respect to the
measurements and the filter is defined as
.. math::
\log p(\mathcal{D} | \tilde{Y}) =
\sum _{ij} \log \left( \frac{1}{n_s} \sum _{s=1}^{n_s}
\mathrm{LN} (y_{ij} | \tilde{y}_{sj}, \sigma _j) \right).
Here, we use :math:`i` to index measured individuals from the dataset,
:math:`j` to index measurement time points and :math:`s` to index simulated
measurements. :math:`n_s` denotes the number of simulated measurements per
time point.
An adapted rule of thumb is used to estimate an
appropriate bandwidth for each time point :math:`t_j`
.. math::
\sigma _j =
\left( \frac{4}{3n_s}\right) ^ {1/5}
\sqrt{\frac{1}{n_j - 1}\sum _i (\log y_{ij} - \mu _j)^2},
where :math:`\mu _j = \sum _i \log y_{ij} / n_j` is the empirical mean
over the log-measurements and :math:`n_j` is the number of measurements at
time :math:`t_j`. Note that this deviates from the standard definition of
the rule of thumb, where the empirical variance would be estimated from the
simulated measurements.
For multiple measured observables the above expression can be
straightforwardly extended to
.. math::
\log p(\mathcal{D} | \tilde{Y}) =
\sum _{ijr} \log \left( \frac{1}{n_s} \sum _{s=1}^{n_s}
\mathrm{LN} (y_{ijr} | \tilde{y}_{sjr}, \sigma _{jr}) \right),
where :math:`r` indexes observables and
:math:`\sigma _{jr}` is the bandwidth for observable :math:`r` at time
point :math:`t_j`.
Extends :class:`PopulationFilter`
:param observations: Measurements.
:type observations: np.ndarray of shape
``(n_ids, n_observables, n_times)``
"""
def __init__(self, observations, bandwidth=None):
super().__init__(observations)
# Log-transform and reshape for later convenience
self._observations = np.log(self._observations)[np.newaxis, ...]
[docs]
def compute_log_likelihood(self, simulated_obs):
"""
Returns the log-likelihood of the simulated observations with respect
to the data and filter.
:param simulated_obs: Simulated measurements.
:type simulated_obs: np.ndarray of shape
(n_sim, n_observables, n_times)
:rtype: float
"""
n_sim = len(simulated_obs)
simulated_obs = np.log(np.asarray(simulated_obs))[:, np.newaxis, :, :]
bw_squared = (4 / 3 / n_sim) ** 0.4 * np.var(
simulated_obs, ddof=1, axis=0, keepdims=True)
score = np.sum(logsumexp(
- (simulated_obs - self._observations)**2
/ bw_squared / 2, axis=0
) - np.log(n_sim) - np.log(2 * np.pi) / 2 - np.log(bw_squared) / 2)
if np.ma.is_masked(score):
return -np.inf
return score
[docs]
def compute_sensitivities(self, simulated_obs):
"""
Returns the log-likelihood of the simulated observations with respect
to the data and filter, and the sensitivities of the log-likelihood
with respect to the simulated observations.
:param simulated_obs: Simulated measurements.
:type simulated_obs: np.ndarray of shape
(n_sim, n_observables, n_times)
:rtype: Tuple[float, np.ndarray] where the array has shape
``(n_sim, n_observables, n_times)``
"""
n_sim = len(simulated_obs)
simulated_obs = np.asarray(simulated_obs)[:, np.newaxis, :, :]
log_simulated_obs = np.log(simulated_obs)
mean = np.mean(log_simulated_obs, axis=0, keepdims=True)
var = np.var(log_simulated_obs, ddof=1, axis=0, keepdims=True)
bw_squared = (4 / 3 / n_sim) ** 0.4 * var
# Compute log-likelihood
scores = \
- (log_simulated_obs - self._observations)**2 \
/ bw_squared / 2
score = np.sum(
logsumexp(scores, axis=0) - np.log(n_sim) - np.log(2 * np.pi) / 2
- np.log(bw_squared) / 2)
if np.ma.is_masked(score) or np.isnan(score):
n_sim, _, n_obs, n_times = simulated_obs.shape
return -np.inf, np.empty((n_sim, n_obs, n_times))
# Compute sensitivities
# score = log mean exp scores - log bw + constant
# dscore/dsim =
# exp(scores) / sum(exp(scores)) * dscores / dsim
# - 1 / bw^2 / 2 * dbw^2 / dsim
dbw_squared_dsim_by_bw_squared = \
2 * (log_simulated_obs - mean) / (n_sim - 1) / var / simulated_obs
dscore_dsim = np.sum(
softmax(scores, axis=0)
* (self._observations - log_simulated_obs) / bw_squared
/ simulated_obs
- np.sum(softmax(scores, axis=0) * scores, axis=0, keepdims=True)
* dbw_squared_dsim_by_bw_squared
- dbw_squared_dsim_by_bw_squared / 2, axis=1)
return score, dscore_dsim
def logsumexp(a, axis=None, keepdims=False): # pragma: no cover
"""
Returns log of the sum of the exponentiated values of a.
Code is copied from scipy.special.logsumexp and adapted to work for masked
arrays.
"""
a_max = np.max(a, axis=axis, keepdims=True)
if a_max.ndim > 0:
a_max[~np.isfinite(a_max)] = 0
elif not np.isfinite(a_max):
a_max = 0
tmp = np.exp(a - a_max)
# suppress warnings about log of zero
with np.errstate(divide='ignore'):
s = np.sum(tmp, axis=axis, keepdims=keepdims)
out = np.log(s)
# Add back shift
if not keepdims:
a_max = np.squeeze(a_max, axis=axis)
out += a_max
return out
def softmax(a, axis=None): # pragma: no cover
"""
Returns the softmax of a.
Code is copied from scipy.special.softmax and adapted to work for masked
arrays.
"""
return np.exp(a - logsumexp(a, axis=axis, keepdims=True))