"""
==================
Randomness Streams
==================
This module provides a wrapper around numpy's randomness system with the intent of coupling
it to vivarium's tools for Common Random Number genereration.
Attributes
----------
RESIDUAL_CHOICE : object
A probability placeholder to be used in an un-normalized array of weights
to absorb leftover weight so that the array sums to unity.
For example::
[0.2, 0.2, RESIDUAL_CHOICE] => [0.2, 0.2, 0.6]
Note
----
Currently this object is only used in the `choice` function of this
module.
"""
import hashlib
from typing import Any, Callable, List, Tuple, Union
import numpy as np
import pandas as pd
from vivarium.framework.randomness.exceptions import RandomnessError
from vivarium.framework.randomness.index_map import IndexMap
from vivarium.framework.utilities import rate_to_probability
RESIDUAL_CHOICE = object()
[docs]def get_hash(key: str) -> int:
"""Gets a hash of the provided key.
Parameters
----------
key :
A string used to create a seed for the random number generator.
Returns
-------
int
A hash of the provided key.
"""
max_allowable_numpy_seed = 4294967295 # 2**32 - 1
return int(hashlib.sha1(key.encode("utf8")).hexdigest(), 16) % max_allowable_numpy_seed
[docs]class RandomnessStream:
"""A stream for producing common random numbers.
`RandomnessStream` objects provide an interface to Vivarium's
common random number generation. They provide a number of methods
for doing common simulation tasks that require random numbers like
making decisions among a number of choices.
Attributes
----------
key
The name of the randomness stream.
clock
A way to get the current simulation time.
seed
An extra number used to seed the random number generation.
Notes
-----
Should not be constructed by client code.
Simulation components get `RandomnessStream` objects by requesting
them from the builder provided to them during the setup phase.
I.E.::
class VivariumComponent:
def setup(self, builder):
self.randomness_stream = builder.randomness.get_stream('stream_name')
"""
def __init__(
self,
key: str,
clock: Callable,
seed: Any,
index_map: IndexMap,
initializes_crn_attributes: bool = False,
):
self.key = key
self.clock = clock
self.seed = seed
self.index_map = index_map
self.initializes_crn_attributes = initializes_crn_attributes
@property
def name(self):
return f"randomness_stream_{self.key}"
def _key(self, additional_key: Any = None) -> str:
"""Construct a hashable key from this object's state.
Parameters
----------
additional_key
Any additional information used to seed random number generation.
Returns
-------
str
A key to seed random number generation.
"""
return "_".join([self.key, str(self.clock()), str(additional_key), str(self.seed)])
[docs] def get_draw(self, index: pd.Index, additional_key: Any = None) -> pd.Series:
"""Get an indexed set of numbers uniformly drawn from the unit interval.
Parameters
----------
index
An index whose length is the number of random draws made
and which indexes the returned `pandas.Series`.
additional_key
Any additional information used to seed random number generation.
Returns
-------
pandas.Series
A series of random numbers indexed by the provided `pandas.Index`.
Note
----
This is the core of the CRN implementation, allowing for consistent use of random
numbers across simulations with multiple scenarios.
See Also
--------
https://en.wikipedia.org/wiki/Variance_reduction and
"Untangling Uncertainty with Common Random Numbers:
A Simulation Study; A.Flaxman, et. al., Summersim 2017"
"""
# Return a structured null value if an empty index is passed
if index.empty:
return pd.Series(index=index, dtype=float)
# Initialize a random_state with a seed based on the simulation clock, the
# decision_point this stream represents, and any additional user-supplied information.
# This is one pre-condition to reproducibility.
seed = get_hash(self._key(additional_key))
random_state = np.random.RandomState(seed=seed)
# Second, we need to sample a very large chunk of random numbers. The size of the
# index map is set at the simulation start and is at least 10x the size of the
# initial population. Which means this is a consistently sampled block of uniformly
# distributed random numbers, irrespective of the size of the simulation population,
# which is important if there are scenarios that result in different population sizes
# through time.
sample_size = len(self.index_map)
raw_draws = random_state.random_sample(sample_size)
if self.initializes_crn_attributes:
# If we're initializing CRN attributes (i.e. attributes used to identify a
# simulant across multiple simulations), we can't use the index map yet since
# these people aren't registered in the index map yet. Instead, we use the draws
# from our random sample in order. This couples the initialization of CRN
# attributes to the time step on which they are initialized, meaning interventions
# that alter the entrance time of a simulant will break the CRN guarantees.
draws = pd.Series(raw_draws[: len(index)], index=index)
else:
# If we're not initializing CRN attributes, we can use the index map to get the
# correct draws for each simulant. This allows us to use the same CRN attributes
# across multiple simulations, even if the population size changes.
draw_index = self.index_map[index]
draws = pd.Series(raw_draws[draw_index], index=index)
return draws
[docs] def filter_for_rate(
self,
population: Union[pd.DataFrame, pd.Series, pd.Index],
rate: Union[float, List, Tuple, np.ndarray, pd.Series],
additional_key: Any = None,
) -> Union[pd.DataFrame, pd.Series, pd.Index]:
"""Decide an event outcome for each individual from rates.
Given a population or its index and an array of associated rates for
some event to happen, we create and return the subpopulation for whom
the event occurred.
Parameters
----------
population
A view on the simulants for which we are determining the
outcome of an event.
rate
A scalar float value or a 1d list of rates of the event under
consideration occurring which corresponds (i.e.
`len(population) == len(probability))` to the population view passed
in. The rates must be scaled to the simulation time-step size either
manually or as a post-processing step in a rate pipeline. If a
scalar is provided, it is applied to every row in the population.
additional_key
Any additional information used to create the seed.
Returns
-------
pandas.core.generic.PandasObject
The subpopulation of the simulants for whom the event occurred.
The return type will be the same as type(population)
"""
return self.filter_for_probability(
population, rate_to_probability(rate), additional_key
)
[docs] def filter_for_probability(
self,
population: Union[pd.DataFrame, pd.Series, pd.Index],
probability: Union[float, List, Tuple, np.ndarray, pd.Series],
additional_key: Any = None,
) -> Union[pd.DataFrame, pd.Series, pd.Index]:
"""Decide an outcome for each individual from probabilities.
Given a population or its index and an array of associated probabilities
for some event to happen, we create and return the subpopulation for
whom the event occurred.
Parameters
----------
population
A view on the simulants for which we are determining the
outcome of an event.
probability
A scalar float value or a 1d list of probabilities of the event
under consideration occurring which corresponds (i.e.
`len(population) == len(probability)` to the population view
passed in. If a scalar is provided, it is applied to every row in
the population.
additional_key
Any additional information used to create the seed.
Returns
-------
pandas.core.generic.PandasObject
The subpopulation of the simulants for whom the event occurred.
The return type will be the same as type(population)
"""
if population.empty:
return population
index = population if isinstance(population, pd.Index) else population.index
draws = self.get_draw(index, additional_key)
mask = np.array(draws < probability)
return population[mask]
[docs] def choice(
self,
index: pd.Index,
choices: Union[List, Tuple, np.ndarray, pd.Series],
p: Union[List, Tuple, np.ndarray, pd.Series] = None,
additional_key: Any = None,
) -> pd.Series:
"""Decides between a weighted or unweighted set of choices.
Given a set of choices with or without corresponding weights,
returns an indexed set of decisions from those choices. This is
simply a vectorized way to make decisions with some book-keeping.
Parameters
----------
index
An index whose length is the number of random draws made
and which indexes the returned `pandas.Series`.
choices
A set of options to choose from.
p
The relative weights of the choices. Can be either a 1-d array of
the same length as `choices` or a 2-d array with `len(index)` rows
and `len(choices)` columns. In the 1-d case, the same set of
weights are used to decide among the choices for every item in
the `index`. In the 2-d case, each row in `p` contains a separate
set of weights for every item in the `index`.
additional_key
Any additional information used to seed random number generation.
Returns
-------
pandas.Series
An indexed set of decisions from among the available `choices`.
Raises
------
RandomnessError
If any row in `p` contains `RESIDUAL_CHOICE` and the remaining
weights in the row are not normalized or any row of `p contains
more than one reference to `RESIDUAL_CHOICE`.
"""
draws = self.get_draw(index, additional_key)
return _choice(draws, choices, p)
def __repr__(self) -> str:
return "RandomnessStream(key={!r}, clock={!r}, seed={!r})".format(
self.key, self.clock(), self.seed
)
def _choice(
draws: pd.Series,
choices: Union[List, Tuple, np.ndarray, pd.Series],
p: Union[List, Tuple, np.ndarray, pd.Series] = None,
) -> pd.Series:
"""Decides between a weighted or unweighted set of choices.
Given a set of choices with or without corresponding weights,
returns an indexed set of decisions from those choices. This is
simply a vectorized way to make decisions with some book-keeping.
Parameters
----------
draws
A uniformly distributed random number for every simulant to make
a choice for.
choices
A set of options to choose from. Choices must be the same for every
simulant.
p
The relative weights of the choices. Can be either a 1-d array of
the same length as `choices` or a 2-d array with `len(draws)` rows
and `len(choices)` columns. In the 1-d case, the same set of weights
are used to decide among the choices for every item in the `index`.
In the 2-d case, each row in `p` contains a separate set of weights
for every item in the `index`.
Returns
-------
pandas.Series
An indexed set of decisions from among the available `choices`.
Raises
------
RandomnessError
If any row in `p` contains `RESIDUAL_CHOICE` and the remaining
weights in the row are not normalized or any row of `p` contains
more than one reference to `RESIDUAL_CHOICE`.
"""
# Convert p to normalized probabilities broadcasted over index.
p = (
_set_residual_probability(_normalize_shape(p, draws.index))
if p is not None
else np.ones((len(draws.index), len(choices)))
)
p = p / p.sum(axis=1, keepdims=True)
p_bins = np.cumsum(p, axis=1)
# Use the random draw to make a choice for every row in index.
choice_index = (draws.values[np.newaxis].T > p_bins).sum(axis=1)
return pd.Series(np.array(choices)[choice_index], index=draws.index)
def _normalize_shape(
p: Union[List, Tuple, np.ndarray, pd.Series],
index: Union[pd.Index, pd.MultiIndex],
) -> np.ndarray:
p = np.array(p)
# We got a 1-d array => same weights for every index.
if len(p.shape) == 1:
# Turn 1-d array into 2-d array with same weights in every row.
p = np.array(np.broadcast_to(p, (len(index), p.shape[0])))
return p
def _set_residual_probability(p: np.ndarray) -> np.ndarray:
"""Turns any use of `RESIDUAL_CHOICE` into a residual probability.
Parameters
----------
p
Array where each row is a set of probability weights and potentially
a `RESIDUAL_CHOICE` placeholder.
Returns
-------
numpy.ndarray
Array where each row is a set of normalized probability weights.
"""
residual_mask = p == RESIDUAL_CHOICE
if residual_mask.any(): # I.E. if we have any placeholders.
if np.any(np.sum(residual_mask, axis=1) - 1):
msg = (
"More than one residual choice supplied for a single "
f"set of weights. Weights: {p}."
)
raise RandomnessError(msg)
p[residual_mask] = 0
residual_p = 1 - np.sum(p, axis=1) # Probabilities sum to 1.
if np.any(residual_p < 0): # We got un-normalized probability weights.
msg = (
"Residual choice supplied with weights that summed to more than 1. "
f"Weights: {p}."
)
raise RandomnessError(msg)
p[residual_mask] = residual_p
return p