Source code for vivarium_public_health.mslt.delay

"""
===============
Delayed Effects
===============

This module contains tools to represent delayed effects in a multi-state
lifetable simulation.

"""
from typing import Any, Dict, List, Optional

import numpy as np
import pandas as pd
from vivarium import Component
from vivarium.framework.engine import Builder
from vivarium.framework.event import Event
from vivarium.framework.population import SimulantData


[docs]class DelayedRisk(Component): """ A delayed risk represents an exposure whose impact takes time to come into effect (e.g., smoking uptake and cessation). The data required by this component are: Initial prevalence The initial level of exposure and post-exposure. Incidence rate The rate at which people begin to be exposed. This should be specified separately for the BAU and the intervention scenario. Remission rate The rate at which people stop being exposed. This should be specified separately for the BAU and the intervention scenario. Relative risk of mortality for currently exposed (e.g., currently smoking); and Relative risk of mortality for post-exposure (e.g., stopped smoking :math:`0..N` years ago). Disease-specific relative risks for currently exposed (e.g., currently smoking) Disease-specific relative risks for post-exposure (e.g., stopped smoking :math:`0..N` years ago). .. note:: The relative risks are defined in relation to the pre-exposure group (whose relative risks are therefore defined to be :math:`1`). The configuration options for this component are: ``constant_prevalence`` (boolean, default is ``False``) If this is set to ``True``, the remission rate in both the BAU and intervention will be kept fixed at 0 (i.e., no remission). ``tobacco_tax`` (boolean, default is ``False``) If this is set to ``True``, additional scaling effects are applied to both the incidence and remission rates. ``delay`` (integer, default is ``20``) The number of years, after remission, during which relative risks decrease back to their baseline values. Identify the disease(s) for which this delayed risk will have an effect in the simulation configuration. For example, to modify the incidence of CHD and stroke, this would look like: .. code-block:: yaml components: mslt_port: population: - BasePopulation() - Mortality() - Disability() disease: - Disease('CHD') - Disease('Stroke') delay: - DelayedRisk('tobacco') ... configuration: tobacco: constant_prevalence: False tobacco_tax: False delay: 20 affects: # This is where the affected diseases should be listed. CHD: Stroke: """ ############## # Properties # ############## @property def configuration_defaults(self) -> Dict[str, Any]: return { self.risk: { "constant_prevalence": False, "tobacco_tax": False, "delay": 20, }, } @property def columns_created(self) -> List[str]: return self._bin_names @property def columns_required(self) -> Optional[List[str]]: return ["age", "sex", "population"] @property def initialization_requirements(self) -> Dict[str, List[str]]: return { "requires_columns": ["age", "sex", "population"], "requires_values": [], "requires_streams": [], } ##################### # Lifecycle methods # ##################### def __init__(self, risk: str): """ Parameters ---------- risk The name of the exposure (e.g., ``"tobacco"``). """ super().__init__() self.risk = risk self._bin_names = []
[docs] def setup(self, builder: Builder) -> None: """Configure the delayed risk component. This involves loading the required data tables, registering event handlers and rate modifiers, and setting up the population view. """ self._bin_names = self.get_bin_names() self.config = builder.configuration self.start_year = builder.configuration.time.start.year self.clock = builder.time.clock() # Determine whether smoking prevalence should change over time. # The alternative scenario is that there is no remission; all people # who begin smoking will continue to smoke. self.constant_prevalence = self.config[self.risk]["constant_prevalence"] self.tobacco_tax = self.config[self.risk]["tobacco_tax"] self.bin_years = int(self.config[self.risk]["delay"]) # Load the initial prevalence. prev_data = pivot_load(builder, f"risk_factor.{self.risk}.prevalence") self.initial_prevalence = builder.lookup.build_table( prev_data, key_columns=["sex"], parameter_columns=["age", "year"] ) # Load the incidence rates for the BAU and intervention scenarios. inc_data = builder.lookup.build_table( pivot_load(builder, f"risk_factor.{self.risk}.incidence"), key_columns=["sex"], parameter_columns=["age", "year"], ) inc_name = "{}.incidence".format(self.risk) inc_int_name = "{}_intervention.incidence".format(self.risk) self.incidence = builder.value.register_rate_producer(inc_name, source=inc_data) self.int_incidence = builder.value.register_rate_producer( inc_int_name, source=inc_data ) # Load the remission rates for the BAU and intervention scenarios. rem_df = pivot_load(builder, f"risk_factor.{self.risk}.remission") # In the constant-prevalence case, assume there is no remission. if self.constant_prevalence: rem_df["value"] = 0.0 rem_data = builder.lookup.build_table( rem_df, key_columns=["sex"], parameter_columns=["age", "year"] ) rem_name = "{}.remission".format(self.risk) rem_int_name = "{}_intervention.remission".format(self.risk) self.remission = builder.value.register_rate_producer(rem_name, source=rem_data) self.int_remission = builder.value.register_rate_producer( rem_int_name, source=rem_data ) # We apply separate mortality rates to the different exposure bins. # This requires having access to the life table mortality rate, and # also the relative risks associated with each bin. self.acm_rate = builder.value.get_value("mortality_rate") mort_rr_data = pivot_load(builder, f"risk_factor.{self.risk}.mortality_relative_risk") self.mortality_rr = builder.lookup.build_table( mort_rr_data, key_columns=["sex"], parameter_columns=["age", "year"] ) # Register a modifier for each disease affected by this delayed risk. diseases = self.config[self.risk].affects.keys() for ix, disease in enumerate(diseases): self.register_modifier(builder, disease) # Load the disease-specific relative risks for each exposure bin. dis_rr_data = pivot_load(builder, f"risk_factor.{self.risk}.disease_relative_risk") # Check that the relative risk table includes required columns. key_columns = ["age_start", "age_end", "sex", "year_start", "year_end"] if set(key_columns) & set(dis_rr_data.columns) != set(key_columns): # Fallback option, handle tables that do not define bin edges. key_columns = ["age", "sex", "year"] if set(key_columns) & set(dis_rr_data.columns) != set(key_columns): msg = "Missing index columns for disease-specific relative risks" raise ValueError(msg) self.dis_rr = {} for disease in diseases: dis_columns = [c for c in dis_rr_data.columns if c.startswith(disease)] dis_keys = [c for c in dis_rr_data.columns if c in key_columns] if not dis_columns or not dis_keys: msg = "No {} relative risks for disease {}" raise ValueError(msg.format(self.risk, disease)) rr_data = dis_rr_data.loc[:, dis_keys + dis_columns] dis_prefix = "{}_".format(disease) bau_prefix = "{}.".format(self.risk) int_prefix = "{}_intervention.".format(self.risk) bau_col = { c: c.replace(dis_prefix, bau_prefix).replace("post_", "") for c in dis_columns } int_col = { c: c.replace(dis_prefix, int_prefix).replace("post_", "") for c in dis_columns } for column in dis_columns: # NOTE: avoid SettingWithCopyWarning rr_data.loc[:, int_col[column]] = rr_data[column] rr_data = rr_data.rename(columns=bau_col) self.dis_rr[disease] = builder.lookup.build_table( rr_data, key_columns=["sex"], parameter_columns=["age", "year"] ) # Load the effects of a tobacco tax. tax_inc = pivot_load(builder, f"risk_factor.{self.risk}.tax_effect_incidence") tax_rem = pivot_load(builder, f"risk_factor.{self.risk}.tax_effect_remission") self.tax_effect_inc = builder.lookup.build_table( tax_inc, key_columns=["sex"], parameter_columns=["age", "year"] ) self.tax_effect_rem = builder.lookup.build_table( tax_rem, key_columns=["sex"], parameter_columns=["age", "year"] ) mortality_data = pivot_load(builder, "cause.all_causes.mortality") self.tobacco_acmr = builder.value.register_rate_producer( "tobacco_acmr", source=builder.lookup.build_table( mortality_data, key_columns=["sex"], parameter_columns=["age", "year"] ), )
################# # Setup methods # #################
[docs] def get_bin_names(self): """Return the bin names for both the BAU and the intervention scenario. These names take the following forms: ``"name.no"`` The number of people who have never been exposed. ``"name.yes"`` The number of people currently exposed. ``"name.N"`` The number of people N years post-exposure. The final bin is the number of people :math:`\ge N` years post-exposure. The intervention bin names take the form ``"name_intervention.X"``. """ if self.bin_years == 0: delay_bins = [str(0)] else: delay_bins = [str(s) for s in range(self.bin_years + 2)] bins = ["no", "yes"] + delay_bins bau_bins = ["{}.{}".format(self.risk, bin) for bin in bins] int_bins = ["{}_intervention.{}".format(self.risk, bin) for bin in bins] all_bins = bau_bins + int_bins return all_bins
[docs] def register_modifier(self, builder: Builder, disease: str) -> None: """Register that a disease incidence rate will be modified by this delayed risk in the intervention scenario. Parameters ---------- builder The builder object for the simulation, which provides access to event handlers and rate modifiers. disease The name of the disease whose incidence rate will be modified. """ # NOTE: we need to modify different rates for chronic and acute # diseases. For now, register modifiers for all possible rates. rate_templates = [ "{}_intervention.incidence", "{}_intervention.excess_mortality", "{}_intervention.yld_rate", ] for template in rate_templates: rate_name = template.format(disease) modifier = lambda ix, rate: self.incidence_adjustment(disease, ix, rate) builder.value.register_value_modifier(rate_name, modifier)
######################## # Event-driven methods # ########################
[docs] def on_initialize_simulants(self, pop_data: SimulantData) -> None: """ Define the initial distribution of the population across the bins, in both the BAU and the intervention scenario. """ # Set all bins to zero, in order to create the required columns. pop = pd.DataFrame({}, index=pop_data.index) for column in self.get_bin_names(): pop[column] = 0.0 # Update the life table, so that we can then obtain a view that # includes the population counts. self.population_view.update(pop) pop = self.population_view.get(pop_data.index) # Calculate the absolute prevalence by multiplying the fractional # prevalence by the population size for each cohort. # NOTE: the number of current smokers is defined at the middle of each # year; i.e., it corresponds to the person_years. bau_acmr = self.tobacco_acmr.source(pop_data.index) bau_probability_of_death = 1 - np.exp(-bau_acmr) pop.population *= 1 - 0.5 * bau_probability_of_death prev = self.initial_prevalence(pop_data.index).mul(pop["population"], axis=0) self.population_view.update(prev) # Rename the columns and apply the same initial prevalence for the # intervention. bau_prefix = "{}.".format(self.risk) int_prefix = "{}_intervention.".format(self.risk) rename_to = { c: c.replace(bau_prefix, int_prefix) for c in prev.columns if c.startswith(bau_prefix) } int_prev = prev.rename(columns=rename_to) self.population_view.update(int_prev)
[docs] def on_time_step_prepare(self, event: Event) -> None: """Account for transitions between bins, and for mortality rates. These transitions include: - New exposures - Cessation of exposure - Increased duration of time since exposure """ if self.clock().year == self.start_year: return pop = self.population_view.get(event.index) if pop.empty: return idx = pop.index bau_acmr = self.acm_rate.source(idx) inc_rate = self.incidence(idx) rem_rate = self.remission(idx) int_inc_rate = self.int_incidence(idx) int_rem_rate = self.int_remission(idx) # Identify the relevant columns for the BAU and intervention. bin_cols = self.get_bin_names() bau_prefix = "{}.".format(self.risk) int_prefix = "{}_intervention.".format(self.risk) bau_cols = [c for c in bin_cols if c.startswith(bau_prefix)] int_cols = [c for c in bin_cols if c.startswith(int_prefix)] # Extract the RR of mortality associated with each exposure level. mort_rr = self.mortality_rr(idx) # Normalise the survival rate; never-smokers should have a mortality # rate that is lower than the ACMR, since current-smokers and # previous-smokers have higher RRs of mortality. weight_by_initial_prevalence = True if weight_by_initial_prevalence: # Load the initial exposure distribution, because it will be used # to adjust the ACMR. prev = self.initial_prevalence(pop.index) prev = prev.loc[:, bau_cols] # Multiply these fractions by the RR of mortality associated with # each exposure level. bau_wtd_rr = prev.mul(mort_rr.loc[:, bau_cols]) else: # Calculate the fraction of the population in each exposure level. bau_popn = pop.loc[:, bau_cols].sum(axis=1) bau_prev = pop.loc[:, bau_cols].divide(bau_popn, axis=0) # Multiply these fractions by the RR of mortality associated with # each exposure level. bau_wtd_rr = bau_prev.mul(mort_rr.loc[:, bau_cols]) # Sum these terms to obtain the net RR of mortality. bau_net_rr = bau_wtd_rr.sum(axis=1) # The mortality rate for never-smokers is the population ACMR divided # by this net RR of mortality. bau_acmr_no = bau_acmr.divide(bau_net_rr) # NOTE: adjust the RR *after* calculating the ACMR adjustments, but # *before* calculating the survival probability for each exposure # level. penultimate_cols = [s + str(self.bin_years) for s in [bau_prefix, int_prefix]] mort_rr.loc[:, penultimate_cols] = 1.0 # Calculate the mortality risk for non-smokers. bau_surv_no = 1 - np.exp(-bau_acmr_no) # Calculate the survival probability for each exposure level: # (1 - mort_risk_non_smokers)^RR bau_surv_rate = mort_rr.loc[:, bau_cols].rpow(1 - bau_surv_no, axis=0) # Calculate the number of survivors for each exposure level (BAU). pop.loc[:, bau_cols] = pop.loc[:, bau_cols].mul(bau_surv_rate) # Calculate the number of survivors for each exposure level # (intervention). # NOTE: we apply the same survival rate to each exposure level for # the intervention scenario as we used for the BAU scenario. rename_to = {c: c.replace(".", "_intervention.") for c in bau_surv_rate.columns} int_surv_rate = bau_surv_rate.rename(columns=rename_to) pop.loc[:, int_cols] = pop.loc[:, int_cols].mul(int_surv_rate) # Account for transitions between bins. # Note that the order of evaluation matters. suffixes = ["", "_intervention"] # First, accumulate the final post-exposure bin. if self.bin_years > 0: for suffix in suffixes: accum_col = "{}{}.{}".format(self.risk, suffix, self.bin_years + 1) from_col = "{}{}.{}".format(self.risk, suffix, self.bin_years) pop[accum_col] += pop[from_col] # Then increase time since exposure for all other post-exposure bins. for n_years in reversed(range(self.bin_years)): for suffix in suffixes: source_col = "{}{}.{}".format(self.risk, suffix, n_years) dest_col = "{}{}.{}".format(self.risk, suffix, n_years + 1) pop[dest_col] = pop[source_col] # Account for incidence and remission. col_no = "{}.no".format(self.risk) col_int_no = "{}_intervention.no".format(self.risk) col_yes = "{}.yes".format(self.risk) col_int_yes = "{}_intervention.yes".format(self.risk) col_zero = "{}.0".format(self.risk) col_int_zero = "{}_intervention.0".format(self.risk) inc = inc_rate * pop[col_no] int_inc = int_inc_rate * pop[col_int_no] rem = rem_rate * pop[col_yes] int_rem = int_rem_rate * pop[col_int_yes] # Account for the effects of a tobacco tax. if self.tobacco_tax: # The tax has a scaling effect (reduction) on incidence, and # causes additional remission. tax_inc = self.tax_effect_inc(idx) tax_rem = self.tax_effect_rem(idx) int_inc = int_inc * tax_inc int_rem = int_rem + (1 - tax_rem) * pop[col_int_yes] # Apply the incidence rate to the never-exposed population. pop[col_no] = pop[col_no] - inc pop[col_int_no] = pop[col_int_no] - int_inc # Incidence and remission affect who is currently exposed. pop[col_yes] = pop[col_yes] + inc - rem pop[col_int_yes] = pop[col_int_yes] + int_inc - int_rem # Those who have just remitted enter the first post-remission bin. pop[col_zero] = rem pop[col_int_zero] = int_rem self.population_view.update(pop)
################################## # Pipeline sources and modifiers # ##################################
[docs] def incidence_adjustment( self, disease: str, index: pd.Index, incidence_rate: pd.Series ) -> pd.Series: """Modify a disease incidence rate in the intervention scenario. Parameters ---------- disease The name of the disease. index The index into the population life table. incidence_rate The un-adjusted disease incidence rate. """ # Multiply the population in each bin by the associated relative risk. bin_cols = self.get_bin_names() pop = self.population_view.get(index) incidence_rr = self.dis_rr[disease](pop.index)[bin_cols] rr_values = pop[bin_cols] * incidence_rr # Calculate the mean relative-risk for the BAU scenario. bau_prefix = "{}.".format(self.risk) bau_cols = [c for c in bin_cols if c.startswith(bau_prefix)] # Sum over all of the bins in each row. mean_bau_rr = rr_values[bau_cols].sum(axis=1) / pop[bau_cols].sum(axis=1) # Handle cases where the population size is zero. mean_bau_rr = mean_bau_rr.fillna(1.0) # Calculate the mean relative-risk for the intervention scenario. int_prefix = "{}_intervention.".format(self.risk) int_cols = [c for c in bin_cols if c.startswith(int_prefix)] # Sum over all of the bins in each row. mean_int_rr = rr_values[int_cols].sum(axis=1) / pop[int_cols].sum(axis=1) # Handle cases where the population size is zero. mean_int_rr = mean_int_rr.fillna(1.0) # Calculate the disease incidence PIF for the intervention scenario. pif = (mean_bau_rr - mean_int_rr) / mean_bau_rr pif = pif.fillna(0.0) return incidence_rate * (1 - pif)
[docs]def pivot_load(builder: Builder, entity_key: str) -> pd.DataFrame: """Helper method for loading dataframe from artifact. Performs a long to wide conversion if dataframe has an index column named 'measure'. """ data = builder.data.load(entity_key) if "measure" in data.columns: data = ( data.pivot_table( index=[i for i in data.columns if i not in ["measure", "value"]], columns="measure", values="value", ) .rename_axis(None, axis=1) .reset_index() ) return data