Boids

To get started with agent-based modelling, we’ll recreate the classic Boids simulation of flocking behavior. This is a relatively simple example but it produces very pleasing visualizations.

Setup

We’re assuming you’ve read through the material in getting started and are working in your vivarium_examples package. If not, you should go there first.

Todo

package setup with __init__ and stuff

Building a population

Create a file called population.py with the following content:

File: ~/code/vivarium_examples/boids/population.py
 1# docs-start: imports
 2import pandas as pd
 3
 4from vivarium import Component
 5from vivarium.framework.engine import Builder
 6from vivarium.framework.population import SimulantData
 7# docs-end: imports
 8
 9
10class Population(Component):
11    ##############
12    # Properties #
13    ##############
14
15    # docs-start: configuration_defaults
16    CONFIGURATION_DEFAULTS = {
17        "population": {
18            "colors": ["red", "blue"],
19        }
20    }
21    # docs-end: configuration_defaults
22
23    #####################
24    # Lifecycle methods #
25    #####################
26
27    # docs-start: setup
28    def setup(self, builder: Builder) -> None:
29        self.colors = builder.configuration.population.colors
30        self.randomness = builder.randomness.get_stream(self.name)
31        builder.population.register_initializer(
32            initializer=self.initialize_population,
33            columns=["color", "entrance_time"],
34            required_resources=[self.randomness]
35        )
36    # docs-end: setup
37
38    ########################
39    # Event-driven methods #
40    ########################
41
42    def initialize_population(self, pop_data: SimulantData) -> None:
43        new_population = pd.DataFrame(
44            {
45                "color": self.randomness.choice(pop_data.index, self.colors),
46                "entrance_time": pop_data.creation_time,
47            },
48            index=pop_data.index,
49        )
50        self.population_view.initialize(new_population)

Here we’re defining a component that generates a population of boids. Those boids are then randomly chosen to be either red or blue.

Let’s examine what’s going on in detail, as you’ll see many of the same patterns repeated in later components.

Imports

import pandas as pd

from vivarium import Component
from vivarium.framework.engine import Builder
from vivarium.framework.population import SimulantData

NumPy is a library for doing high performance numerical computing in Python. pandas is a set of tools built on top of numpy that allow for fast database-style querying and aggregating of data. Vivarium uses pandas.DataFrame objects as it’s underlying representation of the population and for many other data storage and manipulation tasks. By convention, most people abbreviate these packages as np and pd respectively, and we’ll follow that convention here.

Population class

Vivarium components are expressed as Python classes. You can find many resources on classes and object-oriented programming with a simple Google search. We’ll assume some fluency with this style of programming, but you should be able to follow along with most bits even if you’re unfamiliar.

Configuration defaults

In most simulations, we want to have an easily tunable set up knobs to adjust various parameters. Vivarium accomplishes this by pulling those knobs out as configuration information. Components typically expose the values they use in the configuration_defaults class attribute.

CONFIGURATION_DEFAULTS = {
    "population": {
        "colors": ["red", "blue"],
    }
}

We’ll talk more about configuration information later. For now observe that we’re exposing a set of possible colors for our boids.

The setup method

Almost every component in Vivarium will have a setup method. The setup method gives the component access to an instance of the Builder which exposes a handful of tools to help build components. The simulation framework is responsible for calling the setup method on components and providing the builder to them. We’ll explore these tools that the builder provides in detail as we go.

def setup(self, builder: Builder) -> None:
    self.colors = builder.configuration.population.colors
    self.randomness = builder.randomness.get_stream(self.name)
    builder.population.register_initializer(
        initializer=self.initialize_population,
        columns=["color", "entrance_time"],
        required_resources=[self.randomness]
    )

Our setup method is pretty simple: we just save the configured colors for later use, get a randomness stream, and register some private columns. Regarding the colors, note that the component is accessing the subsection of the configuration that it cares about. The full simulation configuration is available from the builder as builder.configuration. You can treat the configuration object just like a nested python dictionary that’s been extended to support dot-style attribute access. Our access here mirrors what’s in the configuration_defaults at the top of the class definition.

Note that the setup method is registering a method called initialize_population as an initializer that will create two private columns (color and entrance_time) and requires the randomness stream to do so. This tells Vivarium what columns (or “attributes”) the component will add to the population table and how. See the next section for where we actually create these columns.

Note

The Population State Table

When we talk about columns in the context of Vivarium, we are typically talking about simulant attributes. All attributes for all simulants can be represented by a single pandas DataFrame where each simulant is a row and each attribute is a column. This dataframe is referred to as the population state table (or simply “state table”).

Initializers

Finally we look at the initialize_population method which was registered as the one and only initializer method. Any registered initializers will be automatically called by Vivarium when new simulants are being added to the simulation.

We see that, like the setup method, initializer methods (initialize_population in this case) take in a special argument that we don’t provide. This argument, pop_data, is an instance of SimulantData containing a handful of information useful when initializing simulants.

The only bits of information we need for now are the randomness stream we registered as a required resource, the pop_data.index which supplies the index of the simulants to be initialized, and the pop_data.creation_time which gives us a representation of the simulation time when the simulant was generated (typically an int or pandas.Timestamp).

Note

The Population Index

The population state table we described before has an index that uniquely identifies each simulant. This index is used in several places in the simulation to look up information, calculate simulant-specific values, and update information about the simulants’ state.

Using the population index, we generate a pandas.DataFrame and fill it with the initial values of ‘entrance_time’ and ‘color’ for each new simulant. However, this new dataframe is just a table hanging out in memory - Vivarium knows nothing about it and it cannot readily be used througout the simulation. To actually be able to use the data as attributes, we need to tell Vivarium’s population management system to update the state table with the values.

Putting it together

Vivarium supports both a command line interface and an interactive one. We’ll look at how to run simulations from the command line later. For now, we can set up our simulation with the following code:

from vivarium import InteractiveContext
from vivarium_examples.boids.population import Population

sim = InteractiveContext(
   components=[Population()],
   configuration={'population': {'population_size': 500}},
   logging_verbosity=0,
)

# Peek at the population table
print(sim.get_population(["entrance_time", "color"]).head())
  entrance_time color
0    2005-07-01   red
1    2005-07-01   red
2    2005-07-01   red
3    2005-07-01   red
4    2005-07-01  blue

Movement

Before we get to the flocking behavior of boids, we need them to move. We create a Movement component for this purpose. It tracks the position and velocity of each boid, and creates an acceleration pipeline that we will use later.

File: ~/code/vivarium_examples/boids/movement.py
 1from __future__ import annotations
 2import numpy as np
 3import pandas as pd
 4
 5from vivarium.framework.event import Event
 6from vivarium import Component
 7from vivarium.framework.engine import Builder
 8from vivarium.framework.population import SimulantData
 9
10
11class Movement(Component):
12    ##############
13    # Properties #
14    ##############
15    CONFIGURATION_DEFAULTS = {
16        "field": {
17            "width": 1000,
18            "height": 1000,
19        },
20        "movement": {
21            "max_speed": 2,
22        },
23    }
24
25    #####################
26    # Lifecycle methods #
27    #####################
28
29    def setup(self, builder: Builder) -> None:
30        self.config = builder.configuration
31
32        # docs-start: register_attribute_producer
33        builder.value.register_attribute_producer(
34            "acceleration", source=self.base_acceleration
35        )
36        # docs-end: register_attribute_producer
37        self.randomness = builder.randomness.get_stream(self.name)
38        builder.population.register_initializer(
39            initializer=self.initialize_movement,
40            columns=["x", "y", "vx", "vy"],
41            required_resources=[self.randomness]
42        )
43
44    ##################################
45    # Pipeline sources and modifiers #
46    ##################################
47
48    # docs-start: base_acceleration
49    def base_acceleration(self, index: pd.Index[int]) -> pd.DataFrame:
50        return pd.DataFrame(0.0, columns=["acc_x", "acc_y"], index=index)
51    # docs-end: base_acceleration
52
53    ########################
54    # Event-driven methods #
55    ########################
56
57    def initialize_movement(self, pop_data: SimulantData) -> None:
58        # Start randomly distributed, with random velocities
59        new_population = pd.DataFrame(
60            {
61                "x": self.config.field.width * self.randomness.get_draw(pop_data.index, "x"),
62                "y": self.config.field.height * self.randomness.get_draw(pop_data.index, "y"),
63                "vx": ((2 * self.randomness.get_draw(pop_data.index, "vx")) - 1) * self.config.movement.max_speed,
64                "vy": ((2 * self.randomness.get_draw(pop_data.index, "vy")) - 1) * self.config.movement.max_speed,
65            },
66            index=pop_data.index,
67        )
68        self.population_view.initialize(new_population)
69
70    # docs-start: on_time_step
71    def on_time_step(self, event: Event) -> None:
72        def _apply_physics(pop: pd.DataFrame) -> pd.DataFrame:
73            acceleration = self.population_view.get_frame(event.index, "acceleration")
74
75            # Accelerate and limit velocity
76            if not isinstance(acceleration, pd.DataFrame):
77                raise ValueError("Acceleration must be a pd.DataFrame")
78            pop[["vx", "vy"]] += acceleration.rename(columns=lambda c: c.replace("acc_", "v"))
79            speed = np.sqrt(np.square(pop.vx) + np.square(pop.vy))
80            velocity_scaling_factor = np.where(
81                speed > self.config.movement.max_speed,
82                self.config.movement.max_speed / speed,
83                1.0,
84            )
85            pop["vx"] *= velocity_scaling_factor
86            pop["vy"] *= velocity_scaling_factor
87
88            # Move according to velocity
89            pop["x"] += pop.vx
90            pop["y"] += pop.vy
91
92            # Loop around boundaries
93            pop["x"] = pop.x % self.config.field.width
94            pop["y"] = pop.y % self.config.field.height
95
96            return pop
97
98        self.population_view.update(["x", "y", "vx", "vy"], _apply_physics)
99    # docs-end: on_time_step

You’ll notice that some parts of this component look very similar to our population component. Indeed, we can split up the responsibilities of initializing simulants over many different components. In Vivarium we tend to think of components as being responsible for individual behaviors or attributes. This makes it very easy to build very complex models while only having to think about local pieces of it.

However, there are also a few new Vivarium features on display in this component. We’ll step through these in more detail.

Attribute pipelines

Each attribute pipeline creates a different column in the population state table that contains information about our simulants (boids, in this case). Importantly, these attribute pipelines are is not stateful – each time one is accessed in order to generate the state table, its values are re-initialized from scratch, instead of “remembering” what they were on the previous timestep. This makes it appropriate for modeling acceleration, because we only want a boid to accelerate due to forces acting on it now. You can find an overview of the values system here.

Note

Attribute pipelines are a special type of the more generic value pipeline. While attribute pipelines dynamically calculate attributes of simulants, value pipelines can be used to calculate anything. By far the most common type of value pipeline used in Vivarium simulations are attribute pipelines and so we will not discuss the more general concept further in this tutorial.

The Builder class exposes an additional property for working with attribute pipelines: vivarium.framework.engine.Builder.value(). We call the vivarium.framework.values.interface.ValuesInterface.register_attribute_producer() method to register a new attribute pipeline as the producer of some attribute.

    builder.value.register_attribute_producer(
        "acceleration", source=self.base_acceleration
    )

This call provides a source function for our pipeline which initializes the values. In this case, the default is zero acceleration:

def base_acceleration(self, index: pd.Index[int]) -> pd.DataFrame:
    return pd.DataFrame(0.0, columns=["acc_x", "acc_y"], index=index)

This may seem pointless since acceleration will always be zero. Pipelines have another feature we will see later: other components can modify their values. We’ll create components later in this tutorial that modify this pipeline to exert forces on our boids.

The on_time_step method

This is a lifecycle method, much like any registered initializer methods. However, this method will be called on each step forward in time, not only when new simulants are initialized.

One very common thing components do on each time step is read and update the population state table. In this case, we change boids’ velocity according to their acceleration, limit their velocity to a maximum, and update their position according to their velocity.

To get population attributes such as acceleration inside on_time_step, we leverage a PopulationView which provides a handful of methods designed to get when you need. In this case, we call get_frame() to get the acceleration attribute. We pass in the event.index which is the set of simulants affected by the event (in this case, all of them). Note that there is also available a get() method which is similar to get_frame but can request multiple attributes at once and does not necessarily return a dataframe.

Note

Population Views

A PopulationView is a read/write interface to the population state table. It provides a number of convenience methods for getting and setting attributes, private columns, and other bits of information about the population.

We call the population view’s update() method, passing the names of the private columns we want to change and a modifier function that receives the current values and returns the updated ones. A private column is one that acts as a source of an attribute.

Note

Private Columns vs. Attributes

Knowing whether you need a private column or an attribute depends on context, but when you need to update the state table as we’re doing here, it’s important to understand that what you are really updating are the appropriate private columns that act as the source of the attributes you care about. In this case, we want to update the source data for position (x and y) and velocity (vx and vy) attributes which are this component’s private columns (i.e. this component registered the initializer that created those columns). The modifier receives a DataFrame of those four columns, and we also retrieve the acceleration attributes inside it so that we can apply forces.

def on_time_step(self, event: Event) -> None:
    def _apply_physics(pop: pd.DataFrame) -> pd.DataFrame:
        acceleration = self.population_view.get_frame(event.index, "acceleration")

        # Accelerate and limit velocity
        if not isinstance(acceleration, pd.DataFrame):
            raise ValueError("Acceleration must be a pd.DataFrame")
        pop[["vx", "vy"]] += acceleration.rename(columns=lambda c: c.replace("acc_", "v"))
        speed = np.sqrt(np.square(pop.vx) + np.square(pop.vy))
        velocity_scaling_factor = np.where(
            speed > self.config.movement.max_speed,
            self.config.movement.max_speed / speed,
            1.0,
        )
        pop["vx"] *= velocity_scaling_factor
        pop["vy"] *= velocity_scaling_factor

        # Move according to velocity
        pop["x"] += pop.vx
        pop["y"] += pop.vy

        # Loop around boundaries
        pop["x"] = pop.x % self.config.field.width
        pop["y"] = pop.y % self.config.field.height

        return pop

    self.population_view.update(["x", "y", "vx", "vy"], _apply_physics)

Putting it together

Let’s run the simulation with our new component and look again at the state table.

from vivarium import InteractiveContext
from vivarium_examples.boids.population import Population
from vivarium_examples.boids.movement import Movement

sim = InteractiveContext(
   components=[Population(), Movement()],
   configuration={'population': {'population_size': 500}},
   logging_verbosity=0,
)

# Peek at the population table
print(sim.get_population(["color", "x", "y", "vx", "vy"]).head())
  color           x           y        vx        vy
0   red  274.256447  907.889319 -0.396940  0.270696
1   red  388.784077   82.116094  0.392572 -1.693871
2   red  661.272905  303.481468 -0.102927  1.194465
3   red  758.825839  806.284468  0.709814  0.932636
4  blue  574.989313  159.504556 -1.487996  1.428098

Our population now has initial position and velocity! Now, we can take a step forward with sim.step() and “see” our boids’ positions change, but their velocity stay the same.

sim.step()

# Peek at the population state table
print(sim.get_population(["color", "x", "y", "vx", "vy"]).head())
  color           x           y        vx        vy
0   red  273.859507  908.160016 -0.396940  0.270696
1   red  389.176649   80.422223  0.392572 -1.693871
2   red  661.169977  304.675933 -0.102927  1.194465
3   red  759.535654  807.217104  0.709814  0.932636
4  blue  573.546355  160.889428 -1.442958  1.384873

Visualizing our population

Now is also a good time to come up with a way to plot our boids. We’ll later use this to generate animations of our boids moving around. We’ll use matplotlib for this.

Making good visualizations is hard, and beyond the scope of this tutorial, but the matplotlib documentation has a large number of examples and tutorials that should be useful.

For our purposes, we really just want to be able to plot the positions of our boids and maybe some arrows to indicated their velocity.

File: ~/code/vivarium_examples/boids/visualization.py
def plot_boids(simulation: InteractiveContext, plot_velocity: bool=False) -> None:
    width = simulation.configuration.field.width
    height = simulation.configuration.field.height
    pop = simulation.get_population(["x", "y", "color", "vx", "vy"])

    plt.figure(figsize=(12, 12))
    plt.scatter(pop.x, pop.y, color=pop.color)
    if plot_velocity:
        plt.quiver(pop.x, pop.y, pop.vx, pop.vy, color=pop.color, width=0.002)
    plt.xlabel("x")
    plt.ylabel("y")
    plt.axis((0, width, 0, height))
    plt.show()

We can then visualize our flock with

from vivarium import InteractiveContext
from vivarium_examples.boids.population import Population
from vivarium_examples.boids.movement import Movement
from vivarium_examples.boids.visualization import plot_boids

sim = InteractiveContext(
   components=[Population(), Movement()],
   configuration={'population': {'population_size': 500}},
   logging_verbosity=0,
)

plot_boids(sim, plot_velocity=True)

(Source code, png, hires.png, pdf)

../_images/boids-1.png

Calculating neighbors

The steering behavior in the Boids model is dictated by interactions of each boid with its nearby neighbors. A naive implementation of this can be very expensive. Luckily, Python has a ton of great libraries that have solved most of the hard problems.

Here, we’ll pull in a KDTree from SciPy and use it to build a component that tells us about the neighbor relationships of each boid.

File: ~/code/vivarium_examples/boids/neighbors.py
 1from __future__ import annotations
 2import pandas as pd
 3from scipy import spatial
 4
 5from vivarium import Component
 6from vivarium.framework.engine import Builder
 7from vivarium.framework.event import Event
 8from vivarium.framework.population import SimulantData
 9
10
11class Neighbors(Component):
12    ##############
13    # Properties #
14    ##############
15    configuration_defaults = {"neighbors": {"radius": 60}}
16
17    #####################
18    # Lifecycle methods #
19    #####################
20
21    def setup(self, builder: Builder) -> None:
22        self.radius = builder.configuration.neighbors.radius
23
24        self.neighbors_calculated = False
25        self._neighbors = pd.Series()
26        builder.value.register_attribute_producer(
27            "neighbors", source=self.get_neighbors, required_resources=["x", "y"]
28        )
29        builder.population.register_initializer(
30            initializer=self.initialize_neighbors,
31            columns=None,
32            required_resources=[],
33        )
34
35    ########################
36    # Event-driven methods #
37    ########################
38
39    def initialize_neighbors(self, pop_data: SimulantData) -> None:
40        self._neighbors = pd.Series([[]] * len(pop_data.index), index=pop_data.index)
41
42    def on_time_step(self, event: Event) -> None:
43        self.neighbors_calculated = False
44
45    ##################################
46    # Pipeline sources and modifiers #
47    ##################################
48
49    def get_neighbors(self, index: pd.Index[int]) -> pd.Series[list[int]]:  # type: ignore[type-var]
50        if not self.neighbors_calculated:
51            self._calculate_neighbors()
52        return self._neighbors[index]
53
54    ##################
55    # Helper methods #
56    ##################
57
58    def _calculate_neighbors(self) -> None:
59        # Reset our list of neighbors
60        pop = self.population_view.get(self._neighbors.index, ["x", "y"])
61        self._neighbors = pd.Series([[] for _ in range(len(pop))], index=pop.index)
62
63        tree = spatial.KDTree(pop[["x", "y"]])
64
65        # Iterate over each pair of simulants that are close together.
66        for boid_1, boid_2 in tree.query_pairs(self.radius):
67            # .iloc is used because query_pairs uses 0,1,... indexing instead of pandas.index
68            self._neighbors.iloc[boid_1].append(self._neighbors.index[boid_2])
69            self._neighbors.iloc[boid_2].append(self._neighbors.index[boid_1])

This component creates an attribute pipeline called neighbors so that other components can access the neighbors of each boid (remember that every attribute pipeline corresponds to a column in the population state table).

Note that the only thing it does in on_time_step is self.neighbors_calculated = False. That’s because we only want to calculate the neighbors once per time step. When the pipeline is called, we can tell with self.neighbors_calculated whether we need to calculate them or use our cached value in self._neighbors.

Swarming behavior

Now we know which boids are each others’ neighbors but we’re not doing anything with that information. We need to teach the boids to swarm!

There are lots of potential swarming behaviors to play around with, all of which change the way that boids clump up and follow each other. But since that isn’t the focus of this tutorial, we’ll implement separation, cohesion, and alignment behavior identical to what’s in this D3 example (Internet Archive version), and we’ll gloss over most of the calculations.

We define a base class for all our forces, since they will have a lot in common. We won’t get into the details of this class, but at a high level it uses the neighbors attribute to find all the pairs of boids that are neighbors, applies some force to (some of) those pairs, and limits that force to a maximum magnitude.

File: ~/code/vivarium_examples/boids/forces.py
class Force(Component, ABC):
    ##############
    # Properties #
    ##############
    @property
    def configuration_defaults(self) -> dict[str, dict[str, float]]:
        return {
            self.__class__.__name__.lower(): {
                "max_force": 0.03,
            },
        }

    #####################
    # Lifecycle methods #
    #####################

    def setup(self, builder: Builder) -> None:
        self.config = builder.configuration[self.__class__.__name__.lower()]
        self.max_speed = builder.configuration.movement.max_speed

        # docs-start: register_acceleration_modifier
        builder.value.register_attribute_modifier(
            "acceleration",
            modifier=self.apply_force,
            required_resources=["x", "y", "vx", "vy", "neighbors"],
        )
        # docs-end: register_acceleration_modifier

    ##################################
    # Pipeline sources and modifiers #
    ##################################

    def apply_force(self, index: pd.Index[int], acceleration: pd.DataFrame) -> pd.DataFrame:
        neighbors = self.population_view.get(index, "neighbors")
        pop = self.population_view.get(index, ["x", "y", "vx", "vy"])
        if not (isinstance(neighbors, pd.Series) and isinstance(pop, pd.DataFrame)):
            raise ValueError("Neighbors must be a pd.Series of ints and population a pd.DataFrame")
        pairs = self._get_pairs(neighbors, pop)

        raw_force = self.calculate_force(pairs)
        force = self._normalize_and_limit_force(
            force=raw_force,
            velocity=pop[["vx", "vy"]],
            max_force=self.config.max_force,
            max_speed=self.max_speed,
        )

        acceleration.loc[force.index, "acc_x"] += force["x"]
        acceleration.loc[force.index, "acc_y"] += force["y"]
        return acceleration

    ##################
    # Helper methods #
    ##################

    @abstractmethod
    def calculate_force(self, neighbors: pd.DataFrame) -> pd.DataFrame:
        pass

    def _get_pairs(self, neighbors: pd.Series[int | float], pop: pd.DataFrame) -> pd.DataFrame:
        pairs = (
            pop.join(neighbors.rename("neighbors"))
            .reset_index()
            .explode("neighbors")
            .merge(
                pop.reset_index(),
                left_on="neighbors",
                right_index=True,
                suffixes=("_self", "_other"),
            )
        )
        pairs["distance_x"] = pairs.x_other - pairs.x_self
        pairs["distance_y"] = pairs.y_other - pairs.y_self
        pairs["distance"] = np.sqrt(pairs.distance_x**2 + pairs.distance_y**2)

        return pairs

    def _normalize_and_limit_force(
        self,
        force: pd.DataFrame,
        velocity: pd.DataFrame,
        max_force: float,
        max_speed: float,
    ) -> pd.DataFrame:
        normalization_factor = np.where(
            (force.x != 0) | (force.y != 0),
            max_speed / self._magnitude(force),
            1.0,
        )
        force["x"] *= normalization_factor
        force["y"] *= normalization_factor
        force["x"] -= velocity.loc[force.index, "vx"]
        force["y"] -= velocity.loc[force.index, "vy"]
        magnitude = self._magnitude(force)
        limit_scaling_factor = np.where(
            magnitude > max_force,
            max_force / magnitude,
            1.0,
        )
        force["x"] *= limit_scaling_factor
        force["y"] *= limit_scaling_factor
        return force[["x", "y"]]

    def _magnitude(self, df: pd.DataFrame) -> pd.Series[float]:
        return pd.Series(np.sqrt(np.square(df.x) + np.square(df.y)), dtype=float)

The major new Vivarium feature seen here is that of the attribute modifier, which we register with vivarium.framework.values.interface.ValuesInterface.register_attribute_modifier(). As the name suggests, this allows us to modify attributes, in this case adding the effect of a force to the acceleration attribute. We register that the apply_force method as the modifier like so:

File: ~/code/vivarium_examples/boids/forces.py
    builder.value.register_attribute_modifier(
        "acceleration",
        modifier=self.apply_force,
        required_resources=["x", "y", "vx", "vy", "neighbors"],
    )

Once we start adding components with these modifiers into our simulation, acceleration won’t always be zero anymore!

We then define our three forces using the Force base class. We won’t step through what these mean in detail. They mostly only override the _calculate_force method that calculates the force between a pair of boids. The separation force is a bit special in that it also defines an extra configurable parameter: the distance within which it should act.

File: ~/code/vivarium_examples/boids/forces.py
class Separation(Force):
    """Push boids apart when they get too close."""

    configuration_defaults = {
        "separation": {
            "distance": 30,
            "max_force": 0.03,
        },
    }

    def calculate_force(self, neighbors: pd.DataFrame) -> pd.DataFrame:
        # Push boids apart when they get too close
        separation_neighbors = neighbors[neighbors.distance < self.config.distance].copy()
        force_scaling_factor = np.where(
            separation_neighbors.distance > 0,
            ((-1 / separation_neighbors.distance) / separation_neighbors.distance),
            1.0,
        )
        separation_neighbors["force_x"] = (
            separation_neighbors["distance_x"] * force_scaling_factor
        )
        separation_neighbors["force_y"] = (
            separation_neighbors["distance_y"] * force_scaling_factor
        )

        force: pd.DataFrame = (
            separation_neighbors.groupby("index_self")[["force_x", "force_y"]]
            .sum()
            .rename(columns=lambda c: c.replace("force_", ""))
        )

        return force


class Cohesion(Force):
    """Push boids together."""

    def calculate_force(self, pairs: pd.DataFrame) -> pd.DataFrame:
        return (
            pairs.groupby("index_self")[["distance_x", "distance_y"]]
            .sum()
            .rename(columns=lambda c: c.replace("distance_", ""))
        )


class Alignment(Force):
    """Push boids toward where others are going."""

    def calculate_force(self, pairs: pd.DataFrame) -> pd.DataFrame:
        return (
            pairs.groupby("index_self")[["vx_other", "vy_other"]]
            .sum()
            .rename(columns=lambda c: c.replace("v", "").replace("_other", ""))
        )

For a quick test of our swarming behavior, let’s add in these forces and check in on our boids after 100 steps:

from vivarium import InteractiveContext
from vivarium_examples.boids.population import Population
from vivarium_examples.boids.movement import Movement
from vivarium_examples.boids.neighbors import Neighbors
from vivarium_examples.boids.forces import Separation, Cohesion, Alignment
from vivarium_examples.boids.visualization import plot_boids

sim = InteractiveContext(
   components=[Population(), Movement(), Neighbors(), Separation(), Cohesion(), Alignment()],
   configuration={'population': {'population_size': 500}},
   logging_verbosity=0,
)

sim.take_steps(100)

plot_boids(sim, plot_velocity=True)

(Source code, png, hires.png, pdf)

../_images/boids-2.png

Viewing our simulation as an animation

Great, our simulation is working! But it would be nice to see our boids moving around instead of having static snapshots. We’ll use the animation features in matplotlib to do this.

Add this method to visualization.py:

File: ~/code/vivarium_examples/boids/visualization.py
def plot_boids_animated(simulation: InteractiveContext) -> FuncAnimation:
    width = simulation.configuration.field.width
    height = simulation.configuration.field.height
    pop = simulation.get_population(["x", "y", "color"])

    fig = plt.figure(figsize=(12, 12))
    ax = fig.add_subplot(111)
    s = ax.scatter(pop.x, pop.y, color=pop.color)
    plt.xlabel("x")
    plt.ylabel("y")
    plt.axis((0, width, 0, height))

    frames = range(2_000)
    frame_pops = []
    for _ in frames:
        simulation.step()
        frame_pops.append(simulation.get_population(["x", "y"]))

    def animate(i: int) -> None:
        s.set_offsets(frame_pops[i])

    return FuncAnimation(fig, animate, frames=frames, interval=10)  # type: ignore[arg-type]

Then, try it out like so:

from vivarium import InteractiveContext
from vivarium_examples.boids.population import Population
from vivarium_examples.boids.movement import Movement
from vivarium_examples.boids.neighbors import Neighbors
from vivarium_examples.boids.forces import Separation, Cohesion, Alignment
from vivarium_examples.boids.visualization import plot_boids_animated

sim = InteractiveContext(
    components=[Population(), Movement(), Neighbors(), Separation(), Cohesion(), Alignment()],
    configuration={'population': {'population_size': 500}},
    logging_verbosity=0,
 )

anim = plot_boids_animated(sim)

Viewing this animation will depend a bit on what software you have installed. If you’re running Python in the terminal, this will save a video file:

anim.save('boids.mp4')

In IPython, this will display the animation:

HTML(anim.to_html5_video())

Either way, it will look like this: