"""
=================================
Risk Exposure Distribution Models
=================================
This module contains tools for modeling several different risk
exposure distributions.
"""
from __future__ import annotations
import warnings
from abc import ABC, abstractmethod
from typing import NamedTuple
import numpy as np
import pandas as pd
import risk_distributions as rd
from layered_config_tree import LayeredConfigTree
from vivarium import Component
from vivarium.framework.engine import Builder
from vivarium.framework.lookup import DEFAULT_VALUE_COLUMN, LookupTable
from vivarium.framework.population import SimulantData
from vivarium_public_health.causal_factor.calibration_constant import (
register_risk_affected_attribute_producer,
)
from vivarium_public_health.causal_factor.utilities import pivot_categorical
from vivarium_public_health.utilities import EntityString
[docs]
class MissingDataError(Exception):
"""Custom exception for missing data."""
pass
[docs]
class CausalFactorDistribution(Component, ABC):
"""Abstract base class for causal factor exposure distribution models.
Subclasses implement specific distribution types (e.g., continuous,
polytomous, dichotomous, ensemble) for modeling causal factor exposures
in a simulation.
"""
#####################
# Lifecycle methods #
#####################
def __init__(
self,
causal_factor: EntityString,
distribution_type: str,
exposure_data: int | float | pd.DataFrame | None = None,
) -> None:
"""
Parameters
----------
causal_factor
The entity string identifying the risk factor.
distribution_type
The type of distribution (e.g., ``"normal"``,
``"dichotomous"``).
exposure_data
Optional pre-loaded exposure data. If ``None``, data is
loaded from the simulation during setup.
"""
super().__init__()
self.causal_factor = causal_factor
self.distribution_type = distribution_type
self._exposure_data = exposure_data
self.causal_factor_propensity = f"{self.causal_factor.name}.propensity"
self.exposure_ppf_pipeline = f"{self.causal_factor.name}.exposure_distribution.ppf"
#################
# Setup methods #
#################
[docs]
def get_configuration(self, builder: "Builder") -> LayeredConfigTree | None:
"""Return the configuration tree for this causal factor.
Parameters
----------
builder
Access point for utilizing framework interfaces during setup.
Returns
-------
The configuration sub-tree for this causal factor.
"""
return builder.configuration[self.causal_factor]
[docs]
def get_exposure_data(self, builder: Builder) -> int | float | pd.DataFrame:
"""Return exposure data (using pre-loaded data if available).
Parameters
----------
builder
Access point for utilizing framework interfaces during setup.
Returns
-------
The exposure data for this risk factor.
"""
if self._exposure_data is not None:
return self._exposure_data
return self.get_data(builder, self.configuration["data_sources"]["exposure"])
[docs]
def setup(self, builder: Builder) -> None:
"""Register the exposure PPF pipeline.
Parameters
----------
builder
Access point for utilizing framework interfaces during setup.
"""
self.register_exposure_ppf_pipeline(builder)
[docs]
@abstractmethod
def register_exposure_ppf_pipeline(self, builder: Builder) -> None:
"""Register the exposure PPF pipeline with the simulation.
Parameters
----------
builder
Access point for utilizing framework interfaces during setup.
"""
pass
[docs]
class EnsembleDistribution(CausalFactorDistribution):
"""Model risk exposure using an ensemble of weighted distributions.
Combine multiple parametric distributions (e.g., normal, log-normal,
gamma) weighted by GBD-derived weights to represent complex exposure
distributions.
"""
#####################
# Lifecycle methods #
#####################
def __init__(
self, causal_factor: EntityString, distribution_type: str = "ensemble"
) -> None:
"""
Parameters
----------
causal_factor
The entity string identifying the causal factor.
distribution_type
The distribution type label. Default is ``"ensemble"``.
"""
super().__init__(causal_factor, distribution_type)
self.ensemble_propensity = f"ensemble_propensity.{self.causal_factor}"
#################
# Setup methods #
#################
[docs]
def setup(self, builder: Builder) -> None:
"""Build distribution weight and parameter lookup tables.
Parameters
----------
builder
Access point for utilizing framework interfaces during setup.
"""
distributions, weights, parameters = self.get_distribution_definitions(builder)
self.distribution_weights_table = self.build_lookup_table(
builder,
"exposure_distribution_weights",
data_source=weights,
value_columns=distributions,
)
self.parameters = {
parameter: self.build_lookup_table(
builder,
parameter,
data_source=data.reset_index(),
value_columns=list(data.columns),
)
for parameter, data in parameters.items()
}
super().setup(builder)
self.randomness = builder.randomness.get_stream(self.ensemble_propensity)
builder.population.register_initializer(
initializer=self.initialize_ensemble_propensity,
columns=self.ensemble_propensity,
required_resources=[self.randomness],
)
[docs]
def get_distribution_definitions(
self, builder: Builder
) -> tuple[list[str], pd.DataFrame, dict[str, pd.DataFrame]]:
"""Load and compute ensemble distribution definitions.
Parameters
----------
builder
Access point for utilizing framework interfaces during setup.
Returns
-------
A tuple of (distribution names, weights DataFrame,
parameter dict keyed by distribution name).
Raises
------
NotImplementedError
If the ``glnorm`` distribution has non-zero weights.
"""
exposure_data = self.get_exposure_data(builder)
standard_deviation = self.get_data(
builder,
self.configuration["data_sources"]["exposure_standard_deviation"],
)
weights_source = self.configuration["data_sources"]["ensemble_distribution_weights"]
raw_weights = self.get_data(builder, weights_source)
glnorm_mask = raw_weights["parameter"] == "glnorm"
if np.any(raw_weights.loc[glnorm_mask, DEFAULT_VALUE_COLUMN]):
raise NotImplementedError("glnorm distribution is not supported")
raw_weights = raw_weights[~glnorm_mask]
distributions = list(raw_weights["parameter"].unique())
raw_weights = pivot_categorical(
raw_weights, pivot_column="parameter", reset_index=False
)
weights, parameters = rd.EnsembleDistribution.get_parameters(
raw_weights,
mean=get_risk_distribution_parameter(exposure_data),
sd=get_risk_distribution_parameter(standard_deviation),
)
return distributions, weights.reset_index(), parameters
[docs]
def register_exposure_ppf_pipeline(self, builder: Builder) -> None:
"""Register the ensemble exposure PPF pipeline.
Parameters
----------
builder
Access point for utilizing framework interfaces during setup.
"""
tables = [self.distribution_weights_table, *self.parameters.values()]
register_risk_affected_attribute_producer(
builder=builder,
name=self.exposure_ppf_pipeline,
source=self.exposure_ppf,
required_resources=[
*tables,
self.causal_factor_propensity,
self.ensemble_propensity,
],
)
########################
# Event-driven methods #
########################
[docs]
def initialize_ensemble_propensity(self, pop_data: SimulantData) -> None:
"""Initialize propensities for selecting child distributions in the ensemble.
Parameters
----------
pop_data
Metadata about the simulants being initialized.
"""
ensemble_propensity = self.randomness.get_draw(pop_data.index).rename(
self.ensemble_propensity
)
self.population_view.initialize(ensemble_propensity)
##################################
# Pipeline sources and modifiers #
##################################
[docs]
def exposure_ppf(self, index: pd.Index) -> pd.Series:
"""Calculate exposure values from propensities using the ensemble.
Parameters
----------
index
An index representing the simulants.
Returns
-------
A series of exposure values.
"""
pop = self.population_view.get(
index, [self.causal_factor_propensity, self.ensemble_propensity]
)
quantiles = pop[self.causal_factor_propensity]
if not pop.empty:
quantiles = clip(quantiles)
weights = self.distribution_weights_table(quantiles.index)
parameters = {
name: param(quantiles.index) for name, param in self.parameters.items()
}
x = rd.EnsembleDistribution(weights, parameters).ppf(
quantiles, pop[self.ensemble_propensity]
)
x[x.isnull()] = 0
else:
x = pd.Series([])
return x
[docs]
class ContinuousDistribution(CausalFactorDistribution):
"""Model risk exposure using a continuous parametric distribution.
Support ``"normal"`` and ``"lognormal"`` distribution types. Exposure
values are derived from the distribution's percent-point function
evaluated at each simulant's propensity.
"""
#####################
# Lifecycle methods #
#####################
def __init__(self, causal_factor: EntityString, distribution_type: str) -> None:
"""
Parameters
----------
causal_factor
The entity string identifying the causal factor.
distribution_type
The distribution type (``"normal"`` or ``"lognormal"``).
Raises
------
NotImplementedError
If the distribution type is not ``"normal"`` or
``"lognormal"``.
"""
super().__init__(causal_factor, distribution_type)
self.exposure_params_name = f"{self.causal_factor}.exposure_parameters"
self.standard_deviation = None
try:
self._distribution = {
"normal": rd.Normal,
"lognormal": rd.LogNormal,
}[distribution_type]
except KeyError:
raise NotImplementedError(
f"Distribution type {distribution_type} is not supported for "
f"causal_factor {self.causal_factor.name}."
)
#################
# Setup methods #
#################
[docs]
def setup(self, builder):
"""Compute distribution parameters and register pipelines.
Parameters
----------
builder
Access point for utilizing framework interfaces during setup.
"""
parameters = self.get_distribution_parameters(builder)
self.parameters_table = self.build_lookup_table(
builder,
"exposure_parameters",
data_source=parameters.reset_index(),
value_columns=list(parameters.columns),
)
self.register_exposure_params_pipeline(builder)
super().setup(builder)
[docs]
def get_distribution_parameters(self, builder: "Builder") -> None:
"""Compute the distribution parameters from exposure data.
Parameters
----------
builder
Access point for utilizing framework interfaces during setup.
Returns
-------
A DataFrame of distribution parameters (e.g., loc, scale).
"""
exposure_data = self.get_exposure_data(builder)
standard_deviation = self.get_data(
builder, self.configuration["data_sources"]["exposure_standard_deviation"]
)
return self._distribution.get_parameters(
mean=get_risk_distribution_parameter(exposure_data),
sd=get_risk_distribution_parameter(standard_deviation),
)
[docs]
def register_exposure_ppf_pipeline(self, builder: Builder) -> None:
"""Register the continuous exposure PPF pipeline.
Parameters
----------
builder
Access point for utilizing framework interfaces during setup.
"""
register_risk_affected_attribute_producer(
builder=builder,
name=self.exposure_ppf_pipeline,
source=self.exposure_ppf,
required_resources=[self.exposure_params_name, self.causal_factor_propensity],
)
[docs]
def register_exposure_params_pipeline(self, builder: Builder) -> None:
"""Register the exposure parameters pipeline.
Parameters
----------
builder
Access point for utilizing framework interfaces during setup.
"""
builder.value.register_attribute_producer(
self.exposure_params_name, source=self.parameters_table
)
##################################
# Pipeline sources and modifiers #
##################################
[docs]
def exposure_ppf(self, index: pd.Index) -> pd.Series:
"""Calculate exposure values from propensities.
Parameters
----------
index
An index representing the simulants.
Returns
-------
A series of exposure values.
"""
pop = self.population_view.get(
index, [self.causal_factor_propensity, self.exposure_params_name]
)
quantiles = pop[self.causal_factor_propensity]
if not quantiles.empty:
quantiles = clip(quantiles)
x = self._distribution(parameters=pop[self.exposure_params_name]).ppf(quantiles)
x[x.isnull()] = 0
else:
x = pd.Series([])
return x
[docs]
class PolytomousDistribution(CausalFactorDistribution):
"""Model risk exposure as a set of ordered or unordered categories.
Assign each simulant to a category by comparing their propensity
against the cumulative sum of category exposure probabilities.
"""
@property
def categories(self) -> list[str]:
"""The sorted list of exposure category names."""
# These need to be sorted so the cumulative sum is in the correct order of categories
# and results are therefore reproducible and correct
return sorted(self.exposure_params_table.value_columns)
#####################
# Lifecycle methods #
#####################
def __init__(
self,
causal_factor: EntityString,
distribution_type: str,
exposure_data: int | float | pd.DataFrame | None = None,
) -> None:
"""
Parameters
----------
causal_factor
The entity string identifying the causal factor.
distribution_type
The distribution type (e.g., ``"ordered_polytomous"``).
exposure_data
Optional pre-loaded exposure data.
"""
super().__init__(causal_factor, distribution_type, exposure_data)
self.exposure_params_pipeline = f"{self.causal_factor}.exposure_parameters"
#################
# Setup methods #
#################
[docs]
def setup(self, builder: Builder) -> None:
"""Build the exposure parameters table and register pipelines.
Parameters
----------
builder
Access point for utilizing framework interfaces during setup.
"""
super().setup(builder)
self.exposure_params_table = self.build_exposure_params_table(builder)
self.register_exposure_params_pipeline(builder)
[docs]
def get_exposure_value_columns(
self, exposure_data: int | float | pd.DataFrame
) -> list[str] | None:
"""Extract unique category names from exposure data.
Parameters
----------
exposure_data
The exposure data, either as a scalar or a DataFrame.
Returns
-------
A list of category names if the data is a DataFrame, or
``None`` for scalar data.
"""
if isinstance(exposure_data, pd.DataFrame):
return list(exposure_data["parameter"].unique())
return None
[docs]
def register_exposure_ppf_pipeline(self, builder: Builder) -> None:
"""Register the polytomous exposure PPF pipeline.
Parameters
----------
builder
Access point for utilizing framework interfaces during setup.
"""
builder.value.register_attribute_producer(
self.exposure_ppf_pipeline,
source=self.exposure_ppf,
required_resources=[self.exposure_params_pipeline, self.causal_factor_propensity],
)
[docs]
def register_exposure_params_pipeline(self, builder: Builder) -> None:
"""Register the exposure parameters pipeline.
Parameters
----------
builder
Access point for utilizing framework interfaces during setup.
"""
builder.value.register_attribute_producer(
self.exposure_params_pipeline, source=self.exposure_params_table
)
[docs]
def build_exposure_params_table(self, builder: "Builder") -> LookupTable:
"""Build the lookup table for exposure parameters.
Parameters
----------
builder
Access point for utilizing framework interfaces during setup.
Returns
-------
A lookup table for the exposure parameters.
"""
data = self.get_exposure_data(builder)
value_columns = self.get_exposure_value_columns(data)
if isinstance(data, pd.DataFrame):
data = pivot_categorical(data, "parameter")
return self.build_lookup_table(
builder, "exposure_parameters", data_source=data, value_columns=value_columns
)
##################################
# Pipeline sources and modifiers #
##################################
[docs]
def exposure_ppf(self, index: pd.Index) -> pd.Series:
"""Assign each simulant a category based on their propensity.
Parameters
----------
index
An index representing the simulants.
Returns
-------
A series of category labels for each simulant.
Raises
------
MissingDataError
If all exposure data sums to zero.
"""
pop = self.population_view.get(
index, [self.causal_factor_propensity, self.exposure_params_pipeline]
)
quantiles = pop[self.causal_factor_propensity]
sorted_exposures = pop[self.exposure_params_pipeline][self.categories]
if not np.allclose(1, np.sum(sorted_exposures, axis=1)):
raise MissingDataError("All exposure data returned as 0.")
exposure_sum = sorted_exposures.cumsum(axis="columns")
category_index = pd.concat(
[exposure_sum[c] < quantiles for c in exposure_sum.columns], axis=1
).sum(axis=1)
return pd.Series(
np.array(self.categories)[category_index],
name=self.causal_factor + ".exposure",
index=quantiles.index,
)
[docs]
class DichotomousDistribution(CausalFactorDistribution):
"""Model risk exposure as a two-category (exposed/unexposed) distribution.
Simulants with a propensity below the exposure probability are
assigned to the exposed category; otherwise the unexposed category.
Support optional rebinning of polytomous exposure data.
"""
@property
def exposed(self) -> str:
"""The name of the exposed category."""
return "covered" if self.causal_factor.type == "intervention" else "exposed"
@property
def unexposed(self) -> str:
"""The name of the unexposed category."""
return "uncovered" if self.causal_factor.type == "intervention" else "unexposed"
[docs]
def rename_deprecated_categories(self, data: pd.DataFrame) -> pd.DataFrame:
"""Rename deprecated cat1/cat2 parameter values to exposed/unexposed.
If the data contains ``'cat1'`` in its ``'parameter'`` column, the
values are replaced with the distribution's :attr:`exposed` and
:attr:`unexposed` names. A :class:`FutureWarning` is emitted for
non-intervention causal factors to signal that the old names will be
removed in a future release.
Parameters
----------
data
A DataFrame with a ``'parameter'`` column.
Returns
-------
The DataFrame with renamed parameter values (modified in place).
"""
if "cat1" not in data["parameter"].values:
return data
if self.causal_factor.type != "intervention":
warnings.warn(
"Using 'cat1' and 'cat2' for dichotomous exposure is deprecated "
"and will be removed in a future release. Use "
f"'{self.exposed}' and '{self.unexposed}' instead.",
FutureWarning,
stacklevel=3,
)
data["parameter"] = data["parameter"].replace(
{"cat1": self.exposed, "cat2": self.unexposed}
)
return data
#####################
# Lifecycle methods #
#####################
def __init__(
self,
causal_factor: EntityString,
distribution_type: str,
exposure_data: int | float | pd.DataFrame | None = None,
) -> None:
"""
Parameters
----------
causal_factor
The entity string identifying the causal factor.
distribution_type
The distribution type (``"dichotomous"``).
exposure_data
Optional pre-loaded exposure data.
"""
super().__init__(causal_factor, distribution_type, exposure_data)
self.exposure_params_name = f"{self.causal_factor}.exposure_parameters"
#################
# Setup methods #
#################
[docs]
def setup(self, builder: Builder) -> None:
"""Build the exposure table and register pipelines.
Parameters
----------
builder
Access point for utilizing framework interfaces during setup.
"""
super().setup(builder)
self.exposure_table = self.build_exposure_table(builder)
self.register_exposure_params_pipeline(builder)
[docs]
def register_exposure_ppf_pipeline(self, builder: Builder) -> None:
"""Register the dichotomous exposure PPF pipeline.
Parameters
----------
builder
Access point for utilizing framework interfaces during setup.
"""
builder.value.register_attribute_producer(
self.exposure_ppf_pipeline,
source=self.exposure_ppf,
required_resources=[self.exposure_params_name, self.causal_factor_propensity],
)
[docs]
def register_exposure_params_pipeline(self, builder: Builder) -> None:
"""Register the exposure parameters pipeline with calibration support.
Parameters
----------
builder
Access point for utilizing framework interfaces during setup.
"""
register_risk_affected_attribute_producer(
builder=builder,
name=self.exposure_params_name,
source=self.exposure_parameter_source,
required_resources=[self.exposure_table],
)
[docs]
def build_exposure_table(self, builder: Builder) -> LookupTable[pd.Series]:
"""Build a lookup table for exposure data.
Parameters
----------
builder
Access point for utilizing framework interfaces during setup.
Returns
-------
A lookup table for the exposure data.
Raises
------
ValueError
If any exposure values are outside the range [0, 1].
"""
data = self.get_exposure_data(builder)
if isinstance(data, pd.DataFrame):
any_negatives = (data[DEFAULT_VALUE_COLUMN] < 0).any().any()
any_over_one = (data[DEFAULT_VALUE_COLUMN] > 1).any().any()
if any_negatives or any_over_one:
raise ValueError(
f"All exposures must be in the range [0, 1] for {self.causal_factor}"
)
elif data < 0 or data > 1:
raise ValueError(f"Exposure must be in the range [0, 1] for {self.causal_factor}")
return self.build_lookup_table(builder, "exposure", data)
[docs]
def get_exposure_data(self, builder: Builder) -> int | float | pd.DataFrame:
"""Load and optionally rebin exposure data for the risk.
Parameters
----------
builder
Access point for utilizing framework interfaces during setup.
Returns
-------
The (possibly rebinned) exposure data for the exposed
category.
"""
exposure_data = super().get_exposure_data(builder)
if isinstance(exposure_data, (int, float)):
return exposure_data
# rebin exposure categories
self.validate_rebin_source(builder, exposure_data)
rebin_exposed_categories = set(self.configuration["rebinned_exposed"])
exposure_data = self.rename_deprecated_categories(exposure_data)
if rebin_exposed_categories:
exposure_data = self._rebin_exposure_data(exposure_data, rebin_exposed_categories)
exposure_data = exposure_data[exposure_data["parameter"] == self.exposed]
return exposure_data.drop(columns="parameter")
def _rebin_exposure_data(
self, exposure_data: pd.DataFrame, rebin_exposed_categories: set
) -> pd.DataFrame:
"""Aggregate exposure categories into a single exposed category."""
exposure_data = exposure_data[
exposure_data["parameter"].isin(rebin_exposed_categories)
]
exposure_data["parameter"] = self.exposed
exposure_data = (
exposure_data.groupby(list(exposure_data.columns.difference(["value"])))
.sum()
.reset_index()
)
return exposure_data
##############
# Validators #
##############
[docs]
def validate_rebin_source(self, builder, data: pd.DataFrame) -> None:
"""Validate that rebinning configuration is consistent with the data.
Parameters
----------
builder
Access point for utilizing framework interfaces during setup.
data
The exposure data to validate against.
Raises
------
ValueError
If rebinning and category thresholds are both specified,
if any rebin categories are not found in the data, or if
all categories are in the rebin set.
"""
if not isinstance(data, pd.DataFrame):
return
rebin_exposed_categories = set(
builder.configuration[self.causal_factor]["rebinned_exposed"]
)
if (
rebin_exposed_categories
and builder.configuration[self.causal_factor]["category_thresholds"]
):
raise ValueError(
f"Rebinning and category thresholds are mutually exclusive. "
f"You provided both for {self.causal_factor.name}."
)
invalid_cats = rebin_exposed_categories.difference(set(data.parameter))
if invalid_cats:
raise ValueError(
f"The following provided categories for the rebinned exposed "
f"category of {self.causal_factor.name} are not found in the exposure data: "
f"{invalid_cats}."
)
if rebin_exposed_categories == set(data.parameter):
raise ValueError(
f"The provided categories for the rebinned exposed category of "
f"{self.causal_factor.name} comprise all categories for the exposure data. "
f"At least one category must be left out of the provided categories "
f"to be rebinned into the unexposed category."
)
##################################
# Pipeline sources and modifiers #
##################################
[docs]
def exposure_parameter_source(self, index: pd.Index) -> pd.Series:
"""Return exposure probabilities from the exposure lookup table.
Parameters
----------
index
An index representing the simulants.
Returns
-------
A series of exposure probabilities.
"""
return self.exposure_table(index)
[docs]
def exposure_ppf(self, index: pd.Index) -> pd.Series:
"""Assign each simulant to the exposed or unexposed category based on propensity.
Parameters
----------
index
An index representing the simulants.
Returns
-------
A series of exposed or unexposed category labels.
"""
pop = self.population_view.get(
index, [self.causal_factor_propensity, self.exposure_params_name]
)
quantiles = pop[self.causal_factor_propensity]
exposed = quantiles < pop[self.exposure_params_name]
return pd.Series(
exposed.replace({True: self.exposed, False: self.unexposed}),
name=self.causal_factor + ".exposure",
index=quantiles.index,
)
[docs]
def clip(q: pd.Series) -> pd.Series:
"""Clip quantile values to avoid distribution boundary issues.
The risk distributions package uses the 99.9th and 0.001st percentiles
of a log-normal distribution as the bounds of the distribution support.
This is bound up in the GBD risk factor PAF calculation process.
Clip the distribution tails so we don't get NaNs back from the
distribution calls.
Parameters
----------
q
A series of quantile values to clip.
Returns
-------
The clipped quantile values.
"""
Q_LOWER_BOUND = 0.0011
Q_UPPER_BOUND = 0.998
q[q > Q_UPPER_BOUND] = Q_UPPER_BOUND
q[q < Q_LOWER_BOUND] = Q_LOWER_BOUND
return q
[docs]
def get_risk_distribution_parameter(data: float | pd.DataFrame) -> float | pd.Series:
"""Convert risk distribution parameter data to a usable format.
If the data is a DataFrame, set the non-value columns as the index
and squeeze to a Series. Drop a ``"parameter"`` column if its only
value is ``"continuous"``.
Parameters
----------
data
The raw parameter data, either a scalar float or a DataFrame.
Returns
-------
The parameter as a float or a ``pd.Series`` indexed by
demographic columns.
"""
if isinstance(data, pd.DataFrame):
# don't return parameter col in continuous and ensemble distribution
# means to match standard deviation index
if "parameter" in data.columns and set(data["parameter"]) == {"continuous"}:
data = data.drop("parameter", axis=1)
index = [col for col in data.columns if col != DEFAULT_VALUE_COLUMN]
data = data.set_index(index).squeeze(axis=1)
return data