Source code for pandas_genomics.sim.biallelic_model_simulator

from enum import Enum
from typing import Optional, Tuple, Union

import numpy as np
import pandas as pd
import patsy
import statsmodels.api as sm
from numpy.random._generator import default_rng

from pandas_genomics.arrays import GenotypeArray, GenotypeDtype
from pandas_genomics.scalars import Variant, MISSING_IDX

[docs]class SNPEffectEncodings(Enum): """Enum: Normalized SNP Effects encoded as 3-length tuples""" DOMINANT = (0, 1, 1) SUPER_ADDITIVE = (0, 0.75, 1) ADDITIVE = (0, 0.5, 1) SUB_ADDITIVE = (0, 0.25, 1) RECESSIVE = (0, 0, 1) HET = (0, 1, 0)
[docs]class PenetranceTables(Enum): """Enum: Penetrance Tables for Simple Models""" HR_HR = [1, 0, 0, 0, 0, 0, 0, 0, 0] # Homozygous Referent X Homozygous Referent HR_HET = [0, 1, 0, 0, 0, 0, 0, 0, 0] # Homozygous Referent X Heterozygous HR_HA = [0, 0, 1, 0, 0, 0, 0, 0, 0] # Homozygous Referent X Homozygous Alternate HET_HET = [0, 0, 0, 0, 1, 0, 0, 0, 0] # Heterozygous X Heterozygous HET_HA = [0, 0, 0, 0, 0, 1, 0, 0, 0] # Heterozygous X Homozygous Alternate XOR = [1, 0, 1, 0, 1, 0, 1, 0, 1] # XOR Model HYP = [0, 0.5, 1, 0.5, 0.5, 0.5, 1, 0.5, 0] # Hyperbolic Model RHYP = [1, 0.5, 0, 0.5, 0.5, 0.5, 0, 0.5, 1] # Hyperbolic Model NULL = [0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5] # Null Model (Always 50/50)
[docs]class BAMS: """ Biallelic Model Simulator. Used to simulate two SNPs with phenotype data based on a penetrance table. It can be initialized using the PenetranceTables enum or using `from_model` with values from the SNPEffectEncodings enum. """
[docs] def __init__( self, pen_table: Union[np.array, PenetranceTables] = np.array( [[0.0, 0.0, 1.0], [0.0, 0.0, 1.0], [1.0, 1.0, 2.0]] ), penetrance_base: float = 0.25, penetrance_diff: Optional[float] = None, snp1: Optional[Variant] = None, snp2: Optional[Variant] = None, random_seed: int = 1855, ): """ Parameters ---------- pen_table: 3x3 np array or PenetranceTables enum Penetrance values. Will be scaled between 0 and 1 if needed. penetrance_base: float, default 0.25 Baseline to use in the final penetrance table, must be in [0,1] penetrance_diff: optional float, default None (use 1-2*penetrance_base) Difference between minimum and maximimum probabilities in the penetrance table. penetrance_base + penetrance_diff must be in [0,1] snp1: Optional[Variant] snp2: Optional[Variant] random_seed: int, default 1855 """ pen_table, snp1, snp2 = self._validate_params( pen_table, penetrance_base, penetrance_diff, snp1, snp2 ) self.pen_table = pen_table self.snp1 = snp1 self.snp2 = snp2 self._random_seed = random_seed self.rng = default_rng(self._random_seed)
def __str__(self): pen_table_df = pd.DataFrame(self.pen_table) pen_table_df.columns = self._get_genotype_strs(self.snp1) pen_table_df.index = self._get_genotype_strs(self.snp2) return ( f"SNP1 = {str(self.snp1)}\n" f"SNP2 = {str(self.snp2)}\n" f"Penetrance Table:\n" f"----------------\n" f"{pen_table_df}\n" f"----------------\n" f"Random Seed = {self._random_seed}" ) def __eq__(self, other): if type(other) is not type(self): raise NotImplemented else: return ( (self.pen_table == other.pen_table).all() & (self.snp1 == other.snp1) & (self.snp2 == other.snp2) & (self._random_seed == other._random_seed) ) @property def random_seed(self) -> int: return self._random_seed def set_random_seed(self, new_seed: int): """ Reset the random number generator with the specified seed. """ self._random_seed = new_seed self.rng = default_rng(self._random_seed) @staticmethod def _validate_params(pen_table, penetrance_base, penetrance_diff, snp1, snp2): """Validate parameters and calculate final penetrance table""" # Process Enum if type(pen_table) is PenetranceTables: pen_table = np.array(pen_table.value).reshape((3, 3)) elif isinstance(pen_table, np.ndarray): if pen_table.shape != (3, 3): raise ValueError(f"Incorrect shape for pen_table, must be 3x3") else: raise ValueError( f"pen_table must be a 3x3 numpy array or PenetranceTables enum, not {type(pen_table)}" ) if (pen_table < 0).any(): raise ValueError(f"Penetrance table values cannot be negative.") # Scale penetrance table if needed if (pen_table.min() != 0) or (pen_table.max() != 1): pen_table_min = pen_table.min() pen_table_range = pen_table.max() - pen_table_min if pen_table_range > 0: pen_table = (pen_table - pen_table_min) / pen_table_range # Otherwise the penetrance table is flat, i.e. a null model # Process base and diff if (penetrance_base < 0) or (penetrance_base > 1): raise ValueError( f"penetrance_base must be in [0,1], {penetrance_base} was outside this range" ) if penetrance_diff is None: penetrance_diff = 1 - (2 * penetrance_base) elif penetrance_diff < 0: raise ValueError("penetrance_diff must be > 0") elif (penetrance_diff + penetrance_base) > 1: raise ValueError(f"penetrance_base + penetrance_diff must be <= 1") # SNPs if snp1 is None: snp1 = Variant(id="rs1", ref="A", alt=["a"]) if snp2 is None: snp2 = Variant(id="rs2", ref="B", alt=["b"]) if len(snp1.alt) != 1: raise ValueError(f"SNP1 is not Bialleleic: {snp1}") if len(snp2.alt) != 1: raise ValueError(f"SNP2 is not Bialleleic: {snp2}") # Create final pen_table pen_table = penetrance_base + penetrance_diff * pen_table return pen_table, snp1, snp2 @classmethod def from_model( cls, eff1: Union[ Tuple[float, float, float], SNPEffectEncodings ] = SNPEffectEncodings.RECESSIVE, eff2: Union[ Tuple[float, float, float], SNPEffectEncodings ] = SNPEffectEncodings.RECESSIVE, penetrance_base: float = 0.25, penetrance_diff: Optional[float] = None, main1: float = 1.0, main2: float = 1.0, interaction: float = 0.0, snp1: Optional[Variant] = None, snp2: Optional[Variant] = None, random_seed: int = 1855, ): """ Create a BiallelicSimulation with a Penetrance Table based on a fully specified model y = β0 + β1(eff1) + β2(eff2) + β3(eff1*eff2) Parameters ---------- eff1: tuple of 3 floats Normalized effect of SNP1 May be passed a value from the `Effects` enum eff2: tuple of 3 floats Normalized effect of SNP2 May be passed a value from the `Effects` enum penetrance_base: float, default 0.25 β0 in the formula penetrance_diff: Optional[float] = None Difference between min and max probabilities in the penetrance table, 1-(2*baseline) if not specified main1: float, default 1.0 Main effect of SNP1, β1 in the formula main2: float, default 1.0 Main effect of SNP2, β2 in the formula interaction: float, default 0.0 Interaction effect, β3 in the formula snp1: Variant, default is None First SNP, one will be created by default if not specified snp2: Variant, default is None Second SNP, one will be created by default if not specified random_seed: int, default is 1855 Random seed used during simulation Returns ------- BAMS """ # TODO: Add more validation if type(eff1) is SNPEffectEncodings: eff1 = eff1.value if type(eff2) is SNPEffectEncodings: eff2 = eff2.value # Shape effects and scale if needed eff1 = np.array([eff1]) # SNP1 = columns eff2 = np.array([eff2]).transpose() # SNP2 = rows if eff1.min() != 0 or eff1.max() != 1: print("Scaling eff1") eff1 = (eff1 - eff1.min()) / (eff1.max() - eff1.min()) if eff2.min() != 0 or eff2.max() != 1: print("Scaling eff2") eff2 = (eff2 - eff2.min()) / (eff2.max() - eff2.min()) pen_table = ( +main1 * np.repeat(eff1, 3, axis=0) + main2 * np.repeat(eff2, 3, axis=1) + interaction * np.outer(eff2, eff1) ) return cls(pen_table, penetrance_base, penetrance_diff, snp1, snp2, random_seed) def generate_case_control( self, n_cases: int = 1000, n_controls: int = 1000, maf1: float = 0.30, maf2: float = 0.30, snr: Optional[float] = None, ): """ Simulate genotypes with the specified number of 'case' and 'control' phenotypes Parameters ---------- n_cases: int, default 1000 n_controls: int, default 1000 maf1: float, default 0.30 Minor Allele Frequency to use for SNP1 maf2: float, default 0.30 Minor Allele Frequency to use for SNP2 snr: float, default 1.0 Signal-to-noise ratio Returns ------- pd.Dataframe Dataframe with 3 columns: Outcome (categorical), SNP1 (GenotypeArray), and SNP2 (GenotypeArray) """ # Validate params if n_cases < 1 or n_controls < 0: raise ValueError( "Simulation must include at least one case and at least one control" ) # Create table of Prob(GT) based on MAF, assuming HWE prob_snp1 = np.array([(1 - maf1) ** 2, 2 * maf1 * (1 - maf1), (maf1) ** 2]) prob_snp2 = np.array( [(1 - maf2) ** 2, 2 * maf2 * (1 - maf2), (maf2) ** 2] ).transpose() prob_gt = np.outer(prob_snp2, prob_snp1) pen_table = self.pen_table if snr is not None: # Scale penetrance table from 0 to 1 pen_table = (pen_table - pen_table.min()) / ( pen_table.max() - pen_table.min() ) # Calcualte sigma sigma = self._calculate_sigma(pen_table, prob_gt) # Scale the penetrance by the amount of unexplained variance # Odds Ratio = exp(SNP/Sigma) min_p = 1 / (1 + np.exp(1 / sigma * snr)) p_diff = 1 - 2 * min_p # Adjust the penetrance pen_table = min_p + pen_table * p_diff # P(Case|GT) * P(GT) # Bayes: P(GT|Case) = ------------------ # P(Case) # Prob(Case|GT) = pen_table # Prob(Case) = sum(Prob(Case|GTi) * Prob(GTi) for each GT i) prob_case = (pen_table * prob_gt).sum() # Prob(GT|Case) prob_gt_given_case = (pen_table * prob_gt) / prob_case # Prob(Control|GT) = 1-pen_table # Prob(Control) = sum(Prob(Control|GTi) * Prob(GTi) for each GT i) prob_control = ((1 - pen_table) * prob_gt).sum() # Prob(GT|Control) prob_gt_given_control = ((1 - pen_table) * prob_gt) / prob_control # Generate genotypes based on the simulated cases and controls # Pick int index into the table (0 through 8) counted left to right then top to bottom (due to flatten()) case_gt_table_idxs = self.rng.choice( range(9), size=n_cases, p=prob_gt_given_case.flatten() ) control_gt_table_idxs = self.rng.choice( range(9), size=n_controls, p=prob_gt_given_control.flatten() ) # Create GenotypeArrays snp1_case_array = pd.Series(self._get_snp1_gt_array(case_gt_table_idxs)) snp2_case_array = pd.Series(self._get_snp2_gt_array(case_gt_table_idxs)) snp1_control_array = pd.Series(self._get_snp1_gt_array(control_gt_table_idxs)) snp2_control_array = pd.Series(self._get_snp2_gt_array(control_gt_table_idxs)) # Merge data together snp1 = pd.concat([snp1_case_array, snp1_control_array]).reset_index(drop=True) snp2 = pd.concat([snp2_case_array, snp2_control_array]).reset_index(drop=True) # Generate outcome outcome = pd.Series(["Case"] * n_cases + ["Control"] * n_controls).astype( "category" ) result = pd.concat([outcome, snp1, snp2], axis=1) result.columns = ["Outcome", "SNP1", "SNP2"] # Scramble outcome so cases and controls are mixed result = result.sample(frac=1, random_state=self._random_seed).reset_index( drop=True ) return result def generate_quantitative( self, n_samples: int = 1000, maf1: float = 0.30, maf2: float = 0.30, snr: Optional[float] = None, ): """ Simulate genotypes with a quantitative outcome (mean = probability based on genotypes, sd = 1) Parameters ---------- n_samples: int, default 1000 maf1: float, default 0.30 Minor Allele Frequency to use for SNP1 maf2: float, default 0.30 Minor Allele Frequency to use for SNP2 snr: float, default 1.0 Signal-to-noise ratio Returns ------- pd.Dataframe Dataframe with 3 columns: Outcome, SNP1 (GenotypeArray), and SNP2 (GenotypeArray) """ pen_table = self.pen_table # Create table of Prob(GT) based on MAF, assuming HWE prob_snp1 = np.array([(1 - maf1) ** 2, 2 * maf1 * (1 - maf1), (maf1) ** 2]) prob_snp2 = np.array( [(1 - maf2) ** 2, 2 * maf2 * (1 - maf2), (maf2) ** 2] ).transpose() prob_gt = np.outer(prob_snp2, prob_snp1) if snr is not None: # Scale penetrance table from 0 to 1 pen_table = (pen_table - pen_table.min()) / ( pen_table.max() - pen_table.min() ) # Calculate sigma sigma = self._calculate_sigma(pen_table, prob_gt) # Scale the penetrance by the amount of unexplained variance pen_table = (pen_table / sigma) * snr # Generate genotypes gt_table_idxs = self.rng.choice(range(9), size=n_samples, p=prob_gt.flatten()) snp1_array = pd.Series(self._get_snp1_gt_array(gt_table_idxs)) snp2_array = pd.Series(self._get_snp2_gt_array(gt_table_idxs)) # Calculate outcome: Normal distribution with mean=probability and sd = 1 outcome = pd.Series( self.rng.normal(loc=np.take(pen_table.flatten(), gt_table_idxs)) ) result = pd.concat([outcome, snp1_array, snp2_array], axis=1) result.columns = ["Outcome", "SNP1", "SNP2"] return result @staticmethod def _calculate_sigma(pen_table, prob_gt): # Set up a dataframe with the penetrance table outcome for each SNP combination (Encoded as codominant) cats = ["Ref", "Het", "Hom"] d = pd.DataFrame( { "SNP1": pd.Categorical(cats * 3, categories=cats, ordered=True), "SNP2": pd.Categorical( [i for c in cats for i in [c] * 3], categories=cats, ordered=True, ), }, dtype="category", ) d["y"] = pen_table.flatten() # Get the weight vec as variance based on genotype probability wt_vec = (prob_gt * (1 - prob_gt)).flatten() # Regress the possible combinations of genotypes against the penetrance table, using the genotype frequency variance as weights y, X = patsy.dmatrices("y ~ 1 + SNP1 + SNP2", data=d) mod = sm.WLS(y, X, weights=wt_vec) res = # If rsquared is 1, there is no interaction, so use constant endogenous values if res.rsquared == 1: mod = sm.WLS(y, exog=np.ones(len(y)), weights=wt_vec) res = sigma = np.sqrt(res.scale) # sigma is sqrt of the dispersion (aka scale) return sigma @staticmethod def _get_genotype_strs(variant): """Return a list of homozygous-ref, het, and homozygous-alt""" return [ f"{variant.ref}{variant.ref}", f"{variant.ref}{variant.alt[0]}", f"{variant.alt[0]}{variant.alt[0]}", ] def _get_snp1_gt_array(self, gt_table_idxs): """Assemble a GenotypeArray for SNP1 directly from genotype table indices""" dtype = GenotypeDtype(self.snp1) gt_table_data = ( ((0, 0), MISSING_IDX), ((0, 1), MISSING_IDX), ((1, 1), MISSING_IDX), ((0, 0), MISSING_IDX), ((0, 1), MISSING_IDX), ((1, 1), MISSING_IDX), ((0, 0), MISSING_IDX), ((0, 1), MISSING_IDX), ((1, 1), MISSING_IDX), ) data = np.array( [gt_table_data[i] for i in gt_table_idxs], dtype=dtype._record_type ) return GenotypeArray(values=data, dtype=dtype) def _get_snp2_gt_array(self, gt_table_idxs): """Assemble a GenotypeArray for SNP2 directly from genotype table indices""" dtype = GenotypeDtype(self.snp2) gt_table_data = ( ((0, 0), MISSING_IDX), ((0, 0), MISSING_IDX), ((0, 0), MISSING_IDX), ((0, 1), MISSING_IDX), ((0, 1), MISSING_IDX), ((0, 1), MISSING_IDX), ((1, 1), MISSING_IDX), ((1, 1), MISSING_IDX), ((1, 1), MISSING_IDX), ) data = np.array( [gt_table_data[i] for i in gt_table_idxs], dtype=dtype._record_type ) return GenotypeArray(values=data, dtype=dtype)