# Source code for vivarium.framework.randomness.stream

```
"""
==================
Randomness Streams
==================
This module provides a wrapper around numpy's randomness system with the intent of coupling
it to vivarium's tools for Common Random Number genereration.
Attributes
----------
RESIDUAL_CHOICE : object
A probability placeholder to be used in an un-normalized array of weights
to absorb leftover weight so that the array sums to unity.
For example::
[0.2, 0.2, RESIDUAL_CHOICE] => [0.2, 0.2, 0.6]
Note
----
Currently this object is only used in the `choice` function of this
module.
"""
import hashlib
from typing import Any, Callable, List, Tuple, Union
import numpy as np
import pandas as pd
from scipy import stats
from vivarium.framework.randomness.exceptions import RandomnessError
from vivarium.framework.randomness.index_map import IndexMap
from vivarium.framework.utilities import rate_to_probability
RESIDUAL_CHOICE = object()
[docs]
def get_hash(key: str) -> int:
"""Gets a hash of the provided key.
Parameters
----------
key :
A string used to create a seed for the random number generator.
Returns
-------
int
A hash of the provided key.
"""
max_allowable_numpy_seed = 4294967295 # 2**32 - 1
return int(hashlib.sha1(key.encode("utf8")).hexdigest(), 16) % max_allowable_numpy_seed
[docs]
class RandomnessStream:
"""A stream for producing common random numbers.
`RandomnessStream` objects provide an interface to Vivarium's
common random number generation. They provide a number of methods
for doing common simulation tasks that require random numbers like
making decisions among a number of choices.
Attributes
----------
key
The name of the randomness stream.
clock
A way to get the current simulation time.
seed
An extra number used to seed the random number generation.
Notes
-----
Should not be constructed by client code.
Simulation components get `RandomnessStream` objects by requesting
them from the builder provided to them during the setup phase.
I.E.::
class VivariumComponent:
def setup(self, builder):
self.randomness_stream = builder.randomness.get_stream('stream_name')
"""
def __init__(
self,
key: str,
clock: Callable,
seed: Any,
index_map: IndexMap,
initializes_crn_attributes: bool = False,
):
self.key = key
self.clock = clock
self.seed = seed
self.index_map = index_map
self.initializes_crn_attributes = initializes_crn_attributes
@property
def name(self):
return f"randomness_stream_{self.key}"
def _key(self, additional_key: Any = None) -> str:
"""Construct a hashable key from this object's state.
Parameters
----------
additional_key
Any additional information used to seed random number generation.
Returns
-------
str
A key to seed random number generation.
"""
return "_".join([self.key, str(self.clock()), str(additional_key), str(self.seed)])
[docs]
def get_draw(self, index: pd.Index, additional_key: Any = None) -> pd.Series:
"""Get an indexed set of numbers uniformly drawn from the unit interval.
Parameters
----------
index
An index whose length is the number of random draws made
and which indexes the returned `pandas.Series`.
additional_key
Any additional information used to seed random number generation.
Returns
-------
pandas.Series
A series of random numbers indexed by the provided `pandas.Index`.
Note
----
This is the core of the CRN implementation, allowing for consistent use of random
numbers across simulations with multiple scenarios.
See Also
--------
https://en.wikipedia.org/wiki/Variance_reduction and
"Untangling Uncertainty with Common Random Numbers:
A Simulation Study; A.Flaxman, et. al., Summersim 2017"
"""
# Return a structured null value if an empty index is passed
if index.empty:
return pd.Series(index=index, dtype=float)
# Initialize a random_state with a seed based on the simulation clock, the
# decision_point this stream represents, and any additional user-supplied information.
# This is one pre-condition to reproducibility.
seed = get_hash(self._key(additional_key))
random_state = np.random.RandomState(seed=seed)
# Second, we need to sample a very large chunk of random numbers. The size of the
# index map is set at the simulation start and is at least 10x the size of the
# initial population. Which means this is a consistently sampled block of uniformly
# distributed random numbers, irrespective of the size of the simulation population,
# which is important if there are scenarios that result in different population sizes
# through time.
sample_size = len(self.index_map)
raw_draws = random_state.random_sample(sample_size)
if self.initializes_crn_attributes:
# If we're initializing CRN attributes (i.e. attributes used to identify a
# simulant across multiple simulations), we can't use the index map yet since
# these people aren't registered in the index map yet. Instead, we use the draws
# from our random sample in order. This couples the initialization of CRN
# attributes to the time step on which they are initialized, meaning interventions
# that alter the entrance time of a simulant will break the CRN guarantees.
draws = pd.Series(raw_draws[: len(index)], index=index)
else:
# If we're not initializing CRN attributes, we can use the index map to get the
# correct draws for each simulant. This allows us to use the same CRN attributes
# across multiple simulations, even if the population size changes.
draw_index = self.index_map[index]
draws = pd.Series(raw_draws[draw_index], index=index)
return draws
[docs]
def filter_for_rate(
self,
population: Union[pd.DataFrame, pd.Series, pd.Index],
rate: Union[float, List, Tuple, np.ndarray, pd.Series],
additional_key: Any = None,
) -> Union[pd.DataFrame, pd.Series, pd.Index]:
"""Decide an event outcome for each individual from rates.
Given a population or its index and an array of associated rates for
some event to happen, we create and return the subpopulation for whom
the event occurred.
Parameters
----------
population
A view on the simulants for which we are determining the
outcome of an event.
rate
A scalar float value or a 1d list of rates of the event under
consideration occurring which corresponds (i.e.
`len(population) == len(probability))` to the population view passed
in. The rates must be scaled to the simulation time-step size either
manually or as a post-processing step in a rate pipeline. If a
scalar is provided, it is applied to every row in the population.
additional_key
Any additional information used to create the seed.
Returns
-------
pandas.core.generic.PandasObject
The subpopulation of the simulants for whom the event occurred.
The return type will be the same as type(population)
"""
return self.filter_for_probability(
population, rate_to_probability(rate), additional_key
)
[docs]
def filter_for_probability(
self,
population: Union[pd.DataFrame, pd.Series, pd.Index],
probability: Union[float, List, Tuple, np.ndarray, pd.Series],
additional_key: Any = None,
) -> Union[pd.DataFrame, pd.Series, pd.Index]:
"""Decide an outcome for each individual from probabilities.
Given a population or its index and an array of associated probabilities
for some event to happen, we create and return the subpopulation for
whom the event occurred.
Parameters
----------
population
A view on the simulants for which we are determining the
outcome of an event.
probability
A scalar float value or a 1d list of probabilities of the event
under consideration occurring which corresponds (i.e.
`len(population) == len(probability)` to the population view
passed in. If a scalar is provided, it is applied to every row in
the population.
additional_key
Any additional information used to create the seed.
Returns
-------
pandas.core.generic.PandasObject
The subpopulation of the simulants for whom the event occurred.
The return type will be the same as type(population)
"""
if population.empty:
return population
index = population if isinstance(population, pd.Index) else population.index
draws = self.get_draw(index, additional_key)
mask = np.array(draws < probability)
return population[mask]
[docs]
def choice(
self,
index: pd.Index,
choices: Union[List, Tuple, np.ndarray, pd.Series],
p: Union[List, Tuple, np.ndarray, pd.Series] = None,
additional_key: Any = None,
) -> pd.Series:
"""Decides between a weighted or unweighted set of choices.
Given a set of choices with or without corresponding weights,
returns an indexed set of decisions from those choices. This is
simply a vectorized way to make decisions with some book-keeping.
Parameters
----------
index
An index whose length is the number of random draws made
and which indexes the returned `pandas.Series`.
choices
A set of options to choose from.
p
The relative weights of the choices. Can be either a 1-d array of
the same length as `choices` or a 2-d array with `len(index)` rows
and `len(choices)` columns. In the 1-d case, the same set of
weights are used to decide among the choices for every item in
the `index`. In the 2-d case, each row in `p` contains a separate
set of weights for every item in the `index`.
additional_key
Any additional information used to seed random number generation.
Returns
-------
pandas.Series
An indexed set of decisions from among the available `choices`.
Raises
------
RandomnessError
If any row in `p` contains `RESIDUAL_CHOICE` and the remaining
weights in the row are not normalized or any row of `p contains
more than one reference to `RESIDUAL_CHOICE`.
"""
draws = self.get_draw(index, additional_key)
return _choice(draws, choices, p)
[docs]
def sample_from_distribution(
self,
index: pd.Index,
distribution: stats.rv_continuous = None,
ppf: Callable[[pd.Series, ...], pd.Series] = None,
additional_key: Any = None,
**distribution_kwargs: Any,
) -> pd.Series:
"""
Given a distribution, returns an indexed set of samples from that
distribution.
Parameters
----------
index
An index whose length is the number of random draws made
and which indexes the returned `pandas.Series`.
distribution
A scipy.stats distribution object.
ppf
A function that takes a series of draws and returns a series of samples.
additional_key
Any additional information used to seed random number generation.
distribution_kwargs
Additional keyword arguments to pass to the distribution's ppf function.
"""
if ppf is None and distribution is None:
raise ValueError("Either distribution or ppf must be provided")
if ppf is not None and distribution is not None:
raise ValueError("Only one of distribution or ppf can be provided")
if distribution is not None:
ppf = distribution.ppf
draws = self.get_draw(index, additional_key)
samples = pd.Series(ppf(draws, **distribution_kwargs), index=index)
return samples
def __repr__(self) -> str:
return "RandomnessStream(key={!r}, clock={!r}, seed={!r})".format(
self.key, self.clock(), self.seed
)
def _choice(
draws: pd.Series,
choices: Union[List, Tuple, np.ndarray, pd.Series],
p: Union[List, Tuple, np.ndarray, pd.Series] = None,
) -> pd.Series:
"""Decides between a weighted or unweighted set of choices.
Given a set of choices with or without corresponding weights,
returns an indexed set of decisions from those choices. This is
simply a vectorized way to make decisions with some book-keeping.
Parameters
----------
draws
A uniformly distributed random number for every simulant to make
a choice for.
choices
A set of options to choose from. Choices must be the same for every
simulant.
p
The relative weights of the choices. Can be either a 1-d array of
the same length as `choices` or a 2-d array with `len(draws)` rows
and `len(choices)` columns. In the 1-d case, the same set of weights
are used to decide among the choices for every item in the `index`.
In the 2-d case, each row in `p` contains a separate set of weights
for every item in the `index`.
Returns
-------
pandas.Series
An indexed set of decisions from among the available `choices`.
Raises
------
RandomnessError
If any row in `p` contains `RESIDUAL_CHOICE` and the remaining
weights in the row are not normalized or any row of `p` contains
more than one reference to `RESIDUAL_CHOICE`.
"""
# Convert p to normalized probabilities broadcasted over index.
p = (
_set_residual_probability(_normalize_shape(p, draws.index))
if p is not None
else np.ones((len(draws.index), len(choices)))
)
p = p / p.sum(axis=1, keepdims=True)
p_bins = np.cumsum(p, axis=1)
# Use the random draw to make a choice for every row in index.
choice_index = (draws.values[np.newaxis].T > p_bins).sum(axis=1)
return pd.Series(np.array(choices)[choice_index], index=draws.index)
def _normalize_shape(
p: Union[List, Tuple, np.ndarray, pd.Series],
index: Union[pd.Index, pd.MultiIndex],
) -> np.ndarray:
p = np.array(p)
# We got a 1-d array => same weights for every index.
if len(p.shape) == 1:
# Turn 1-d array into 2-d array with same weights in every row.
p = np.array(np.broadcast_to(p, (len(index), p.shape[0])))
return p
def _set_residual_probability(p: np.ndarray) -> np.ndarray:
"""Turns any use of `RESIDUAL_CHOICE` into a residual probability.
Parameters
----------
p
Array where each row is a set of probability weights and potentially
a `RESIDUAL_CHOICE` placeholder.
Returns
-------
numpy.ndarray
Array where each row is a set of normalized probability weights.
"""
residual_mask = p == RESIDUAL_CHOICE
if residual_mask.any(): # I.E. if we have any placeholders.
if np.any(np.sum(residual_mask, axis=1) - 1):
msg = (
"More than one residual choice supplied for a single "
f"set of weights. Weights: {p}."
)
raise RandomnessError(msg)
p[residual_mask] = 0
residual_p = 1 - np.sum(p, axis=1) # Probabilities sum to 1.
if np.any(residual_p < 0): # We got un-normalized probability weights.
msg = (
"Residual choice supplied with weights that summed to more than 1. "
f"Weights: {p}."
)
raise RandomnessError(msg)
p[residual_mask] = residual_p
return p
```