# Source code for vivarium.framework.randomness.stream

"""
==================
Randomness Streams
==================

This module provides a wrapper around numpy's randomness system with the intent of coupling
it to vivarium's tools for Common Random Number generation.


Attributes
----------
RESIDUAL_CHOICE : object
    A probability placeholder to be used in an un-normalized array of weights
    to absorb leftover weight so that the array sums to unity.
    For example::

        [0.2, 0.2, RESIDUAL_CHOICE] => [0.2, 0.2, 0.6]

    Note
    ----
    Currently this object is only used in the `choice` function of this
    module.
"""
import hashlib
from typing import Any, Callable, List, Tuple, Union

import numpy as np
import pandas as pd

from vivarium.framework.randomness.exceptions import RandomnessError
from vivarium.framework.randomness.index_map import IndexMap
from vivarium.framework.utilities import rate_to_probability

# Sentinel placed in a `choice` weight array to absorb the leftover
# probability mass so the weights sum to one (see the module docstring).
# Identity-compared via `==` against an object() singleton in
# `_set_residual_probability`.
RESIDUAL_CHOICE = object()


def get_hash(key: str) -> int:
    """Gets a hash of the provided key.

    Parameters
    ----------
    key :
        A string used to create a seed for the random number generator.

    Returns
    -------
    int
        A hash of the provided key.
    """
    # numpy RandomState seeds must fit in 32 bits, so reduce the 160-bit
    # SHA-1 digest modulo the largest allowable seed value.
    max_allowable_numpy_seed = 4294967295  # 2**32 - 1
    return int(hashlib.sha1(key.encode("utf8")).hexdigest(), 16) % max_allowable_numpy_seed
class RandomnessStream:
    """A stream for producing common random numbers.

    `RandomnessStream` objects provide an interface to Vivarium's common
    random number generation. They provide a number of methods for doing
    common simulation tasks that require random numbers like making
    decisions among a number of choices.

    Attributes
    ----------
    key
        The name of the randomness stream.
    clock
        A way to get the current simulation time.
    seed
        An extra number used to seed the random number generation.

    Notes
    -----
    Should not be constructed by client code.

    Simulation components get `RandomnessStream` objects by requesting
    them from the builder provided to them during the setup phase.
    I.E.::

        class VivariumComponent:
            def setup(self, builder):
                self.randomness_stream = builder.randomness.get_stream('stream_name')

    """

    def __init__(
        self,
        key: str,
        clock: Callable,
        seed: Any,
        # Quoted as a forward reference so the class body can be evaluated
        # without `IndexMap` being importable at class-creation time.
        index_map: "IndexMap",
        initializes_crn_attributes: bool = False,
    ):
        self.key = key
        self.clock = clock
        self.seed = seed
        self.index_map = index_map
        # True when this stream initializes attributes used to identify a
        # simulant across simulations; such draws cannot go through the
        # index map because the simulants are not registered in it yet.
        self.initializes_crn_attributes = initializes_crn_attributes

    @property
    def name(self) -> str:
        """Unique name of this stream within the simulation."""
        return f"randomness_stream_{self.key}"

    def _key(self, additional_key: Any = None) -> str:
        """Construct a hashable key from this object's state.

        Parameters
        ----------
        additional_key
            Any additional information used to seed random number generation.

        Returns
        -------
        str
            A key to seed random number generation.
        """
        return "_".join([self.key, str(self.clock()), str(additional_key), str(self.seed)])

    def get_draw(self, index: pd.Index, additional_key: Any = None) -> pd.Series:
        """Get an indexed set of numbers uniformly drawn from the unit interval.

        Parameters
        ----------
        index
            An index whose length is the number of random draws made and which
            indexes the returned `pandas.Series`.
        additional_key
            Any additional information used to seed random number generation.

        Returns
        -------
        pandas.Series
            A series of random numbers indexed by the provided `pandas.Index`.

        Note
        ----
        This is the core of the CRN implementation, allowing for consistent
        use of random numbers across simulations with multiple scenarios.

        See Also
        --------
        https://en.wikipedia.org/wiki/Variance_reduction and
        "Untangling Uncertainty with Common Random Numbers: A Simulation Study;
        A.Flaxman, et. al., Summersim 2017"
        """
        # Return a structured null value if an empty index is passed
        if index.empty:
            return pd.Series(index=index, dtype=float)

        # Initialize a random_state with a seed based on the simulation clock,
        # the decision_point this stream represents, and any additional
        # user-supplied information. This is one pre-condition to
        # reproducibility.
        seed = get_hash(self._key(additional_key))
        random_state = np.random.RandomState(seed=seed)

        # Second, we need to sample a very large chunk of random numbers. The
        # size of the index map is set at the simulation start and is at least
        # 10x the size of the initial population. Which means this is a
        # consistently sampled block of uniformly distributed random numbers,
        # irrespective of the size of the simulation population, which is
        # important if there are scenarios that result in different population
        # sizes through time.
        sample_size = len(self.index_map)
        raw_draws = random_state.random_sample(sample_size)

        if self.initializes_crn_attributes:
            # If we're initializing CRN attributes (i.e. attributes used to
            # identify a simulant across multiple simulations), we can't use
            # the index map yet since these people aren't registered in the
            # index map yet. Instead, we use the draws from our random sample
            # in order. This couples the initialization of CRN attributes to
            # the time step on which they are initialized, meaning
            # interventions that alter the entrance time of a simulant will
            # break the CRN guarantees.
            draws = pd.Series(raw_draws[: len(index)], index=index)
        else:
            # If we're not initializing CRN attributes, we can use the index
            # map to get the correct draws for each simulant. This allows us
            # to use the same CRN attributes across multiple simulations,
            # even if the population size changes.
            draw_index = self.index_map[index]
            draws = pd.Series(raw_draws[draw_index], index=index)

        return draws

    def filter_for_rate(
        self,
        population: Union[pd.DataFrame, pd.Series, pd.Index],
        rate: Union[float, List, Tuple, np.ndarray, pd.Series],
        additional_key: Any = None,
    ) -> Union[pd.DataFrame, pd.Series, pd.Index]:
        """Decide an event outcome for each individual from rates.

        Given a population or its index and an array of associated rates for
        some event to happen, we create and return the subpopulation for whom
        the event occurred.

        Parameters
        ----------
        population
            A view on the simulants for which we are determining the outcome
            of an event.
        rate
            A scalar float value or a 1d list of rates of the event under
            consideration occurring which corresponds (i.e.
            `len(population) == len(probability))` to the population view
            passed in. The rates must be scaled to the simulation time-step
            size either manually or as a post-processing step in a rate
            pipeline. If a scalar is provided, it is applied to every row in
            the population.
        additional_key
            Any additional information used to create the seed.

        Returns
        -------
        pandas.core.generic.PandasObject
            The subpopulation of the simulants for whom the event occurred.
            The return type will be the same as type(population)
        """
        return self.filter_for_probability(
            population, rate_to_probability(rate), additional_key
        )

    def filter_for_probability(
        self,
        population: Union[pd.DataFrame, pd.Series, pd.Index],
        probability: Union[float, List, Tuple, np.ndarray, pd.Series],
        additional_key: Any = None,
    ) -> Union[pd.DataFrame, pd.Series, pd.Index]:
        """Decide an outcome for each individual from probabilities.

        Given a population or its index and an array of associated
        probabilities for some event to happen, we create and return the
        subpopulation for whom the event occurred.

        Parameters
        ----------
        population
            A view on the simulants for which we are determining the outcome
            of an event.
        probability
            A scalar float value or a 1d list of probabilities of the event
            under consideration occurring which corresponds (i.e.
            `len(population) == len(probability)` to the population view
            passed in. If a scalar is provided, it is applied to every row in
            the population.
        additional_key
            Any additional information used to create the seed.

        Returns
        -------
        pandas.core.generic.PandasObject
            The subpopulation of the simulants for whom the event occurred.
            The return type will be the same as type(population)
        """
        if population.empty:
            return population

        index = population if isinstance(population, pd.Index) else population.index
        draws = self.get_draw(index, additional_key)
        # An event occurs for a simulant when its uniform draw falls below
        # the supplied probability.
        mask = np.array(draws < probability)
        return population[mask]

    def choice(
        self,
        index: pd.Index,
        choices: Union[List, Tuple, np.ndarray, pd.Series],
        p: Union[List, Tuple, np.ndarray, pd.Series] = None,
        additional_key: Any = None,
    ) -> pd.Series:
        """Decides between a weighted or unweighted set of choices.

        Given a set of choices with or without corresponding weights, returns
        an indexed set of decisions from those choices. This is simply a
        vectorized way to make decisions with some book-keeping.

        Parameters
        ----------
        index
            An index whose length is the number of random draws made and
            which indexes the returned `pandas.Series`.
        choices
            A set of options to choose from.
        p
            The relative weights of the choices. Can be either a 1-d array of
            the same length as `choices` or a 2-d array with `len(index)`
            rows and `len(choices)` columns. In the 1-d case, the same set of
            weights are used to decide among the choices for every item in
            the `index`. In the 2-d case, each row in `p` contains a separate
            set of weights for every item in the `index`.
        additional_key
            Any additional information used to seed random number generation.

        Returns
        -------
        pandas.Series
            An indexed set of decisions from among the available `choices`.

        Raises
        ------
        RandomnessError
            If any row in `p` contains `RESIDUAL_CHOICE` and the remaining
            weights in the row are not normalized or any row of `p` contains
            more than one reference to `RESIDUAL_CHOICE`.
        """
        draws = self.get_draw(index, additional_key)
        return _choice(draws, choices, p)

    def __repr__(self) -> str:
        return "RandomnessStream(key={!r}, clock={!r}, seed={!r})".format(
            self.key, self.clock(), self.seed
        )
def _choice( draws: pd.Series, choices: Union[List, Tuple, np.ndarray, pd.Series], p: Union[List, Tuple, np.ndarray, pd.Series] = None, ) -> pd.Series: """Decides between a weighted or unweighted set of choices. Given a set of choices with or without corresponding weights, returns an indexed set of decisions from those choices. This is simply a vectorized way to make decisions with some book-keeping. Parameters ---------- draws A uniformly distributed random number for every simulant to make a choice for. choices A set of options to choose from. Choices must be the same for every simulant. p The relative weights of the choices. Can be either a 1-d array of the same length as `choices` or a 2-d array with `len(draws)` rows and `len(choices)` columns. In the 1-d case, the same set of weights are used to decide among the choices for every item in the `index`. In the 2-d case, each row in `p` contains a separate set of weights for every item in the `index`. Returns ------- pandas.Series An indexed set of decisions from among the available `choices`. Raises ------ RandomnessError If any row in `p` contains `RESIDUAL_CHOICE` and the remaining weights in the row are not normalized or any row of `p` contains more than one reference to `RESIDUAL_CHOICE`. """ # Convert p to normalized probabilities broadcasted over index. p = ( _set_residual_probability(_normalize_shape(p, draws.index)) if p is not None else np.ones((len(draws.index), len(choices))) ) p = p / p.sum(axis=1, keepdims=True) p_bins = np.cumsum(p, axis=1) # Use the random draw to make a choice for every row in index. choice_index = (draws.values[np.newaxis].T > p_bins).sum(axis=1) return pd.Series(np.array(choices)[choice_index], index=draws.index) def _normalize_shape( p: Union[List, Tuple, np.ndarray, pd.Series], index: Union[pd.Index, pd.MultiIndex], ) -> np.ndarray: p = np.array(p) # We got a 1-d array => same weights for every index. 
if len(p.shape) == 1: # Turn 1-d array into 2-d array with same weights in every row. p = np.array(np.broadcast_to(p, (len(index), p.shape[0]))) return p def _set_residual_probability(p: np.ndarray) -> np.ndarray: """Turns any use of `RESIDUAL_CHOICE` into a residual probability. Parameters ---------- p Array where each row is a set of probability weights and potentially a `RESIDUAL_CHOICE` placeholder. Returns ------- numpy.ndarray Array where each row is a set of normalized probability weights. """ residual_mask = p == RESIDUAL_CHOICE if residual_mask.any(): # I.E. if we have any placeholders. if np.any(np.sum(residual_mask, axis=1) - 1): msg = ( "More than one residual choice supplied for a single " f"set of weights. Weights: {p}." ) raise RandomnessError(msg) p[residual_mask] = 0 residual_p = 1 - np.sum(p, axis=1) # Probabilities sum to 1. if np.any(residual_p < 0): # We got un-normalized probability weights. msg = ( "Residual choice supplied with weights that summed to more than 1. " f"Weights: {p}." ) raise RandomnessError(msg) p[residual_mask] = residual_p return p