from __future__ import annotations
from datetime import timedelta
from typing import TYPE_CHECKING, Any, Protocol
import numpy as np
import pandas as pd
from vivarium.framework.utilities import from_yearly
from vivarium.framework.values.exceptions import DynamicValueError
from vivarium.types import NumberLike, NumericArray
if TYPE_CHECKING:
from vivarium.framework.values.manager import ValuesManager
[docs]
class PostProcessor(Protocol):
    """Structural type for a value post-processor.

    Any callable that accepts a ``value`` and the ``manager`` (a
    ``ValuesManager``) satisfies this protocol; the type of the result
    is not constrained.
    """

    def __call__(self, value: Any, manager: ValuesManager) -> Any: ...
[docs]
class AttributePostProcessor(Protocol):
    """Structural type for an attribute pipeline post-processor.

    Implementations are called with the ``index`` being processed, the
    ``value`` to post-process, and the ``manager`` (a ``ValuesManager``).
    They must hand back either a ``pd.Series`` or a ``pd.DataFrame``.
    """

    def __call__(
        self,
        index: pd.Index[int],
        value: Any,
        manager: ValuesManager,
    ) -> pd.Series[Any] | pd.DataFrame: ...
[docs]
def rescale_post_processor(
    index: pd.Index[int], value: NumberLike, manager: ValuesManager
) -> pd.Series[float] | pd.DataFrame:
    """Rescales annual rates to time-step appropriate rates.

    This should only be used with a simulation using a
    :class:`~vivarium.framework.time.manager.DateTimeClock` or another
    implementation of a clock that traffics in pandas date-time objects.

    Parameters
    ----------
    index
        The index of the population for which the attribute is being produced.
        It is used to label the result when ``value`` is a scalar or a
        numpy array; pandas inputs keep their own index.
    value
        Annual rates. Supported types are ``int``, ``float``, 1D or 2D
        ``numpy.ndarray``, ``pandas.Series`` and ``pandas.DataFrame``.
    manager
        The ValuesManager for this simulation.

    Returns
    -------
        The annual rates rescaled to the size of the current time step size.

    Raises
    ------
    DynamicValueError
        If ``value`` is not a pandas object and the step size reported by
        ``manager.step_size()`` is not a ``datetime.timedelta`` or
        ``pandas.Timedelta``, or if ``value`` is a numpy array that is
        neither 1D nor 2D.
    NotImplementedError
        If ``value`` is of an unsupported type.
    """
    if isinstance(value, (pd.Series, pd.DataFrame)):
        # pandas inputs are scaled row-wise by the step sizes the manager
        # reports for the value's own index, expressed as a fraction of a
        # 365-day year.
        step_sizes = manager.simulant_step_sizes(value.index)
        step_seconds = step_sizes.astype("timedelta64[ns]").dt.total_seconds()
        years_per_step = step_seconds / (60 * 60 * 24 * 365.0)
        return value.mul(years_per_step, axis=0)

    # Everything else is scaled by the single global step size, which must
    # be a timedelta-like object for the conversion to make sense.
    time_step = manager.step_size()
    if not isinstance(time_step, (pd.Timedelta, timedelta)):
        raise DynamicValueError(
            "The rescale post processor requires a time step size that is a "
            "datetime timedelta or pandas Timedelta object."
        )

    if isinstance(value, (int, float)):
        # A scalar rate is broadcast over the requested index.
        return pd.Series(from_yearly(value, time_step), index=index)

    if isinstance(value, np.ndarray):
        if value.ndim == 1:
            return pd.Series(from_yearly(value, time_step), index=index)
        if value.ndim == 2:
            return pd.DataFrame(from_yearly(value, time_step), index=index)
        raise DynamicValueError(
            f"Numpy arrays with {value.ndim} dimensions are not supported. "
            "Only 1D and 2D arrays are allowed."
        )

    # Report the offending type so the failure can be diagnosed, in the same
    # way the union post processor reports unsupported values.
    raise NotImplementedError(
        "The rescale post processor only supports numeric types, "
        "pandas Series/DataFrames, and numpy ndarrays. "
        f"You provided a value of type {type(value)}."
    )
[docs]
def raw_union_post_processor(value: list[NumberLike], manager: ValuesManager) -> NumberLike:
    r"""Computes the probability of the union of independent events.

    Each entry of ``value`` is treated as the probability of an independent
    event; the result is the probability that at least one of them occurs.

    .. list-table::
        :width: 100%
        :widths: 1 3

        * - :math:`p_x`
          - Probability of event x
        * - :math:`1 - p_x`
          - Probability of not event x
        * - :math:`\prod_x(1 - p_x)`
          - Probability of not any events x
        * - :math:`1 - \prod_x(1 - p_x)`
          - Probability of any event x

    Parameters
    ----------
    value
        A list of independent proportions or probabilities. Each entry must
        be an ``int``, a ``float``, a ``numpy.ndarray``, a ``pandas.Series``
        or a ``pandas.DataFrame``.
    manager
        The ValuesManager for this simulation. It is not used by this
        function.

    Returns
    -------
        The probability over the union of the sample spaces represented
        by the original probabilities. A single-element list is returned
        as its sole element, unchanged.

    Raises
    ------
    DynamicValueError
        If ``value`` is not a list, or if any entry has an unsupported type.
    """
    if not isinstance(value, list):
        raise DynamicValueError("The union post processor requires a list of values.")

    # Validate every entry up front so a bad entry is reported before any
    # arithmetic is attempted.
    supported_types = (np.ndarray, pd.Series, pd.DataFrame, float, int)
    for candidate in value:
        if isinstance(candidate, supported_types):
            continue
        raise DynamicValueError(
            "The union post processor only supports numeric types, "
            f"pandas Series/DataFrames, and numpy ndarrays. "
            f"You provided a value of type {type(candidate)}."
        )

    # A lone probability already is the answer; hand it back untouched.
    if len(value) == 1:
        return value[0]

    # Accumulate prod(1 - p): the chance that none of the events occur.
    none_occur: NumberLike = 1
    for probability in value:
        none_occur = none_occur * (1 - probability)
    return 1 - none_occur
[docs]
def union_post_processor(
    index: pd.Index[int], value: list[NumberLike], manager: ValuesManager
) -> pd.Series[Any] | pd.DataFrame:
    r"""Computes the probability of the union of independent events as a pandas object.

    The probabilities in ``value`` are combined by
    ``raw_union_post_processor`` and the result is then wrapped so that
    scalars and numpy arrays are labelled with ``index``.

    .. list-table::
        :width: 100%
        :widths: 1 3

        * - :math:`p_x`
          - Probability of event x
        * - :math:`1 - p_x`
          - Probability of not event x
        * - :math:`\prod_x(1 - p_x)`
          - Probability of not any events x
        * - :math:`1 - \prod_x(1 - p_x)`
          - Probability of any event x

    Parameters
    ----------
    index
        The index used to label the result when the combined value is a
        scalar or a numpy array.
    value
        A list of independent proportions or probabilities, either as numbers
        or as something we can broadcast addition and multiplication over.
    manager
        The ValuesManager for this simulation; forwarded to
        ``raw_union_post_processor``.

    Returns
    -------
        The probability over the union of the sample spaces represented
        by the original probabilities. Scalars and 1D arrays become a
        ``pandas.Series``, 2D arrays become a ``pandas.DataFrame``, and any
        other combined value is returned unchanged.

    Raises
    ------
    DynamicValueError
        If the combined value is a numpy array that is neither 1D nor 2D.
    """
    combined = raw_union_post_processor(value, manager)

    # Scalars are broadcast across the requested index.
    if isinstance(combined, (float, int)):
        return pd.Series(combined, index=index)

    # Anything that is not a numpy array is passed through as-is.
    if not isinstance(combined, np.ndarray):
        return combined

    n_dims = combined.ndim
    if n_dims == 1:
        return pd.Series(combined, index=index)
    if n_dims == 2:
        return pd.DataFrame(combined, index=index)
    raise DynamicValueError(
        f"Numpy arrays with {n_dims} dimensions are not supported. "
        "Only 1D and 2D arrays are allowed."
    )