Source code for pandas_genomics.accessors.dataframe_accessor

from collections import Counter
from typing import Optional, List, Union

import numpy as np
import pandas as pd

from .utils import calculate_edge_alphas
from pandas_genomics.arrays import GenotypeDtype
from ..scalars import Region


[docs]@pd.api.extensions.register_dataframe_accessor("genomics") class GenotypeDataframeAccessor: """ DataFrame accessor for GenotypeArray methods .. code-block:: python df.genomics.variant_info df.genomics.encode_additive() """
[docs] def __init__(self, pandas_obj): if not pandas_obj.dtypes.apply(lambda dt: GenotypeDtype.is_dtype(dt)).any(): raise AttributeError( "Incompatible datatypes: at least one column must be a GenotypeDtype." ) id_counts = Counter( [ s.genomics.variant.id for _, s in pandas_obj.iteritems() if GenotypeDtype.is_dtype(s) ] ) if len(id_counts) < len(pandas_obj.select_dtypes([GenotypeDtype]).columns): duplicates = [(k, v) for k, v in id_counts.items() if v >= 2] raise AttributeError( f"Duplicate Variant IDs. Column names may differ from variant IDs, but variant IDs must be unique.\n\tDuplicates: " + ", ".join([f"{dupe} ({count:,})" for dupe, count in duplicates]) ) self._obj = pandas_obj
###################### # Variant Properties # ###################### # These methods generally only return a result for each GenotypeArray column, ignoring other columns @property def variant_info(self) -> pd.DataFrame: """Return a DataFrame with variant info indexed by the column name (one row per GenotypeArray)""" genotypes = self._obj.select_dtypes([GenotypeDtype]) return pd.DataFrame.from_dict( { colname: series.genomics.variant_info for colname, series in genotypes.iteritems() }, orient="index", ) @property def maf(self): """Return the minor allele frequency of each variant See :py:attr:`GenotypeArray.maf`""" genotypes = self._obj.select_dtypes([GenotypeDtype]) return genotypes.apply(lambda col: col.genomics.maf) @property def hwe_pval(self): """Return the probability that the samples are in HWE See :py:attr:`GenotypeArray.hwe_pval`""" genotypes = self._obj.select_dtypes([GenotypeDtype]) return genotypes.apply(lambda col: col.genomics.hwe_pval) ############ # Encoding # ############ # These methods generally return encoded values for any GenotypeArray columns without modifying other columns def encode_additive(self) -> pd.DataFrame: """Additive encoding of genotypes. See :meth:`GenotypeArray.encode_additive` Returns ------- pd.DataFrame """ return pd.concat( [ s.genomics.encode_additive() if GenotypeDtype.is_dtype(s) else s for _, s in self._obj.iteritems() ], axis=1, ) def encode_dominant(self) -> pd.DataFrame: """Dominant encoding of genotypes. See :meth:`GenotypeArray.encode_dominant` Returns ------- pd.DataFrame """ return pd.concat( [ s.genomics.encode_dominant() if GenotypeDtype.is_dtype(s) else s for _, s in self._obj.iteritems() ], axis=1, ) def encode_recessive(self) -> pd.DataFrame: """Recessive encoding of genotypes. See :meth:`GenotypeArray.encode_recessive` Returns ------- pd.DataFrame """ return pd.concat( [ s.genomics.encode_recessive() if GenotypeDtype.is_dtype(s) else s for _, s in self._obj.iteritems() ], axis=1, ) def encode_codominant(self) -> pd.DataFrame: """Codominant encoding of genotypes. See :meth:`GenotypeArray.encode_codominant` Returns ------- pd.DataFrame """ return pd.concat( [ s.genomics.encode_codominant() if GenotypeDtype.is_dtype(s) else s for _, s in self._obj.iteritems() ], axis=1, ) def encode_edge(self, encoding_info: pd.DataFrame) -> pd.DataFrame: """EDGE (weighted) encoding of genotypes. See :meth:`GenotypeArray.encode_edge` Parameters ---------- encoding_info: pd.DataFrame columns: - Variant ID - used to match variants - Alpha Value - used for heterozygous genotypes - Ref Allele - which allele is considered reference - Alt Allele - which allele is considered alternate - Minor Allele Frequency - MAF of data used during calculation of alpha values Returns ------- pd.DataFrame """ # Validate the input DataFrame for required_col in [ "Variant ID", "Alpha Value", "Ref Allele", "Alt Allele", "Minor Allele Frequency", ]: if required_col not in list(encoding_info): raise ValueError( f"Missing one or more required columns in the encoding info: `{required_col}`" ) id_counts = encoding_info["Variant ID"].value_counts() if sum(id_counts > 1): raise ValueError( f"Duplicate IDs: {', '.join([v for v in id_counts[id_counts>1].index])}" ) # Rename the columns to match parameter names for simplicity encoding_info = encoding_info.rename( columns={ "Alpha Value": "alpha_value", "Ref Allele": "ref_allele", "Alt Allele": "alt_allele", "Minor Allele Frequency": "minor_allele_freq", } ) # Convert the encoding info into a Dict("Variant ID" = {param names : param values}) encoding_info = { d["Variant ID"]: {k: v for k, v in d.items() if k != "Variant ID"} for d in encoding_info.to_dict(orient="records") } # Log messages for any warnings warnings = dict() # Process each variant results = [] for _, s in self._obj.iteritems(): if not GenotypeDtype.is_dtype(s): results.append(s) continue info = encoding_info.get(s.array.variant.id, None) if info is None: warnings[ s.array.variant.id ] = "No matching information found in the encoding data" continue elif (s.genomics.maf / info["minor_allele_freq"]) > 10e30: # TODO: replace this with a reasonable comparison to the data MAF. For now it is an always-pass criteria warnings[ s.array.variant.id ] = f"Large MAF Difference: {s.genomics.maf} in sample, {info['minor_allele_freq']} in encoding data" continue else: try: results.append(s.genomics.encode_edge(**info)) except Exception as e: warnings[s.array.variant.id] = str(e) # Print Warnings if len(warnings) > 0: print(f"{len(warnings):,} Variables failed encoding") for var, warning in warnings.items(): print(f"\t{var}: {warning}") # Concatenate results return pd.concat(results, axis=1) def calculate_edge_encoding_values( self, data: pd.DataFrame, outcome_variable: str, covariates: Optional[List[str]] = None, ): """ Calculate alpha values to be used in edge encoding Parameters ---------- data: Data to be used in the regression, including the outcome and covariates outcome_variable: The variable to be used as the output (y) of the regression covariates: Other variables to be included in the regression formula Returns ------- Dict Variant ID: str Alpha Value - used for heterozygous genotypes Ref Allele - which allele is considered reference Alt Allele - which allele is considered alternate Minor Allele Frequency - MAF of data used during calculation of alpha values Notes ----- See [1]_ for more information about edge encoding. References ---------- .. [1] Hall, Molly A., et al. "Novel EDGE encoding method enhances ability to identify genetic interactions." PLoS genetics 17.6 (2021): e1009534. """ return calculate_edge_alphas( genotypes=self._obj.select_dtypes([GenotypeDtype]), data=data, outcome_variable=outcome_variable, covariates=covariates, ) ########### # Filters # ########### # These methods drop genotypes that fail the filter, ignoring other columns def filter_variants_maf(self, keep_min_freq: float = 0.01) -> pd.DataFrame: """ Drop variants with a MAF less than the specified value (0.01 by default) """ genotypes = self._obj.select_dtypes([GenotypeDtype]) removed = genotypes.loc[:, genotypes.genomics.maf < keep_min_freq].columns return self._obj.drop(columns=removed) def filter_variants_hwe(self, cutoff: float = 0.05) -> pd.DataFrame: """ Drop variants with a probability of HWE less than the specified value (0.05 by default). Keep np.nan results, which occur for non-diploid variants and insufficient sample sizes """ genotypes = self._obj.select_dtypes([GenotypeDtype]) genotype_hwe_pval = genotypes.genomics.hwe_pval removed = genotypes.loc[ :, (genotype_hwe_pval < cutoff) & ~np.isnan(genotype_hwe_pval) ].columns return self._obj.drop(columns=removed) def in_regions(self, regions: Union[Region, List[Region]]): """ Keep variants that exist in one or more of the specified regions Parameters ---------- regions: Region or List[Region] Returns ------- pd.DataFrame """ if isinstance(regions, Region): regions = [ regions, ] genotypes = self._obj.select_dtypes([GenotypeDtype]) boolean_array = genotypes.apply(lambda s: False) for r in regions: boolean_array = boolean_array | genotypes.apply( lambda s: s.genomics.contained_by(r) ) removed = boolean_array[~boolean_array].index # Remove where False return self._obj.drop(columns=removed) def not_in_regions(self, regions: Union[Region, List[Region]]): """ Remove variants that exist in one or more of the specified regions Parameters ---------- regions: Region or List[Region] Returns ------- pd.DataFrame """ if isinstance(regions, Region): regions = [ regions, ] genotypes = self._obj.select_dtypes([GenotypeDtype]) boolean_array = genotypes.apply(lambda s: False) for r in regions: boolean_array = boolean_array | genotypes.apply( lambda s: s.genomics.contained_by(r) ) removed = boolean_array[boolean_array].index # Remove where True return self._obj.drop(columns=removed)