Source code for vivarium.framework.values.manager

"""
==============
Values Manager
==============

"""

from __future__ import annotations

from collections.abc import Callable, Iterable, Iterator, Sequence
from typing import TYPE_CHECKING, Any, TypeVar, overload

from vivarium.framework.event import Event
from vivarium.framework.lifecycle import lifecycle_states
from vivarium.framework.resource import Column, Resource
from vivarium.framework.values.combiners import ValueCombiner, replace_combiner
from vivarium.framework.values.pipeline import (
    AttributePipeline,
    AttributesValueSource,
    DynamicValueError,
    Pipeline,
    PrivateColumnValueSource,
    ValueSource,
)
from vivarium.framework.values.post_processors import AttributePostProcessor, PostProcessor
from vivarium.manager import Manager

if TYPE_CHECKING:
    import pandas as pd

    from vivarium.framework.engine import Builder

T = TypeVar("T")


class ValuesManager(Manager):
    """Manager for the dynamic value system.

    Notes
    -----
    This is the only manager for the values system; different methods exist for
    working with generic value
    :class:`Pipelines <vivarium.framework.values.pipeline.Pipeline>` and
    :class:`AttributePipelines <vivarium.framework.values.pipeline.AttributePipeline>`.

    """

    def __init__(self) -> None:
        # Pipelines are created lazily, the first time they are looked up
        # through ``get_value`` or ``get_attribute``.
        self._value_pipelines: dict[str, Pipeline] = {}
        self._attribute_pipelines: dict[str, AttributePipeline] = {}

    @property
    def name(self) -> str:
        return "values_manager"

    @property
    def _all_pipelines(self) -> dict[str, Pipeline]:
        # A fresh merged snapshot; value pipelines come first, then attribute
        # pipelines. Names never collide because ``get_value`` and
        # ``get_attribute`` reject names already used by the other registry.
        return {**self._value_pipelines, **self._attribute_pipelines}

    def setup(self, builder: Builder) -> None:
        self.logger = builder.logging.get_logger(self.name)
        self.step_size = builder.time.step_size()
        self.simulant_step_sizes = builder.time.simulant_step_sizes()

        builder.event.register_listener("post_setup", self.on_post_setup)

        self._get_view = builder.population.get_view
        self._add_resource = builder.resources.add_resource
        self._get_current_component = builder.components.get_current_component_or_manager
        self._add_constraint = builder.lifecycle.add_constraint

        # Registration is only allowed while components are being set up.
        for registration_method in (
            self.register_value_producer,
            self.register_value_modifier,
            self.register_attribute_producer,
            self.register_attribute_modifier,
        ):
            builder.lifecycle.add_constraint(
                registration_method, allow_during=[lifecycle_states.SETUP]
            )
        # The full attribute pipeline mapping must not be requested during setup.
        builder.lifecycle.add_constraint(
            self.get_attribute_pipelines, restrict_during=[lifecycle_states.SETUP]
        )

    def on_post_setup(self, _event: Event) -> None:
        """Logs a warning for every registered pipeline that has no source.

        Parameters
        ----------
        _event
            The ``post_setup`` event. It is not used.
        """
        for pipeline in self._all_pipelines.values():
            # Unsourced pipelines might occur when generic components register
            # modifiers to values that aren't required in a simulation.
            if not pipeline.source:
                self.logger.warning(
                    f"Pipeline {pipeline.name} has no source. It will not be usable."
                )

    def register_value_producer(
        self,
        value_name: str,
        source: Callable[..., Any],
        required_resources: Iterable[str | Resource] = (),
        preferred_combiner: ValueCombiner = replace_combiner,
        preferred_post_processor: PostProcessor | Sequence[PostProcessor] = (),
    ) -> Pipeline:
        """Registers a ``Pipeline`` as the producer of a named value.

        Parameters
        ----------
        value_name
            The name of the new dynamic value pipeline.
        source
            A callable source for the dynamic value pipeline.
        required_resources
            A list of resources that the producer requires. A string represents
            a population attribute.
        preferred_combiner
            A strategy for combining the source and the results of any calls
            to mutators in the pipeline. ``vivarium`` provides the strategies
            ``replace_combiner`` (the default) and ``list_combiner``, which
            are importable from ``vivarium.framework.values``. Client code
            may define additional strategies as necessary.
        preferred_post_processor
            A strategy for processing the final output of the pipeline.
            ``vivarium`` provides the strategies ``rescale_post_processor``
            and ``union_post_processor`` which are importable from
            ``vivarium.framework.values``. Client code may define additional
            strategies as necessary. If a sequence of post processors is
            provided, they will be applied in the order they are provided.

        Returns
        -------
            The ``Pipeline`` that is registered as the producer of the named value.
        """
        self.logger.debug(f"Registering value pipeline {value_name}")
        pipeline = self.get_value(value_name)
        self._configure_pipeline(
            pipeline,
            source,
            required_resources,
            preferred_combiner,
            preferred_post_processor,
        )
        return pipeline

    def register_attribute_producer(
        self,
        value_name: str,
        source: Callable[[pd.Index[int]], Any] | list[str],
        required_resources: Iterable[str | Resource] = (),
        preferred_combiner: ValueCombiner = replace_combiner,
        preferred_post_processor: AttributePostProcessor
        | Sequence[AttributePostProcessor] = (),
        source_is_private_column: bool = False,
    ) -> None:
        """Registers an ``AttributePipeline`` as the producer of a named attribute.

        Parameters
        ----------
        value_name
            The name of the new dynamic attribute pipeline.
        source
            The source for the dynamic attribute pipeline. This can be a callable,
            a list containing a single name of a private column created by this
            component, or a list of population attributes. If a private column name
            is passed, `source_is_private_column` must also be set to True.
        required_resources
            A list of resources that the producer requires. A string represents
            a population attribute.
        preferred_combiner
            A strategy for combining the source and the results of any calls
            to mutators in the pipeline. ``vivarium`` provides the strategies
            ``replace_combiner`` (the default) and ``list_combiner``, which
            are importable from ``vivarium.framework.values``. Client code
            may define additional strategies as necessary.
        preferred_post_processor
            A strategy for processing the final output of the pipeline.
            ``vivarium`` provides the strategies ``rescale_post_processor``
            and ``union_post_processor`` which are importable from
            ``vivarium.framework.values``. Client code may define additional
            strategies as necessary. If a sequence of post processors is
            provided, they will be applied in the order they are provided.
        source_is_private_column
            Whether or not the source is the name of a private column created
            by this component.
        """
        self.logger.debug(f"Registering attribute pipeline {value_name}")
        pipeline = self.get_attribute(value_name)
        self._configure_pipeline(
            pipeline,
            source,
            required_resources=required_resources,
            preferred_combiner=preferred_combiner,
            preferred_post_processor=preferred_post_processor,
            source_is_private_column=source_is_private_column,
        )

    def register_value_modifier(
        self,
        value_name: str,
        modifier: Callable[..., Any],
        required_resources: Iterable[str | Resource] = (),
    ) -> None:
        """Marks a ``Callable`` as the modifier of a named value.

        Parameters
        ----------
        value_name
            The name of the dynamic value ``Pipeline`` to be modified.
        modifier
            A function that modifies the source of the dynamic value ``Pipeline``
            when called. If the pipeline has a ``replace_combiner``, the
            modifier must have the same arguments as the pipeline source with
            an additional last positional argument for the results of the
            previous stage in the pipeline. For the ``list_combiner`` strategy,
            the pipeline modifiers should have the same signature as the pipeline
            source.
        required_resources
            A list of resources that the modifier requires. A string represents
            a population attribute.
        """
        self._configure_modifier(
            self.get_value(value_name),
            modifier,
            required_resources,
        )

    def register_attribute_modifier(
        self,
        value_name: str,
        modifier: Callable[..., Any] | str,
        required_resources: Iterable[str | Resource] = (),
    ) -> None:
        """Marks a ``Callable`` as the modifier of a named attribute.

        Parameters
        ----------
        value_name
            The name of the dynamic ``AttributePipeline`` to be modified.
        modifier
            A function that modifies the source of the dynamic ``AttributePipeline``
            when called. If a string is passed, it refers to the name of an
            ``AttributePipeline``. If the pipeline has a ``replace_combiner``, the
            modifier should accept the same arguments as the pipeline source with
            an additional last positional argument for the results of the previous
            stage in the pipeline. For the ``list_combiner`` strategy, the pipeline
            modifiers should have the same signature as the pipeline source.
        required_resources
            A list of resources that need to be properly sourced before the
            pipeline modifier is called. This is a list of attribute names,
            pipelines, or randomness streams.
        """
        # A string names another attribute pipeline; that pipeline is the modifier.
        modifier = self.get_attribute(modifier) if isinstance(modifier, str) else modifier
        self._configure_modifier(
            self.get_attribute(value_name),
            modifier,
            required_resources=required_resources,
        )

    def get_value(self, name: str) -> Pipeline:
        """Retrieves the ``Pipeline`` representing the named value.

        Parameters
        ----------
        name
            Name of the ``Pipeline`` to return.

        Returns
        -------
            The requested ``Pipeline``.

        Raises
        ------
        DynamicValueError
            If ``name`` is already registered as an attribute pipeline.

        Notes
        -----
        This will create a new ``Pipeline`` if one does not already exist.
        """
        if name in self._attribute_pipelines:
            raise DynamicValueError(
                f"'{name}' is already registered as an attribute pipeline."
            )
        # Only build a pipeline on a miss, instead of constructing a throwaway
        # default on every lookup.
        if name not in self._value_pipelines:
            self._value_pipelines[name] = Pipeline(name)
        return self._value_pipelines[name]

    def get_value_pipelines(self) -> dict[str, Pipeline]:
        """Retrieves a dictionary of all registered value ``Pipelines``.

        To get all ``AttributePipelines``, use :meth:`get_attribute_pipelines`.

        Returns
        -------
            A dictionary mapping value names to their corresponding ``Pipelines``.
        """
        return self._value_pipelines

    def get_attribute(self, name: str) -> AttributePipeline:
        """Retrieves the ``AttributePipeline`` representing the named attribute.

        To get a value ``Pipeline``, use :meth:`get_value`.

        Parameters
        ----------
        name
            Name of the ``AttributePipeline`` to return.

        Returns
        -------
            The requested ``AttributePipeline``.

        Raises
        ------
        DynamicValueError
            If ``name`` is already registered as a value pipeline.

        Notes
        -----
        This will create a new ``AttributePipeline`` if one does not already exist.
        """
        if name in self._value_pipelines:
            raise DynamicValueError(f"'{name}' is already registered as a value pipeline.")
        # Only build a pipeline on a miss, instead of constructing a throwaway
        # default on every lookup.
        if name not in self._attribute_pipelines:
            self._attribute_pipelines[name] = AttributePipeline(name)
        return self._attribute_pipelines[name]

    def get_attribute_pipelines(self) -> dict[str, AttributePipeline]:
        """Returns a dictionary of ``AttributePipelines``.

        Returns
        -------
            A dictionary mapping all registered attribute names to their
            corresponding ``AttributePipelines``.

        Notes
        -----
        This is not the preferred access method to getting population attributes
        since it does not implement various features (e.g. querying, simulant
        tracking, etc); it exists for other managers to use if needed. Use
        :meth:`vivarium.framework.population.population_view.PopulationView.get` or
        :meth:`vivarium.framework.population.population_view.PopulationView.get_frame`
        instead.
        """
        return self._attribute_pipelines

    ##################
    # Helper methods #
    ##################

    @overload
    def _configure_pipeline(
        self,
        pipeline: Pipeline,
        source: Callable[..., Any] | list[str],
        required_resources: Iterable[str | Resource] = (),
        preferred_combiner: ValueCombiner = replace_combiner,
        preferred_post_processor: PostProcessor | Sequence[PostProcessor] = (),
        source_is_private_column: bool = False,
    ) -> None:
        ...

    @overload
    def _configure_pipeline(
        self,
        pipeline: AttributePipeline,
        source: Callable[..., Any] | list[str],
        required_resources: Iterable[str | Resource] = (),
        preferred_combiner: ValueCombiner = replace_combiner,
        preferred_post_processor: AttributePostProcessor
        | Sequence[AttributePostProcessor] = (),
        source_is_private_column: bool = False,
    ) -> None:
        ...

    def _configure_pipeline(
        self,
        pipeline: Pipeline | AttributePipeline,
        source: Callable[..., Any] | list[str],
        required_resources: Iterable[str | Resource] = (),
        preferred_combiner: ValueCombiner = replace_combiner,
        preferred_post_processor: PostProcessor
        | AttributePostProcessor
        | Sequence[PostProcessor]
        | Sequence[AttributePostProcessor] = (),
        source_is_private_column: bool = False,
    ) -> None:
        """Attaches a source and its settings to a pipeline and registers it.

        Parameters
        ----------
        pipeline
            The pipeline to configure.
        source
            A callable, a single-element list naming a private column (when
            ``source_is_private_column`` is True), or a list of attribute names.
        required_resources
            Resources the source requires. Ignored, with a warning, when
            ``source`` is a list, since the resources are then taken from it.
        preferred_combiner
            The combiner passed on to the pipeline.
        preferred_post_processor
            A single post processor or a sequence of them; passed on to the
            pipeline as a list.
        source_is_private_column
            Whether ``source`` names a private column of the current component.

        Raises
        ------
        ValueError
            If ``source_is_private_column`` is True and ``source`` is not a list
            containing exactly one name.
        """
        component = self._get_current_component()

        value_source: ValueSource
        if source_is_private_column:
            # The trailing space separates this prefix from the detail appended below.
            generic_error_msg = (
                f"Invalid source for {pipeline.name}. `source` must be list containing a single"
                " private column name. "
            )
            if not isinstance(source, list):
                raise ValueError(
                    generic_error_msg + f"Got `source` type {type(source)} instead."
                )
            if len(source) != 1:
                raise ValueError(generic_error_msg + f"Got {len(source)} names instead.")
            value_source = PrivateColumnValueSource(
                pipeline, source[0], self._get_view(component)  # type: ignore[arg-type]
            )
            if required_resources:
                self.logger.warning(
                    f"Conflicting information for {pipeline.name}. Ignoring 'required_resources' "
                    "since the `source_is_private_column` flag is set to True and we can infer "
                    "the required resources directly."
                )
            required_resources = [Column(source[0], component)]
        elif isinstance(source, list):
            value_source = AttributesValueSource(pipeline, source, self._get_view(component))  # type: ignore[arg-type]
            if required_resources:
                self.logger.warning(
                    f"Conflicting information for {pipeline.name}. Ignoring 'required_resources' "
                    "since the `source` is a list of attributes and we can infer the required "
                    "resources directly."
                )
            required_resources = source
        else:
            value_source = ValueSource(pipeline, source)

        # Normalize a lone post processor into a one-element list.
        if not isinstance(preferred_post_processor, Sequence):
            preferred_post_processor_list = [preferred_post_processor]
        else:
            preferred_post_processor_list = list(preferred_post_processor)

        pipeline.set_attributes(
            component=component,
            source=value_source,
            combiner=preferred_combiner,
            post_processor=preferred_post_processor_list,  # type: ignore[arg-type]
            required_resources=required_resources,
            manager=self,
        )
        self._add_resource(pipeline)

        # Attribute pipelines are additionally restricted during post-setup.
        restricted_states = [lifecycle_states.INITIALIZATION, lifecycle_states.SETUP]
        if isinstance(pipeline, AttributePipeline):
            restricted_states.append(lifecycle_states.POST_SETUP)
        self._add_constraint(pipeline._call, restrict_during=restricted_states)

    def _configure_modifier(
        self,
        pipeline: Pipeline | AttributePipeline,
        modifier: Callable[..., Any],
        required_resources: Iterable[str | Resource] = (),
    ) -> None:
        """Builds a value modifier for a pipeline and registers it as a resource.

        Parameters
        ----------
        pipeline
            The pipeline being modified.
        modifier
            The modifying callable. If it is itself a ``Resource``, it becomes
            the sole required resource and ``required_resources`` is ignored
            with a warning.
        required_resources
            Resources the modifier requires.
        """
        component = self._get_current_component()
        if isinstance(modifier, Resource):
            if required_resources:
                self.logger.warning(
                    f"Conflicting information for {pipeline.name}. Ignoring 'required_resources' "
                    f"since the `modifier` is of type {type(modifier)} and we can infer "
                    "the required resources directly."
                )
            required_resources = [modifier]
        value_modifier = pipeline.get_value_modifier(modifier, component, required_resources)
        self.logger.debug(f"Registering {value_modifier.name} as modifier to {pipeline.name}")
        self._add_resource(value_modifier)

    def __contains__(self, item: str) -> bool:
        # Check both registries directly rather than merging them into a new dict.
        return item in self._value_pipelines or item in self._attribute_pipelines

    def __iter__(self) -> Iterator[str]:
        # Iterates over a merged snapshot: value pipeline names, then attribute names.
        return iter(self._all_pipelines)

    def __repr__(self) -> str:
        return "ValuesManager()"