"""
=========
Observers
=========
This module contains tools for recording various outputs of interest in
multi-state lifetable simulations.
"""
from typing import List, Optional
import pandas as pd
from vivarium import Component
from vivarium.framework.engine import Builder
from vivarium.framework.event import Event
[docs]def output_file(config, suffix, sep="_", ext="csv"):
"""
Determine the output file name for an observer, based on the prefix
defined in ``config.observer.output_prefix`` and the (optional)
``config.input_data.input_draw_number``.
Parameters
----------
config
The builder configuration object.
suffix
The observer-specific suffix.
sep
The separator between prefix, suffix, and draw number.
ext
The output file extension.
"""
if "observer" not in config:
raise ValueError("observer.output_prefix not defined")
if "output_prefix" not in config.observer:
raise ValueError("observer.output_prefix not defined")
prefix = config.observer.output_prefix
if "input_draw_number" in config.input_data:
draw = config.input_data.input_draw_number
else:
draw = 0
out_file = prefix + sep + suffix
if draw > 0:
out_file += "{}{}".format(sep, draw)
out_file += ".{}".format(ext)
return out_file
[docs]class MorbidityMortality(Component):
"""
This class records the all-cause morbidity and mortality rates for each
cohort at each year of the simulation.
Parameters
----------
output_suffix
The suffix for the CSV file in which to record the
morbidity and mortality data.
"""
##############
# Properties #
##############
@property
def columns_required(self) -> Optional[List[str]]:
return [
"age",
"sex",
"population",
"bau_population",
"acmr",
"bau_acmr",
"pr_death",
"bau_pr_death",
"deaths",
"bau_deaths",
"yld_rate",
"bau_yld_rate",
"person_years",
"bau_person_years",
"HALY",
"bau_HALY",
]
#####################
# Lifecycle methods #
#####################
def __init__(self, output_suffix: str = "mm"):
super().__init__()
self.output_suffix = output_suffix
[docs] def setup(self, builder: Builder) -> None:
# Record the key columns from the core multi-state life table.
self.clock = builder.time.clock()
self.tables = []
self.table_cols = self.columns_required + [
"year",
"prev_population",
"bau_prev_population",
]
self.output_file = output_file(builder.configuration, self.output_suffix)
########################
# Event-driven methods #
########################
[docs] def on_collect_metrics(self, event: Event) -> None:
pop = self.population_view.get(event.index)
if len(pop.index) == 0:
# No tracked population remains.
return
pop["year"] = self.clock().year
# Record the population size prior to the deaths.
pop["prev_population"] = pop["population"] + pop["deaths"]
pop["bau_prev_population"] = pop["bau_population"] + pop["bau_deaths"]
self.tables.append(pop[self.table_cols])
[docs] def on_simulation_end(self, event: Event) -> None:
data = pd.concat(self.tables, ignore_index=True)
data["year_of_birth"] = data["year"] - data["age"]
# Sort the table by cohort (i.e., generation and sex), and then by
# calendar year, so that results are output in the same order as in
# the spreadsheet models.
data = data.sort_values(by=["year_of_birth", "sex", "age"], axis=0)
data = data.reset_index(drop=True)
# Re-order the table columns.
cols = ["year_of_birth"] + self.table_cols
data = data[cols]
# Calculate life expectancy and HALE for the BAU and intervention,
# with respect to the initial population, not the survivors.
data["LE"] = self.calculate_LE(data, "person_years", "prev_population")
data["bau_LE"] = self.calculate_LE(data, "bau_person_years", "bau_prev_population")
data["HALE"] = self.calculate_LE(data, "HALY", "prev_population")
data["bau_HALE"] = self.calculate_LE(data, "bau_HALY", "bau_prev_population")
data.to_csv(self.output_file, index=False)
##################
# Helper methods #
##################
[docs] def calculate_LE(self, table, py_col, denom_col):
"""Calculate the life expectancy for each cohort at each time-step.
Parameters
----------
table
The population life table.
py_col
The name of the person-years column.
denom_col
The name of the population denominator column.
Returns
-------
The life expectancy for each table row, represented as a
pandas.Series object.
"""
# Group the person-years by cohort.
group_cols = ["year_of_birth", "sex"]
subset_cols = group_cols + [py_col]
grouped = table.loc[:, subset_cols].groupby(by=group_cols)[py_col]
# Calculate the reverse-cumulative sums of the adjusted person-years
# (i.e., the present and future person-years) by:
# (a) reversing the adjusted person-years values in each cohort;
# (b) calculating the cumulative sums in each cohort; and
# (c) restoring the original order.
cumsum = grouped.apply(lambda x: pd.Series(x[::-1].cumsum()).iloc[::-1])
return cumsum / table[denom_col]
[docs]class Disease(Component):
"""
This class records the disease incidence rate and disease prevalence for
each cohort at each year of the simulation.
Parameters
----------
disease
The name of the chronic disease.
output_suffix
The suffix for the CSV file in which to record the
disease data.
"""
##############
# Properties #
##############
@property
def columns_required(self) -> Optional[List[str]]:
return [
"age",
"sex",
self.bau_S_col,
self.bau_C_col,
self.int_S_col,
self.int_C_col,
]
#####################
# Lifecycle methods #
#####################
def __init__(self, disease: str, output_suffix: Optional[str] = None):
super().__init__()
self.disease = disease
if output_suffix is None:
output_suffix = disease.lower()
self.output_suffix = output_suffix
self.bau_S_col = "{}_S".format(self.disease)
self.bau_C_col = "{}_C".format(self.disease)
self.int_S_col = "{}_S_intervention".format(self.disease)
self.int_C_col = "{}_C_intervention".format(self.disease)
[docs] def setup(self, builder: Builder) -> None:
bau_incidence_value = "{}.incidence".format(self.disease)
int_incidence_value = "{}_intervention.incidence".format(self.disease)
self.bau_incidence = builder.value.get_value(bau_incidence_value)
self.int_incidence = builder.value.get_value(int_incidence_value)
self.tables = []
self.table_cols = [
"sex",
"age",
"year",
"bau_incidence",
"int_incidence",
"bau_prevalence",
"int_prevalence",
"bau_deaths",
"int_deaths",
]
self.clock = builder.time.clock()
self.output_file = output_file(builder.configuration, self.output_suffix)
[docs] def on_collect_metrics(self, event: Event) -> None:
pop = self.population_view.get(event.index)
if len(pop.index) == 0:
# No tracked population remains.
return
pop["year"] = self.clock().year
pop["bau_incidence"] = self.bau_incidence(event.index)
pop["int_incidence"] = self.int_incidence(event.index)
pop["bau_prevalence"] = pop[self.bau_C_col] / (
pop[self.bau_C_col] + pop[self.bau_S_col]
)
pop["int_prevalence"] = pop[self.int_C_col] / (
pop[self.bau_C_col] + pop[self.bau_S_col]
)
pop["bau_deaths"] = 1000 - pop[self.bau_S_col] - pop[self.bau_C_col]
pop["int_deaths"] = 1000 - pop[self.int_S_col] - pop[self.int_C_col]
self.tables.append(pop.loc[:, self.table_cols])
[docs] def on_simulation_end(self, event: Event) -> None:
data = pd.concat(self.tables, ignore_index=True)
data["diff_incidence"] = data["int_incidence"] - data["bau_incidence"]
data["diff_prevalence"] = data["int_prevalence"] - data["bau_prevalence"]
data["year_of_birth"] = data["year"] - data["age"]
data["disease"] = self.disease
# Sort the table by cohort (i.e., generation and sex), and then by
# calendar year, so that results are output in the same order as in
# the spreadsheet models.
data = data.sort_values(by=["year_of_birth", "sex", "age"], axis=0)
data = data.reset_index(drop=True)
# Re-order the table columns.
diff_cols = ["diff_incidence", "diff_prevalence"]
cols = ["disease", "year_of_birth"] + self.table_cols + diff_cols
data = data[cols]
data.to_csv(self.output_file, index=False)
[docs]class TobaccoPrevalence(Component):
"""This class records the prevalence of tobacco use in the population.
Parameters
----------
output_suffix
The suffix for the CSV file in which to record the
prevalence data.
"""
##############
# Properties #
##############
@property
def columns_required(self) -> Optional[List[str]]:
return ["age", "sex", "bau_population", "population"] + self._bin_names
#####################
# Lifecycle methods #
#####################
def __init__(self, output_suffix: str = "tobacco"):
super().__init__()
self.output_suffix = output_suffix
self._bin_names = []
[docs] def setup(self, builder: Builder) -> None:
self._bin_names = self.get_bin_names()
self.config = builder.configuration
self.clock = builder.time.clock()
self.bin_years = int(self.config["tobacco"]["delay"])
self.tables = []
self.table_cols = [
"age",
"sex",
"year",
"bau_no",
"bau_yes",
"bau_previously",
"bau_population",
"int_no",
"int_yes",
"int_previously",
"int_population",
]
self.output_file = output_file(builder.configuration, self.output_suffix)
#################
# Setup methods #
#################
[docs] def get_bin_names(self):
"""Return the bin names for both the BAU and the intervention scenario.
These names take the following forms:
``"name.no"``
The number of people who have never been exposed.
``"name.yes"``
The number of people currently exposed.
``"name.N"``
The number of people N years post-exposure.
The final bin is the number of people :math:`\ge N` years
post-exposure.
The intervention bin names take the form ``"name_intervention.X"``.
"""
if self.bin_years == 0:
delay_bins = [str(0)]
else:
delay_bins = [str(s) for s in range(self.bin_years + 2)]
bins = ["no", "yes"] + delay_bins
bau_bins = ["{}.{}".format("tobacco", bin) for bin in bins]
int_bins = ["{}_intervention.{}".format("tobacco", bin) for bin in bins]
all_bins = bau_bins + int_bins
return all_bins
########################
# Event-driven methods #
########################
[docs] def on_collect_metrics(self, event: Event) -> None:
pop = self.population_view.get(event.index)
if len(pop.index) == 0:
# No tracked population remains.
return
bau_cols = [c for c in pop.columns.values if c.startswith("{}.".format("tobacco"))]
int_cols = [
c
for c in pop.columns.values
if c.startswith("{}_intervention.".format("tobacco"))
]
bau_denom = pop.reindex(columns=bau_cols).sum(axis=1)
int_denom = pop.reindex(columns=int_cols).sum(axis=1)
# Normalise prevalence with respect to the total population.
pop["bau_no"] = pop["{}.no".format("tobacco")] / bau_denom
pop["bau_yes"] = pop["{}.yes".format("tobacco")] / bau_denom
pop["bau_previously"] = 1 - pop["bau_no"] - pop["bau_yes"]
pop["int_no"] = pop["{}_intervention.no".format("tobacco")] / int_denom
pop["int_yes"] = pop["{}_intervention.yes".format("tobacco")] / int_denom
pop["int_previously"] = 1 - pop["int_no"] - pop["int_yes"]
pop = pop.rename(columns={"population": "int_population"})
pop["year"] = self.clock().year
self.tables.append(pop.reindex(columns=self.table_cols).reset_index(drop=True))
[docs] def on_simulation_end(self, event: Event) -> None:
data = pd.concat(self.tables, ignore_index=True)
data["year_of_birth"] = data["year"] - data["age"]
# Sort the table by cohort (i.e., generation and sex), and then by
# calendar year, so that results are output in the same order as in
# the spreadsheet models.
data = data.sort_values(by=["year_of_birth", "sex", "age"], axis=0)
data = data.reset_index(drop=True)
# Re-order the table columns.
cols = ["year_of_birth"] + self.table_cols
data = data.reindex(columns=cols)
data.to_csv(self.output_file, index=False)