Source code for vivarium_public_health.population.data_transformations

"""
===============================
Population Data Transformations
===============================

Provide tools for handling raw demographic data and transforming
it into different distributions for sampling.

"""

from collections import namedtuple

import numpy as np
import pandas as pd
from vivarium.framework.engine import Builder
from vivarium.framework.randomness import RandomnessStream

_SORT_ORDER = ["location", "year_start", "sex", "age_start"]


[docs] def assign_demographic_proportions( population_data: pd.DataFrame, include_sex: str, ) -> pd.DataFrame: """Calculate conditional probabilities on the provided population data for sampling. Parameters ---------- population_data Table with columns 'age', 'sex', 'year', 'location', and 'value' include_sex 'Female', 'Male', or 'Both'. Sexes to include in the distribution. Returns ------- Table with columns 'age' : Midpoint of the age group, 'age_start' : Lower bound of the age group, 'age_end' : Upper bound of the age group, 'sex' : 'Male' or 'Female', 'location' : location, 'year' : Year, 'population' : Total population estimate, 'P(sex, location | age, year)' : Conditional probability of sex and location given age and year, 'P(sex, location, age | year)' : Conditional probability of sex, location, and age given year, 'P(age | year, sex, location)' : Conditional probability of age given year, sex, and location. """ population_data = population_data.copy() if include_sex != "Both": population_data.loc[population_data.sex != include_sex, "value"] = 0.0 year_start_groups = population_data.groupby("year_start", as_index=False) population_data["P(sex, location, age| year)"] = ( year_start_groups[year_start_groups.obj.columns] .apply(lambda sub_pop: sub_pop[["value"]] / sub_pop["value"].sum()) .reset_index(level=0)["value"] .fillna(0.0) ) age_year_start_groups = population_data.groupby(["age", "year_start"], as_index=False) population_data["P(sex, location | age, year)"] = ( age_year_start_groups[age_year_start_groups.obj.columns] .apply(lambda sub_pop: sub_pop[["value"]] / sub_pop["value"].sum()) .reset_index(level=0)["value"] .fillna(0.0) ) year_start_sex_location_groups = population_data.groupby( ["year_start", "sex", "location"], as_index=False ) population_data["P(age | year, sex, location)"] = ( year_start_sex_location_groups[year_start_sex_location_groups.obj.columns] .apply(lambda sub_pop: sub_pop[["value"]] / sub_pop["value"].sum()) .reset_index(level=0)["value"] .fillna(0.0) ) return population_data.sort_values(_SORT_ORDER).reset_index(drop=True)
# FIXME: This step should definitely be happening after we get some approximation of the # underlying distribution. It makes an assumption of uniformity in the age bin. # This only happens at the edges, and is unlikely to be used to clip a neonatal age bin # where the impact would be significant.
[docs] def rescale_binned_proportions( pop_data: pd.DataFrame, age_start: float, age_end: float, ) -> pd.DataFrame: """Reshape the distribution so that bin edges fall on the age_start and age_end. Parameters ---------- pop_data Table with columns 'age', 'age_start', 'age_end', 'sex', 'year', 'location', 'population', 'P(sex, location, age| year)', 'P(sex, location | age, year)', 'P(age | year, sex, location)' age_start The starting age for the rescaled bins. age_end The terminal age for the rescaled bins. Returns ------- Table with the same columns as `pop_data` where all bins outside the range (age_start, age_end) have been discarded. If age_start and age_end don't fall cleanly on age boundaries, the bins in which they lie are clipped and the 'population', 'P(sex, location, age| year)', and 'P(age | year, sex, location)' values are rescaled to reflect their smaller representation. """ col_order = pop_data.columns.copy() if age_start > pop_data["age_end"].max(): raise ValueError( "Provided population data is insufficient to model the requested age range." ) age_start = max(pop_data["age_start"].min(), age_start) age_end = min(pop_data["age_end"].max(), age_end) - 1e-8 pop_data = _add_edge_age_groups(pop_data.copy()) columns_to_scale = [ "P(sex, location, age| year)", "P(age | year, sex, location)", "value", ] for _, sub_pop in pop_data.groupby(["sex", "location"]): min_bin = sub_pop[ (sub_pop["age_start"] <= age_start) & (age_start < sub_pop["age_end"]) ] padding_bin = sub_pop[sub_pop["age_end"] == float(min_bin["age_start"].iloc[0])] min_scale = (float(min_bin["age_end"].iloc[0]) - age_start) / float( min_bin["age_end"].iloc[0] - min_bin["age_start"].iloc[0] ) remainder = pop_data.loc[min_bin.index, columns_to_scale].values * (1 - min_scale) pop_data.loc[min_bin.index, columns_to_scale] *= min_scale pop_data.loc[padding_bin.index, columns_to_scale] += remainder pop_data.loc[min_bin.index, "age_start"] = age_start pop_data.loc[min_bin.index, "age"] = float( ( pop_data.loc[min_bin.index, "age_start"].iloc[0] + pop_data.loc[min_bin.index, "age_end"].iloc[0] ) / 2 ) pop_data.loc[padding_bin.index, "age_end"] = age_start pop_data.loc[padding_bin.index, "age"] = float( ( pop_data.loc[padding_bin.index, "age_start"].iloc[0] + pop_data.loc[padding_bin.index, "age_end"].iloc[0] ) / 2 ) max_bin = sub_pop[(sub_pop["age_end"] > age_end) & (age_end >= sub_pop["age_start"])] padding_bin = sub_pop[sub_pop["age_start"] == float(max_bin["age_end"].iloc[0])] max_scale = (age_end - float(max_bin["age_start"].iloc[0])) / float( max_bin["age_end"].iloc[0] - max_bin["age_start"].iloc[0] ) remainder = pop_data.loc[max_bin.index, columns_to_scale] * (1 - max_scale) pop_data.loc[max_bin.index, columns_to_scale] *= max_scale pop_data.loc[padding_bin.index, columns_to_scale] += remainder.values pop_data.loc[max_bin.index, "age_end"] = age_end pop_data.loc[padding_bin.index, "age_start"] = age_end return pop_data[col_order].sort_values(_SORT_ORDER).reset_index(drop=True)
def _add_edge_age_groups(pop_data: pd.DataFrame) -> pd.DataFrame: """Pad the population data with sentinel age groups for boundary interpolation. Adds a lower padding bin mirroring the values of the first real bin (enforcing constant left extrapolation) and an upper padding bin with zero values immediately above the maximum age (enforcing extrapolation to zero on the right). Parameters ---------- pop_data Table with standard index columns (``location``, ``year_start``, ``year_end``, ``sex``) and age columns (``age``, ``age_start``, ``age_end``). Returns ------- The input table with two additional rows per group: one padding bin below the minimum age and one above the maximum age, sorted and re-indexed. """ index_cols = ["location", "year_start", "year_end", "sex"] age_cols = ["age", "age_start", "age_end"] other_cols = [c for c in pop_data.columns if c not in index_cols + age_cols] pop_data = pop_data.set_index(index_cols) # For the lower bin, we want constant interpolation off the left side min_valid_age = pop_data["age_start"].min() # This bin width needs to be the same as the lowest bin. min_pad_age = min_valid_age - (pop_data["age_end"].min() - min_valid_age) min_pad_age_midpoint = (min_valid_age + min_pad_age) * 0.5 lower_bin = pd.DataFrame( {"age_start": min_pad_age, "age_end": min_valid_age, "age": min_pad_age_midpoint}, index=pop_data.index.unique(), ) lower_bin[other_cols] = pop_data.loc[pop_data["age_start"] == min_valid_age, other_cols] # For the upper bin, we want our interpolation to go to zero. max_valid_age = pop_data["age_end"].max() # This bin width is not arbitrary. It effects the rate at which our interpolation # zeros out. Since for the 2016 round the maximum age is 125, we're assuming almost no # one lives past that age, so we make this bin 1 week. A more robust technique for # this would be better. max_pad_age = max_valid_age + 7 / 365 max_pad_age_midpoint = (max_valid_age + max_pad_age) * 0.5 upper_bin = pd.DataFrame( {"age_start": max_valid_age, "age_end": max_pad_age, "age": max_pad_age_midpoint}, index=pop_data.index.unique(), ) # We're doing the multiplication to ensure we get the correct data shape and index. upper_bin[other_cols] = 0 * pop_data.loc[pop_data["age_end"] == max_valid_age, other_cols] pop_data = pd.concat([lower_bin, pop_data, upper_bin], sort=False).reset_index() pop_data = pop_data.rename( columns={ "level_0": "location", "level_1": "year_start", "level_2": "year_end", "level_3": "sex", } ) return ( pop_data[index_cols + age_cols + other_cols] .sort_values(by=["location", "year_start", "year_end", "age"]) .reset_index(drop=True) ) AgeValues = namedtuple("AgeValues", ["current", "young", "old"]) EndpointValues = namedtuple("EndpointValues", ["left", "right"])
[docs] def smooth_ages( simulants: pd.DataFrame, population_data: pd.DataFrame, randomness: RandomnessStream, ) -> pd.DataFrame: """Distribute simulants among ages within their assigned age bins. Parameters ---------- simulants Table with columns 'age', 'sex', and 'location' population_data Table with columns 'age', 'sex', 'year', 'location', 'population', 'P(sex, location, age| year)', 'P(sex, location | age, year)', 'P(age | year, sex, location)' randomness Source of random number generation within the vivarium common random number framework. Returns ------- Table with same columns as `simulants` with ages smoothed out within the age bins. """ simulants = simulants.copy() for (sex, location), sub_pop in population_data.groupby(["sex", "location"]): ages = sorted(sub_pop["age"].unique()) younger = [float(sub_pop.loc[sub_pop["age"] == ages[0], "age_start"].iloc[0])] + ages[ :-1 ] older = ages[1:] + [float(sub_pop.loc[sub_pop["age"] == ages[-1], "age_end"].iloc[0])] uniform_all = randomness.get_draw(simulants.index) for age_set in zip(ages, younger, older): age = AgeValues(*age_set) has_correct_demography = ( (simulants["age"] == age.current) & (simulants["sex"] == sex) & (simulants["location"] == location) ) affected = simulants[has_correct_demography] if affected.empty: continue # bin endpoints endpoints, proportions = _get_bins_and_proportions(sub_pop, age) pdf, slope, area, cdf_inflection_point = _construct_sampling_parameters( age, endpoints, proportions ) # Make a draw from a uniform distribution uniform_rv = uniform_all.loc[affected.index] left_sims = affected[uniform_rv <= cdf_inflection_point] right_sims = affected[uniform_rv > cdf_inflection_point] simulants.loc[left_sims.index, "age"] = _compute_ages( uniform_rv[left_sims.index], endpoints.left, pdf.left, slope.left, area ) simulants.loc[right_sims.index, "age"] = _compute_ages( uniform_rv[right_sims.index] - cdf_inflection_point, age.current, proportions.current, slope.right, area, ) return simulants
def _get_bins_and_proportions( pop_data: pd.DataFrame, age: AgeValues, ) -> tuple[EndpointValues, AgeValues]: """Get the bin edges and population proportions in the current and neighboring bins. Parameters ---------- pop_data Table with columns 'age', 'sex', 'year', 'location', 'population', 'P(sex, location, age| year)', 'P(sex, location | age, year)', 'P(age | year, sex, location)' age Tuple with values ( midpoint of current age bin, midpoint of previous age bin, midpoint of next age bin, ) Returns ------- A tuple of endpoints tuples and ages tuples. The `EndpointValues` tuple has values ( age at left edge of bin, age at right edge of bin, ) The `AgeValues` tuple has values ( proportion of pop in current bin, proportion of pop in previous bin, proportion of pop in next bin, ) """ left = float(pop_data.loc[pop_data["age"] == age.current, "age_start"].iloc[0]) right = float(pop_data.loc[pop_data["age"] == age.current, "age_end"].iloc[0]) if not pop_data.loc[pop_data["age"] == age.young, "age_start"].empty: lower_left = float(pop_data.loc[pop_data["age"] == age.young, "age_start"].iloc[0]) else: lower_left = left if not pop_data.loc[pop_data["age"] == age.old, "age_end"].empty: upper_right = float(pop_data.loc[pop_data["age"] == age.old, "age_end"].iloc[0]) else: upper_right = right # proportion in this bin and the neighboring bins proportion_column = "P(age | year, sex, location)" # Here we make the assumption that # P(left < age < right | year, sex, location) = p * (right - left) # in order to back out a point estimate for the probability density at the center # of the interval. This not the best assumption, but it'll do. p_age = float( pop_data.loc[pop_data["age"] == age.current, proportion_column].iloc[0] / (right - left) ) p_young = ( float( pop_data.loc[pop_data["age"] == age.young, proportion_column].iloc[0] / (left - lower_left) ) if age.young != left else p_age ) p_old = ( float( pop_data.loc[pop_data["age"] == age.old, proportion_column].iloc[0] / (upper_right - right) ) if age.old != right else 0 ) return EndpointValues(left, right), AgeValues(p_age, p_young, p_old) def _construct_sampling_parameters( age: AgeValues, endpoint: EndpointValues, proportion: AgeValues ) -> tuple[EndpointValues, EndpointValues, float, float]: """Calculate sampling distribution parameters from known values. Parameters ---------- age Tuple with values ( midpoint of current age bin, midpoint of previous age bin, midpoint of next age bin, ) endpoint : EndpointValues Tuple with values ( age at left edge of bin, age at right edge of bin, ) proportion : AgeValues Tuple with values ( proportion of pop in current bin, proportion of pop in previous bin, proportion of pop in next bin, ) Returns ------- A tuple of (pdf, slope, area, cdf_inflection_point) where pdf is a tuple with values ( pdf evaluated at left bin edge, pdf evaluated at right bin edge, ) slope is a tuple with values ( slope of pdf in left half bin, slope of pdf in right half bin, ) area is the total area under the pdf, used for normalization cdf_inflection_point is the value of the cdf at the midpoint of the age bin. """ # pdf value at bin endpoints pdf_left = (proportion.current - proportion.young) / (age.current - age.young) * ( endpoint.left - age.young ) + proportion.young pdf_right = (proportion.old - proportion.current) / (age.old - age.current) * ( endpoint.right - age.current ) + proportion.current area = 0.5 * ( (proportion.current + pdf_left) * (age.current - endpoint.left) + (pdf_right + proportion.current) * (endpoint.right - age.current) ) pdf = EndpointValues(pdf_left, pdf_right) # pdf slopes. m_left = (proportion.current - pdf.left) / (age.current - endpoint.left) m_right = (pdf.right - proportion.current) / (endpoint.right - age.current) slope = EndpointValues(m_left, m_right) # The decision bound on the uniform rv. cdf_inflection_point = ( 1 / (2 * area) * (proportion.current + pdf.left) * (age.current - endpoint.left) ) return pdf, slope, area, cdf_inflection_point def _compute_ages( uniform_rv: np.ndarray | float, start: float, height: float, slope: float, normalization: float, ) -> np.ndarray | float: """Produce samples from the local age distribution. Parameters ---------- uniform_rv Values pulled from a uniform distribution and belonging to either the left or right half of the local distribution. The halves are determined by the point Z in [0, 1] such that Q(Z) = the midpoint of the age bin in question, where Q is inverse of the local cumulative distribution function. start Either the left edge of the age bin (if we're in the left half of the distribution) or the midpoint of the age bin (if we're in the right half of the distribution). height The value of the local distribution at `start` slope The slope of the local distribution. normalization The total area under the distribution. Returns ------- Smoothed ages from one half of the age bin distribution. """ if abs(slope) < np.finfo(np.float32).eps: return start + normalization / height * uniform_rv else: return start + height / slope * ( np.sqrt(1 + 2 * normalization * slope / height**2 * uniform_rv) - 1 )
[docs] def get_cause_deleted_mortality_rate( all_cause_mortality_rate: pd.DataFrame, list_of_csmrs: list[pd.DataFrame | None], ) -> pd.DataFrame: """Compute the cause-deleted mortality rate by subtracting individual CSMRs. Parameters ---------- all_cause_mortality_rate DataFrame with standard age/sex/year index columns and a ``value`` column. list_of_csmrs List of DataFrames each containing a ``value`` column representing a cause-specific mortality rate. ``None`` entries are skipped. Returns ------- DataFrame with the same index columns and a ``death_due_to_other_causes`` column containing the residual mortality rate after subtracting all provided cause-specific rates. """ index_cols = ["age_start", "age_end", "sex", "year_start", "year_end"] all_cause_mortality_rate = all_cause_mortality_rate.set_index(index_cols).copy() for csmr in list_of_csmrs: if csmr is None: continue all_cause_mortality_rate = all_cause_mortality_rate.subtract( csmr.set_index(index_cols) ).dropna() return all_cause_mortality_rate.reset_index().rename( columns={"value": "death_due_to_other_causes"} )
[docs] def load_population_structure(builder: Builder) -> pd.DataFrame: """Load population structure data from the artifact and add derived columns. Parameters ---------- builder Access point for utilizing framework interfaces during setup. Returns ------- DataFrame with all columns from the raw data plus ``age`` and ``location``. """ data = builder.data.load("population.structure") # create an age column which is the midpoint of the age group data["age"] = data.apply(lambda row: (row["age_start"] + row["age_end"]) / 2, axis=1) data["location"] = builder.data.load("population.location") return data
[docs] def get_live_births_per_year(builder: Builder) -> pd.Series: """Compute the simulated number of live births per year. Combines population structure data with live birth covariate data to produce a per-year birth rate scaled to the simulation's initial population size. Handles time-dependent vs. fixed birth rates and population fractions according to the fertility configuration, and extends the series to cover the simulation end year if needed. Parameters ---------- builder Access point for utilizing framework interfaces during setup. Returns ------- A :class:`pandas.Series` indexed by year containing the expected number of new simulant births per year. """ population_data = load_population_structure(builder) birth_data = builder.data.load("covariate.live_births_by_sex.estimate") validate_crude_birth_rate_data(builder, population_data.year_end.max()) population_data = rescale_final_age_bin(builder, population_data) initial_population_size = builder.configuration.population.population_size population_data = population_data.groupby(["year_start"])["value"].sum() birth_data = ( birth_data[birth_data.parameter == "mean_value"] .drop(columns=["parameter"]) .groupby(["year_start"])["value"] .sum() ) start_year = builder.configuration.time.start.year if ( builder.configuration.interpolation.extrapolate and start_year > birth_data.index.max() ): start_year = birth_data.index.max() if not builder.configuration.fertility.time_dependent_live_births: birth_data = birth_data.at[start_year] if not builder.configuration.fertility.time_dependent_population_fraction: population_data = population_data.at[start_year] live_birth_rate = initial_population_size / population_data * birth_data if isinstance(live_birth_rate, (int, float)): live_birth_rate = pd.Series( live_birth_rate, index=pd.RangeIndex( builder.configuration.time.start.year, builder.configuration.time.end.year + 1, name="year", ), ) else: live_birth_rate = ( live_birth_rate.reset_index() .rename(columns={"year_start": "year"}) .set_index("year") .value ) exceeds_data = builder.configuration.time.end.year > live_birth_rate.index.max() if exceeds_data: new_index = pd.RangeIndex( live_birth_rate.index.min(), builder.configuration.time.end.year + 1 ) live_birth_rate = live_birth_rate.reindex( new_index, fill_value=live_birth_rate.at[live_birth_rate.index.max()] ) return live_birth_rate
[docs] def rescale_final_age_bin(builder: Builder, population_data: pd.DataFrame) -> pd.DataFrame: """Clip and rescale the final age bin to match ``initialization_age_max``. When ``population.initialization_age_max`` is configured and falls within an existing age bin, that bin is truncated at ``initialization_age_max`` and its ``value`` is scaled proportionally to reflect the reduced width. Parameters ---------- builder Access point for utilizing framework interfaces during setup. population_data DataFrame with columns ``age_start``, ``age_end``, and ``value``. Returns ------- A copy of ``population_data`` with the final age bin adjusted to end at ``initialization_age_max`` and its value rescaled accordingly. Returned unchanged if ``initialization_age_max`` is not set. """ initialization_age_max = builder.configuration.population.to_dict().get( "initialization_age_max", None ) if initialization_age_max: population_data = population_data.loc[ population_data["age_start"] < initialization_age_max ].copy() cut_bin_idx = initialization_age_max <= population_data["age_end"] cut_age_start = population_data.loc[cut_bin_idx, "age_start"] cut_age_end = population_data.loc[cut_bin_idx, "age_end"] population_data.loc[cut_bin_idx, "value"] *= ( initialization_age_max - cut_age_start ) / (cut_age_end - cut_age_start) population_data.loc[cut_bin_idx, "age_end"] = initialization_age_max return population_data
[docs] def validate_crude_birth_rate_data(builder: Builder, data_year_max: int) -> None: """Validate that birth rate data covers the simulation time period. Parameters ---------- builder Access point for utilizing framework interfaces during setup. data_year_max The latest year covered by the available birth rate data. Raises ------ ValueError If the simulation end year exceeds ``data_year_max`` and extrapolation is not enabled. """ exceeds_data = builder.configuration.time.end.year > data_year_max if exceeds_data and not builder.configuration.interpolation.extrapolate: raise ValueError("Trying to extrapolate beyond the end of available birth data.")