# Source code for vivarium.framework.values

"""
=========================
The Value Pipeline System
=========================

The value pipeline system is a vital part of the :mod:`vivarium`
infrastructure. It allows for values that determine the behavior of individual
:term:`simulants <Simulant>` to be constructed across multiple
:ref:`components <components_concept>`.

For more information about when and how you should use pipelines in your
simulations, see the value system :ref:`concept note <values_concept>`.

"""
from collections import defaultdict
from numbers import Number
from typing import Any, Callable, Iterable, List, Tuple, Union

import numpy as np
import pandas as pd

from vivarium.exceptions import VivariumError
from vivarium.framework.utilities import from_yearly
from vivarium.manager import Manager

# Type alias for values that support standard algebraic operations with
# scalar values: plain numbers, plus numpy/pandas containers that broadcast
# those operations element-wise.
NumberLike = Union[
    np.ndarray,
    pd.Series,
    pd.DataFrame,
    Number,
]


class DynamicValueError(VivariumError):
    """Indicates an improperly configured value was invoked.

    Raised, for example, when a pipeline is called before any component has
    registered a source for it, or when a second source is registered for a
    pipeline that already has one.
    """
def replace_combiner(value: Any, mutator: Callable, *args: Any, **kwargs: Any) -> Any:
    """Replace the previous pipeline output with the output of the mutator.

    This is the default combiner.

    Parameters
    ----------
    value
        The value from the previous step in the pipeline.
    mutator
        A callable that takes all the arguments the pipeline source takes,
        plus one additional trailing positional argument that receives the
        value from the previous stage of the pipeline.
    args, kwargs
        The same args and kwargs provided during the invocation of the pipeline.

    Returns
    -------
    Any
        A modified version of the input value.
    """
    # The previous value goes after all of the pipeline's own positional
    # arguments and before any keyword arguments.
    return mutator(*args, value, **kwargs)
def list_combiner(value: List, mutator: Callable, *args: Any, **kwargs: Any) -> List:
    """Aggregates source and mutator output into a list.

    Meant to be paired with a post-processor that reduces the list, e.g. by
    summing all of its entries.

    Parameters
    ----------
    value
        A list of all values produced by the source and by earlier mutators
        in the pipeline. It is modified in place.
    mutator
        A callable that returns some portion of this pipeline's final value.
        It receives exactly the pipeline's arguments (not the running list).
    args, kwargs
        The same args and kwargs provided during the invocation of the pipeline.

    Returns
    -------
    The same list object passed in as ``value``, with the mutator's portion
    of the pipeline value appended to it.
    """
    portion = mutator(*args, **kwargs)
    value.append(portion)
    return value
def rescale_post_processor(value: NumberLike, manager: "ValuesManager") -> NumberLike:
    """Rescales annual rates to time-step appropriate rates.

    This should only be used with a simulation using a
    :class:`~vivarium.framework.time.DateTimeClock` or another implementation
    of a clock that traffics in pandas date-time objects.

    Parameters
    ----------
    value
        Annual rates, either as a number or something we can broadcast
        multiplication over like a :mod:`numpy` array or :mod:`pandas`
        data frame.
    manager
        The values manager. When ``value`` has an ``index``, the manager's
        ``simulant_step_sizes`` callable is queried with that index to get a
        step size per row. Otherwise the manager's ``step_size`` callable
        supplies a single step size.

    Returns
    -------
    Union[numpy.ndarray, pandas.Series, pandas.DataFrame, numbers.Number]
        The annual rates rescaled to the size of the current time step.
    """
    if hasattr(value, "index"):
        # Indexed (pandas) values are rescaled row by row. Each row uses the
        # step size reported for its index label, expressed as a fraction of
        # a 365-day year.
        step_sizes_in_years = (
            manager.simulant_step_sizes(value.index)
            .astype("timedelta64[ns]")
            .dt.total_seconds()
            / (60 * 60 * 24 * 365.0)
        )
        return value.mul(step_sizes_in_years, axis=0)
    else:
        return from_yearly(value, manager.step_size())
def union_post_processor(values: List[NumberLike], _) -> NumberLike:
    r"""Computes a probability on the union of the sample spaces in the values.

    Given a list of values where each value is a probability of an independent
    event, this post processor computes the probability of the union of the
    events.

    .. list-table::
       :width: 100%
       :widths: 1 3

       * - :math:`p_x`
         - Probability of event x
       * - :math:`1 - p_x`
         - Probability of not event x
       * - :math:`\prod_x(1 - p_x)`
         - Probability of not any events x
       * - :math:`1 - \prod_x(1 - p_x)`
         - Probability of any event x

    Parameters
    ----------
    values
        A list of independent proportions or probabilities, either as numbers
        or as something we can broadcast addition and multiplication over.
    _
        Unused. Pipelines call their post-processor with the values manager
        as the second argument.

    Returns
    -------
    Union[numpy.ndarray, pandas.Series, pandas.DataFrame, numbers.Number]
        The probability over the union of the sample spaces represented by
        the original probabilities. A single-element list returns that
        element itself (not a copy). An empty list yields ``0``.
    """
    # A single value is already the answer; hand it back untouched.
    if len(values) == 1:
        return values[0]

    # P(any) = 1 - P(none) = 1 - prod(1 - p) for independent events.
    # Out-of-place multiplication keeps the caller's arrays unmodified.
    product = 1
    for v in values:
        product = product * (1 - v)
    return 1 - product
class Pipeline:
    """A tool for building up values across several components.

    Pipelines are created lazily, so there are no constraints on the order in
    which components are created and set up. The values manager configures a
    pipeline (fills in all of its attributes) when a source for it is
    registered.

    A pipeline that is never called in a simulation needs neither a source
    nor any configuration. This happens, for example, when generic components
    register modifiers for values that the particular simulation does not use.

    Attributes
    ----------
    name
        The name of the value represented by this pipeline.
    source
        A callable source for this pipeline's value.
    mutators
        A list of callables that directly modify the pipeline source or
        contribute portions of the value.
    combiner
        A strategy for combining the source and mutator values into the final
        value represented by the pipeline.
    post_processor
        An optional final transformation to perform on the combined output of
        the source and mutators.
    manager
        A reference to the simulation values manager.
    """

    def __init__(self):
        # Only the mutator list is usable immediately. Everything else is
        # filled in by the values manager once a source is registered.
        self.name = None
        self.source = None
        self.combiner = None
        self.post_processor = None
        self.manager = None
        self.mutators = []

    def __call__(self, *args, skip_post_processor=False, **kwargs):
        """Generates the value represented by this pipeline.

        Parameters
        ----------
        args, kwargs
            Pipeline arguments. They are passed to the source and handed to
            the combiner alongside each mutator, so they should match the
            source's signature.
        skip_post_processor
            If true, return the combined source and mutator output without
            running the post-processor. Useful when the post-processor acts
            as a final unit conversion (e.g. the rescale post processor).

        Returns
        -------
        The value represented by the pipeline.

        Raises
        ------
        DynamicValueError
            If the pipeline is invoked without a source set.
        """
        return self._call(*args, skip_post_processor=skip_post_processor, **kwargs)

    def _call(self, *args, skip_post_processor=False, **kwargs):
        # Implementation behind __call__. The values manager also registers
        # this bound method directly as the pipeline's resource.
        if not self.source:
            raise DynamicValueError(
                f"The dynamic value pipeline for {self.name} has no source. This likely means "
                f"you are attempting to modify a value that hasn't been created."
            )

        result = self.source(*args, **kwargs)
        for modify in self.mutators:
            result = self.combiner(result, modify, *args, **kwargs)

        if not self.post_processor or skip_post_processor:
            # Only values returned without post-processing get labelled with
            # the pipeline name (when they are a Series).
            if isinstance(result, pd.Series):
                result.name = self.name
            return result

        return self.post_processor(result, self.manager)

    def __repr__(self):
        # NOTE(review): the "_Pipeline" prefix predates the class's public
        # name; it is kept as-is in case callers rely on the repr.
        return "_Pipeline({})".format(self.name)
class ValuesManager(Manager):
    """Manager for the dynamic value system.

    The manager owns every :class:`Pipeline` in the simulation, keyed by name.
    Pipelines are created lazily on first lookup and configured when a
    component registers a source for them. Pipeline sources, modifiers and
    the pipelines themselves are registered with the resource system along
    with their declared dependencies.
    """

    def __init__(self):
        # Pipelines are lazily initialized by _register_value_producer. Any
        # lookup of a not-yet-seen name (e.g. via get_value or
        # register_value_modifier) creates an empty, unsourced pipeline.
        self._pipelines = defaultdict(Pipeline)

    @property
    def name(self):
        """The name of this manager (``"values_manager"``)."""
        return "values_manager"

    def setup(self, builder):
        """Capture framework services from the builder and register hooks.

        Parameters
        ----------
        builder
            The simulation builder. Its logging, time, event, resources and
            lifecycle interfaces are used here.
        """
        self.logger = builder.logging.get_logger(self.name)
        # Both are stored as callables and invoked later, e.g. by
        # rescale_post_processor via ``manager.step_size()`` and
        # ``manager.simulant_step_sizes(index)``.
        self.step_size = builder.time.step_size()
        self.simulant_step_sizes = builder.time.simulant_step_sizes()
        builder.event.register_listener("post_setup", self.on_post_setup)

        self.resources = builder.resources
        self.add_constraint = builder.lifecycle.add_constraint

        # Per the ``allow_during`` argument, producers and modifiers are meant
        # to be registered only during the setup phase.
        builder.lifecycle.add_constraint(self.register_value_producer, allow_during=["setup"])
        builder.lifecycle.add_constraint(self.register_value_modifier, allow_during=["setup"])

    def on_post_setup(self, _):
        """Finalizes dependency structure for the pipelines.

        Logs a warning listing any pipelines that never received a source.
        Then registers every pipeline as a ``"value"`` resource that depends
        on its source (or a ``missing_value_source`` marker) and on all of
        its modifiers.
        """
        # Unsourced pipelines might occur when generic components register
        # modifiers to values that aren't required in a simulation.
        unsourced_pipelines = [p for p, v in self._pipelines.items() if not v.source]
        if unsourced_pipelines:
            self.logger.warning(f"Unsourced pipelines: {unsourced_pipelines}")

        # register_value_producer and register_value_modifier record the
        # dependency structure for the pipeline source and pipeline modifiers,
        # respectively. We don't have enough information to record the
        # dependency structure for the pipeline itself until now, where
        # we say the pipeline value depends on its source and all its
        # modifiers.
        for name, pipe in self._pipelines.items():
            dependencies = []
            if pipe.source:
                dependencies += [f"value_source.{name}"]
            else:
                dependencies += [f"missing_value_source.{name}"]
            for i, m in enumerate(pipe.mutators):
                mutator_name = self._get_modifier_name(m)
                # 1-based numbering must match the resource names built in
                # register_value_modifier.
                dependencies.append(f"value_modifier.{name}.{i+1}.{mutator_name}")
            self.resources.add_resources("value", [name], pipe._call, dependencies)

    def register_value_producer(
        self,
        value_name: str,
        source: Callable,
        requires_columns: List[str] = (),
        requires_values: List[str] = (),
        requires_streams: List[str] = (),
        preferred_combiner: Callable = replace_combiner,
        preferred_post_processor: Callable = None,
    ) -> Pipeline:
        """Marks a ``Callable`` as the producer of a named value.

        Configures the named pipeline with ``source``, the combiner and the
        post-processor. Registers the source as a ``"value_source"`` resource
        with the declared dependencies, and returns the pipeline.

        Raises
        ------
        DynamicValueError
            If the named pipeline already has a source.

        See Also
        --------
        :meth:`ValuesInterface.register_value_producer`
        """
        pipeline = self._register_value_producer(
            value_name, source, preferred_combiner, preferred_post_processor
        )

        # The resource we add here is just the pipeline source.
        # The value will depend on the source and its modifiers, and we'll
        # declare that resource at post-setup once all sources and modifiers
        # are registered.
        dependencies = self._convert_dependencies(
            source, requires_columns, requires_values, requires_streams
        )
        self.resources.add_resources("value_source", [value_name], source, dependencies)
        # Judging by the argument name, this restricts calling the pipeline
        # during the listed lifecycle phases — confirm against the lifecycle
        # manager.
        self.add_constraint(
            pipeline._call, restrict_during=["initialization", "setup", "post_setup"]
        )

        return pipeline

    def _register_value_producer(
        self,
        value_name: str,
        source: Callable,
        preferred_combiner: Callable,
        preferred_post_processor: Callable,
    ):
        """Configure the named value pipeline with a source, combiner, and post-processor.

        Raises
        ------
        DynamicValueError
            If the pipeline already has a source.
        """
        self.logger.debug(f"Registering value pipeline {value_name}")
        pipeline = self._pipelines[value_name]  # May create a pipeline
        if pipeline.source:
            raise DynamicValueError(
                f"A second component is attempting to set the source for pipeline {value_name} "
                f"with {source}, but it already has a source: {pipeline.source}."
            )
        pipeline.name = value_name
        pipeline.source = source
        pipeline.combiner = preferred_combiner
        pipeline.post_processor = preferred_post_processor
        pipeline.manager = self
        return pipeline

    def register_value_modifier(
        self,
        value_name: str,
        modifier: Callable,
        requires_columns: List[str] = (),
        requires_values: List[str] = (),
        requires_streams: List[str] = (),
    ):
        """Marks a ``Callable`` as the modifier of a named value.

        Parameters
        ----------
        value_name
            The name of the dynamic value pipeline to be modified.
        modifier
            A function that modifies the source of the dynamic value pipeline
            when called. If the pipeline has a ``replace_combiner``, the
            modifier should accept the same arguments as the pipeline source
            with an additional last positional argument for the results of the
            previous stage in the pipeline. For the ``list_combiner`` strategy,
            the pipeline modifiers should have the same signature as the pipeline
            source.
        requires_columns
            A list of the state table columns that already need to be present
            and populated in the state table before the pipeline modifier
            is called.
        requires_values
            A list of the value pipelines that need to be properly sourced
            before the pipeline modifier is called.
        requires_streams
            A list of the randomness streams that need to be properly sourced
            before the pipeline modifier is called.
        """
        modifier_name = self._get_modifier_name(modifier)

        pipeline = self._pipelines[value_name]  # May create a pipeline
        pipeline.mutators.append(modifier)

        # The resource name embeds the modifier's 1-based position, matching
        # the names on_post_setup uses for the pipeline's dependencies.
        name = f"{value_name}.{len(pipeline.mutators)}.{modifier_name}"
        self.logger.debug(f"Registering {name} as modifier to {value_name}")

        dependencies = self._convert_dependencies(
            modifier, requires_columns, requires_values, requires_streams
        )
        self.resources.add_resources("value_modifier", [name], modifier, dependencies)

    def get_value(self, name):
        """Retrieve the pipeline representing the named value.

        Parameters
        ----------
        name
            Name of the pipeline to return.

        Returns
        -------
        A callable reference to the named pipeline. The pipeline arguments
        should be identical to the arguments to the pipeline source
        (frequently just a :class:`pandas.Index` representing the
        simulants). An unknown name yields a new, unsourced pipeline.
        """
        return self._pipelines[name]  # May create a pipeline.

    @staticmethod
    def _convert_dependencies(func, requires_columns, requires_values, requires_streams):
        """Build the resource dependency list for a source or modifier.

        When ``func`` is itself a :class:`Pipeline`, the only dependency is
        that pipeline's value, and the ``requires_*`` arguments are ignored.
        Otherwise the declared columns, values and streams are converted into
        ``column.*``, ``value.*`` and ``stream.*`` resource names.
        """
        # A pipeline used as a source or modifier already carries all of its
        # own dependencies, so depending on its value is sufficient. Callers
        # therefore do not need to list it again in 'requires_values'.
        if isinstance(func, Pipeline):
            dependencies = [f"value.{func.name}"]
        else:
            dependencies = (
                [f"column.{name}" for name in requires_columns]
                + [f"value.{name}" for name in requires_values]
                + [f"stream.{name}" for name in requires_streams]
            )
        return dependencies

    @staticmethod
    def _get_modifier_name(modifier):
        """Get reproducible modifier names based on the modifier type.

        Raises
        ------
        ValueError
            If ``modifier`` has no name, is not a bound method, and is not
            callable.
        """
        if hasattr(modifier, "name"):
            # This is a Pipeline or lookup table or something similar.
            modifier_name = modifier.name
        elif hasattr(modifier, "__self__"):
            # This is a bound method of a component or other object.
            owner = modifier.__self__
            owner_name = owner.name if hasattr(owner, "name") else owner.__class__.__name__
            modifier_name = f"{owner_name}.{modifier.__name__}"
        elif hasattr(modifier, "__name__"):
            # Some unbound function.
            modifier_name = modifier.__name__
        elif hasattr(modifier, "__call__"):
            # Some anonymous callable (e.g. a functools.partial or a callable
            # instance); name it after its class.
            modifier_name = f"{modifier.__class__.__name__}.__call__"
        else:
            # I don't know what this is.
            raise ValueError(f"Unknown modifier type: {type(modifier)}")
        return modifier_name

    def keys(self) -> Iterable[str]:
        """Get an iterable of pipeline names."""
        return self._pipelines.keys()

    def items(self) -> Iterable[Tuple[str, Pipeline]]:
        """Get an iterable of name, pipeline tuples."""
        return self._pipelines.items()

    def values(self) -> Iterable[Pipeline]:
        """Get an iterable of all pipelines."""
        return self._pipelines.values()

    def __contains__(self, item: str) -> bool:
        # Membership test does not create a pipeline (unlike get_value).
        return item in self._pipelines

    def __iter__(self) -> Iterable[str]:
        return iter(self._pipelines)

    def __repr__(self) -> str:
        return "ValuesManager()"
class ValuesInterface:
    """Public interface for the simulation values management system.

    The values system builds a value up across many components, so each
    component can focus on a small group of simulant attributes. Every method
    here delegates to the values manager supplied at construction.
    """

    def __init__(self, manager: ValuesManager):
        self._manager = manager

    def register_value_producer(
        self,
        value_name: str,
        source: Callable,
        requires_columns: List[str] = (),
        requires_values: List[str] = (),
        requires_streams: List[str] = (),
        preferred_combiner: Callable = replace_combiner,
        preferred_post_processor: Callable = None,
    ) -> Pipeline:
        """Marks a ``Callable`` as the producer of a named value.

        Parameters
        ----------
        value_name
            The name of the new dynamic value pipeline.
        source
            A callable source for the dynamic value pipeline.
        requires_columns
            A list of the state table columns that already need to be present
            and populated in the state table before the pipeline source
            is called.
        requires_values
            A list of the value pipelines that need to be properly sourced
            before the pipeline source is called.
        requires_streams
            A list of the randomness streams that need to be properly sourced
            before the pipeline source is called.
        preferred_combiner
            A strategy for combining the source and the results of any calls
            to mutators in the pipeline. ``vivarium`` provides the strategies
            ``replace_combiner`` (the default) and ``list_combiner``, which
            are importable from ``vivarium.framework.values``. Client code
            may define additional strategies as necessary.
        preferred_post_processor
            A strategy for processing the final output of the pipeline.
            ``vivarium`` provides the strategies ``rescale_post_processor``
            and ``union_post_processor``, which are importable from
            ``vivarium.framework.values``. Client code may define additional
            strategies as necessary.

        Returns
        -------
        Pipeline
            A callable reference to the named dynamic value pipeline.
        """
        manager = self._manager
        return manager.register_value_producer(
            value_name,
            source,
            requires_columns,
            requires_values,
            requires_streams,
            preferred_combiner,
            preferred_post_processor,
        )

    def register_rate_producer(
        self,
        rate_name: str,
        source: Callable,
        requires_columns: List[str] = (),
        requires_values: List[str] = (),
        requires_streams: List[str] = (),
    ) -> Pipeline:
        """Marks a ``Callable`` as the producer of a named rate.

        A convenience wrapper around ``register_value_producer`` that ensures
        rate data is scaled to the size of the simulation time step. It is
        equivalent to ``register_value_producer(value_name, source,
        preferred_combiner=replace_combiner,
        preferred_post_processor=rescale_post_processor)``.

        Parameters
        ----------
        rate_name
            The name of the new dynamic rate pipeline.
        source
            A callable source for the dynamic rate pipeline.
        requires_columns
            A list of the state table columns that already need to be present
            and populated in the state table before the pipeline source
            is called.
        requires_values
            A list of the value pipelines that need to be properly sourced
            before the pipeline source is called.
        requires_streams
            A list of the randomness streams that need to be properly sourced
            before the pipeline source is called.

        Returns
        -------
        Pipeline
            A callable reference to the named dynamic rate pipeline.
        """
        # Only the post-processor differs from a plain value producer.
        return self.register_value_producer(
            rate_name,
            source,
            requires_columns,
            requires_values,
            requires_streams,
            preferred_post_processor=rescale_post_processor,
        )

    def register_value_modifier(
        self,
        value_name: str,
        modifier: Callable,
        requires_columns: List[str] = (),
        requires_values: List[str] = (),
        requires_streams: List[str] = (),
    ):
        """Marks a ``Callable`` as the modifier of a named value.

        Parameters
        ----------
        value_name
            The name of the dynamic value pipeline to be modified.
        modifier
            A function that modifies the source of the dynamic value pipeline
            when called. With a ``replace_combiner`` pipeline, the modifier
            should accept the same arguments as the pipeline source plus one
            additional last positional argument for the result of the
            previous stage of the pipeline. With the ``list_combiner``
            strategy, modifiers should have the same signature as the
            pipeline source.
        requires_columns
            A list of the state table columns that already need to be present
            and populated in the state table before the pipeline modifier
            is called.
        requires_values
            A list of the value pipelines that need to be properly sourced
            before the pipeline modifier is called.
        requires_streams
            A list of the randomness streams that need to be properly sourced
            before the pipeline modifier is called.
        """
        manager = self._manager
        manager.register_value_modifier(
            value_name, modifier, requires_columns, requires_values, requires_streams
        )

    def get_value(self, name: str) -> Pipeline:
        """Retrieve the pipeline representing the named value.

        Parameters
        ----------
        name
            Name of the pipeline to return.

        Returns
        -------
        A callable reference to the named pipeline. The pipeline arguments
        should be identical to the arguments to the pipeline source
        (frequently just a :class:`pandas.Index` representing the simulants).
        """
        pipeline = self._manager.get_value(name)
        return pipeline