import operator
import re
from copy import copy
from typing import Dict, MutableMapping, Any, Optional, List, Union, Tuple, Iterable
import numpy as np
import pandas as pd
from pandas.core.arrays import ExtensionArray, BooleanArray, IntegerArray
from pandas.core.dtypes.dtypes import register_extension_dtype, PandasExtensionDtype
from pandas.core.dtypes.inference import is_list_like
from pandas_genomics.arrays.encoding_mixin import EncodingMixin
from pandas_genomics.arrays.info_mixin import InfoMixin
from pandas_genomics.scalars import Variant, Genotype, MISSING_IDX
[docs]@register_extension_dtype
class GenotypeDtype(PandasExtensionDtype):
"""
An ExtensionDtype for genotype data.
Parameters
----------
variant: Variant or None
The ~Variant associated with the genotype.
If None, use an anonymous variant
Attributes
----------
variant: Variant
The variant that the datatype is specific to
Examples
--------
v = Variant(chromosome='12', position=112161652, id='rs12462', ref='T', alt=['C',], score=25)
>>> GenotypeDtype(v)
genotype(2n)[12; 112161652; rs12462; T; C]Q25
"""
# Internal attributes
# -------------------
# metadata field names
_metadata = ("variant",)
# Regular expression
# TODO: Validate ref/alt more specifically
_match = re.compile(
r"genotype\((?P<ploidy>[0-9]+)n\)\["
r"(?P<chromosome>.+); "
r"(?P<position>[0-9]+); "
r"(?P<id>.+); "
r"(?P<ref>.+); "
r"(?P<alt>.+)\]"
r"(Q(?P<score>[0-9]+))?$"
)
kind = "O"
type = Genotype
# ExtensionDtype Properties
# -------------------------
@property
def na_value(self) -> Genotype:
"""
Return the genotype with variant information but no alleles specified
"""
return Genotype(variant=self.variant)
@property
def name(self) -> str:
return str(self)
# init
# ----
[docs] def __init__(self, variant: Optional[Variant] = None):
# Set variant
if variant is None:
variant = Variant()
self.variant = variant
# Data backing the GenotypeArray is stored as a numpy structured array
# An unsigned integer for each allele in the genotype indexing the list of possible alleles
# An unsigned integer for the genotype score (255 if missing)
self._record_type = np.dtype(
[
("allele_idxs", np.uint8, (self.variant.ploidy,)),
("gt_score", np.uint8),
]
)
self.itemsize = self._record_type.itemsize
# ExtensionDtype Methods
# -------------------------
@classmethod
def construct_array_type(cls) -> type:
"""
Return the array type associated with this dtype
Returns
-------
type
"""
return GenotypeArray
@classmethod
def construct_from_string(cls, string):
"""
Construct a GenotypeDtype from a string.
Parameters
----------
string : str
The string alias for this GenotypeDtype.
Should be formatted like `genotype(<ploidy>n)[<chromosome>; <position>; <id>; <ref>; <alt>]`
Examples
--------
>>> GenotypeDtype.construct_from_string('genotype(2n)[chr1; 123456; rs12345; A; T,G]')
genotype(2n)[chr1; 123456; rs12345; A; T,G]
"""
if isinstance(string, str):
msg = "Cannot construct a 'GenotypeDtype' from '{}'"
try:
match = cls._match.match(string)
if match is not None:
d = match.groupdict()
# Score is optional, so it may be None
score = d["score"]
if score is not None:
score = int(score)
variant = Variant(
chromosome=d["chromosome"],
position=int(d["position"]),
id=d["id"],
ref=d["ref"],
alt=d["alt"].split(","),
ploidy=int(d["ploidy"]),
score=score,
)
return cls(variant=variant)
else:
raise TypeError(msg.format(string))
except Exception:
raise TypeError(msg.format(string))
else:
raise TypeError(
f"'construct_from_string' expects a string, got {type(string)}>"
)
@classmethod
def from_genotype(cls, genotype: Genotype):
"""
Construct a GenotypeDtype from a Genotype.
Parameters
----------
genotype: Genotype
Examples
-------
>>> variant = Variant('12', 112161652, 'rs12462')
>>> genotype = variant.make_genotype_from_str('C/T', add_alleles=True)
>>> GenotypeDtype.from_genotype(genotype)
genotype(2n)[12; 112161652; rs12462; ref=N; alt=T,C]
"""
return cls(genotype.variant)
@classmethod
def is_dtype(cls, dtype) -> bool:
"""
Return a boolean if the passed type can be processed as this dtype
"""
if isinstance(dtype, cls):
return True
elif isinstance(dtype, str):
if dtype.lower().startswith("genotype("):
try:
if cls.construct_from_string(dtype) is not None:
return True
else:
return False
except (ValueError, TypeError):
return False
else:
return False
return super().is_dtype(dtype)
# Other methods
# -------------
def __str__(self):
variant_score_str = ""
if self.variant.score is not None:
variant_score_str = f"Q{self.variant.score}"
return (
f"genotype({self.variant.ploidy}n)["
f"{self.variant.chromosome}; "
f"{self.variant.position}; "
f"{self.variant.id}; "
f"{self.variant.ref}; "
f"{self.variant.alt}]" + variant_score_str
)
def __hash__(self):
return hash(str(self))
def __copy__(self):
"""Create a copy to avoid references to the same variant"""
return GenotypeDtype(copy(self.variant))
def __eq__(self, other):
if isinstance(other, str):
return other == self.name
return isinstance(other, GenotypeDtype) and self.variant == other.variant
# Pickle compatibility
# --------------------
def __getstate__(self) -> Dict[str, Any]:
# pickle support; we don't want to pickle the cache
return {k: getattr(self, k, None) for k in self._metadata}
def __setstate__(self, state: MutableMapping[str, Any]) -> None:
self.variant = state.pop("variant")
# Other internal methods
# ----------------------
def unpack_genotype(self, genotype: Optional[Genotype]):
if genotype is None:
return self.unpack_genotype(self.na_value)
elif not isinstance(genotype, Genotype):
raise ValueError(
f"The fill value must be None or a Genotype object, not {type(genotype)}"
)
else:
score = float("NaN")
if genotype.score is not None:
score = genotype.score
return np.array(
list(genotype.allele_idxs) + [score], dtype=self._record_type
)
[docs]class GenotypeArray(ExtensionArray, EncodingMixin, InfoMixin):
"""
Holder for genotypes
Variant information is stored as part of the type, and the genotype is stored as a pair of integer arrays
Parameters
----------
values : list-like
The values of the genotypes.
dtype : GenotypeDtype
The specific parametized type. Optional (if possible to infer from `values`)
Attributes
----------
dtype: GenotypeDtype
The specific parametized type
data: np.dtype("u8") with shape (<genotypes>, <ploidy>)
The genotype values encoded as indices into the allele list of the dtype
"""
# array priority higher than numpy scalars
__array_priority__ = 1000
[docs] def __init__(
self,
values: Union[List[Genotype], "GenotypeArray", np.ndarray],
dtype: Optional[GenotypeDtype] = None,
copy: bool = False,
):
"""Initialize assuming values is a GenotypeArray or a numpy array with the correct underlying shape"""
# If the dtype is passed, ensure it is the correct type
if GenotypeDtype.is_dtype(dtype):
self._dtype = dtype
elif dtype is None:
self._dtype = None
else:
raise ValueError(f"The passed dtype '{dtype}' is not a GenotypeDtype")
# Load the values
# ---------------
if not is_list_like(values):
values = [values]
if isinstance(values, np.ndarray) and (
values.dtype == self._dtype._record_type
):
# Stored data format
self._data = values
elif self.is_genotype_array(values):
# values is a GenotypeArray, simply check the dtype and return
if self.dtype is not None:
if self.dtype != values.dtype:
raise ValueError(
f"The provided dtype {dtype} doesn't match"
f" the dtype of the provided values {values.dtype}"
)
else:
# Take dtype from the values
self._dtype = values.dtype
# Get the data
if copy:
values = values.copy()
self._data = values._data
elif len(values) == 0:
# Return an empty Genotype Array with the given GenotypeDtype
self._data = np.array(values, dtype=self._dtype._record_type)
elif all([type(i) == Genotype for i in values]):
# Sequence of Genotype objects
genotype_array = self._from_sequence(scalars=values, dtype=dtype, copy=copy)
# Replace self with the created array
self._data = genotype_array._data
self._dtype = genotype_array._dtype
elif all([type(i) == str for i in values]):
# List of Strings
genotype_array = self._from_sequence_of_strings(
strings=values, dtype=dtype, copy=copy
)
# Replace self with the created array
self._data = genotype_array._data
self._dtype = genotype_array._dtype
else:
raise ValueError(
f"Unsupported `values` type passed to __init__: {type(values)}"
)
# Set an anonymous dtype if one was not set
if self.dtype is None:
self._dtype = GenotypeDtype()
# ExtensionArray Initialization Methods
# -------------------------------------
@classmethod
def _from_sequence(
cls,
scalars: Union[Genotype, Iterable[Genotype]],
dtype: Optional[GenotypeDtype] = None,
copy: bool = False,
) -> "GenotypeArray":
"""
Construct a new GenotypeArray from a sequence of Genotypes.
Parameters
----------
scalars : Sequence of Genotype objects
dtype : dtype, optional
Optional GenotypeDtype that must be compatible with the Genotypes
copy : boolean, default False
If True, copy the underlying data.
Returns
-------
GenotypeArray
"""
if type(scalars) == Genotype:
# Despite the documentation, scalars is not always a sequence of objects, sometimes it is just one
scalars = [scalars]
if len(scalars) == 0:
# Pass the scalars object anyway, in case it is an empty GenotypeArray
return cls(values=scalars, dtype=dtype)
if dtype is None:
# Use variant from first genotype
variant = scalars[0].variant
else:
# Use the dtype variant
variant = dtype.variant
values = []
for idx, gt in enumerate(scalars):
if not variant.is_same_position(gt.variant):
raise ValueError(
f"Variant for Genotype {idx} of {len(scalars)} ({gt.variant}) "
f"is not compatible with the prior ones ({variant})"
)
elif variant.score != gt.variant.score:
raise ValueError(
f"Variant for Genotype {idx} of {len(scalars)} ({gt.variant}) "
f"is compatible, but has a different variant score"
)
else:
values.append((gt.allele_idxs, gt._float_score))
result = cls(values=[], dtype=GenotypeDtype(variant))
result._data = np.array(values, dtype=result._dtype._record_type)
return result
@classmethod
def _from_sequence_of_strings(cls, strings, dtype, copy: bool = False):
"""Construct a new ExtensionArray from a sequence of strings.
Parameters
----------
strings : Sequence
A list of genotype strings, like "A/C", "G/delG", etc
dtype : dtype, optional
GenotypeDtype with variant information used to process the strings
copy : boolean, default False
If True, copy the underlying data.
Note
----
This will not allow the use of genotype scores as they are not encoded in genotype strings
Returns
-------
GenotypeArray
"""
variant = dtype.variant
return cls._from_sequence(
[variant.make_genotype_from_str(s) for s in strings], dtype, copy
)
@classmethod
def _from_factorized(cls, values, original):
"""
Reconstruct an ExtensionArray after factorization.
Parameters
----------
values : ndarray
An integer ndarray with the factorized values.
original : ExtensionArray
The original ExtensionArray that factorize was called on.
See Also
--------
factorize
ExtensionArray.factorize
"""
return cls(values, dtype=original.dtype)
@classmethod
def is_genotype_array(cls, other):
if isinstance(other, cls):
return True
else:
return False
# Attributes
# ----------
@property
def dtype(self) -> GenotypeDtype:
"""The specific parametized type"""
return self._dtype
@property
def nbytes(self) -> int:
"""How many bytes to store this object in memory"""
return self._dtype.itemsize * len(self)
def __getitem__(self, index):
"""
Select a subset of self.
Parameters
----------
index : int, slice, or ndarray
* int: The position in 'self' to get.
* slice: A slice object, where 'start', 'stop', and 'step' are
integers or None
* ndarray: A 1-d boolean NumPy ndarray the same length as 'self'
Returns
-------
item : Genotype or GenotypeArray
Notes
-----
"""
# Check and convert the index
index = pd.api.indexers.check_array_indexer(self._data, index)
result = operator.getitem(self._data, index)
if isinstance(result, np.ndarray):
return GenotypeArray(values=result, dtype=self.dtype)
elif isinstance(result, np.void):
variant = self.dtype.variant
return Genotype(
variant=variant,
allele_idxs=result["allele_idxs"],
score=result["gt_score"] if not np.isnan(result["gt_score"]) else None,
)
else:
raise TypeError("Indexing error- unexpected type")
def __setitem__(
self,
key: Union[int, np.ndarray],
value: Union[Genotype, "GenotypeArray", List[Genotype]],
) -> None:
if isinstance(value, list):
# Convert list to genotype array, throwing an error if it doesn't work
value = self._from_sequence(value, dtype=self.dtype)
# Validate the key
if isinstance(key, List):
key = pd.Series(key)
if key.isna().sum() > 0:
raise ValueError(
"Cannot index with an integer indexer containing NA values"
)
if isinstance(key, BooleanArray):
# Convert to a normal boolean array after making NaN rows False
key = key.fillna(False).astype("bool")
# Handle pandas IntegerArray
if isinstance(key, IntegerArray):
if key.isna().sum() > 0:
# Raise an error if there are any NA values
raise ValueError(
"Cannot index with an integer indexer containing NA values"
)
else:
# Convert to a regular numpy array of ints
key = key.astype("int")
# Ensure a mask doesn't have an incorrect length
if isinstance(key, np.ndarray) and key.dtype == "bool":
if len(key) != len(self):
raise IndexError("wrong length")
# Update allele values directly
if isinstance(value, Genotype):
self._data[key] = tuple([value.allele_idxs, value._float_score])
elif isinstance(value, GenotypeArray):
self._data[key] = value._data
elif isinstance(value, pd.Series) and isinstance(value.values, GenotypeArray):
self._data[key] = value.values._data
else:
raise ValueError(
f"Can't set the value in a GenotypeArray with '{type(value)}"
)
def __len__(self):
return len(self._data)
def take(self, indexer, allow_fill=False, fill_value=None):
indexer = np.asarray(indexer)
msg = (
"Index is out of bounds or cannot do a "
"non-empty take from an empty array."
)
if allow_fill:
if fill_value is None:
fill_value = self.dtype.na_value
# bounds check
if (indexer < -1).any():
raise ValueError
try:
output = [self[loc] if loc != -1 else fill_value for loc in indexer]
except IndexError as err:
raise IndexError(msg) from err
else:
try:
output = [self[loc] for loc in indexer]
except IndexError as err:
raise IndexError(msg) from err
return self._from_sequence(scalars=output, dtype=self.dtype)
def copy(self):
return GenotypeArray(self._data.copy(), copy(self.dtype))
def factorize(self, na_sentinel: int = -1) -> Tuple[np.ndarray, "GenotypeArray"]:
"""
Return an array of ints indexing unique values
"""
if len(self) == 0:
return np.array([], dtype=np.int64), self
codes = np.ones(len(self), dtype=np.int64)
# Get list of unique genotypes in order they appear (not counting scores)
_, idx = np.unique(self.allele_idxs, return_index=True, axis=0)
uniques = self._data[np.sort(idx)]
uniques = GenotypeArray(values=uniques, dtype=self.dtype)
# Update codes for unique values, not including NA
for idx, gt in enumerate([gt for gt in uniques if gt != self.dtype.na_value]):
codes[self == gt] = idx
# Update codes for NA values
codes[self.isna()] = na_sentinel
# Return the codes and unique values (not including NA)
return codes, uniques[~uniques.isna()]
def unique(self) -> "GenotypeArray":
"""Return a GenotypeArray of unique values"""
_, idx = np.unique(self.allele_idxs, return_index=True, axis=0)
return GenotypeArray(values=self._data[idx], dtype=self.dtype)
def value_counts(self, dropna=True):
"""Return a Series of unique counts with a GenotypeArray index"""
_, unique_idx, unique_counts = np.unique(
self.allele_idxs, return_index=True, return_counts=True, axis=0
)
result = pd.Series(
unique_counts,
index=GenotypeArray(values=self._data[unique_idx], dtype=self.dtype),
)
if dropna:
result = result.loc[result.index != self.dtype.na_value]
return result
def astype(self, dtype, copy=True):
if isinstance(dtype, GenotypeDtype):
if copy:
self = self.copy()
return self
return super(GenotypeArray, self).astype(dtype)
def isna(self):
"""
A 1-D array indicating if each value is missing
"""
return (self.allele_idxs == MISSING_IDX).all(axis=1)
@classmethod
def _concat_same_type(cls, to_concat, axis: int = 0):
"""
Concatenate multiple array
Parameters
----------
to_concat : sequence of this type
Returns
-------
ExtensionArray
"""
# Check dtypes
dtypes = {a.dtype for a in to_concat}
if len(dtypes) != 1:
raise ValueError(
"to_concat must have the same dtype for all values", dtypes
)
data = np.concatenate([ga._data for ga in to_concat], axis=axis)
return GenotypeArray(data, list(dtypes)[0])
# Properties for accessing array metadata
# ----------------------------------------
@property
def variant(self):
"""
Return the variant identifier
"""
return self._dtype.variant
# Properties for accessing individual portions of the internal _data array
# ------------------------------------------------------------------------
@property
def allele_idxs(self):
"""
Return the allele indices for each genotype
"""
return self._data["allele_idxs"]
@property
def gt_scores(self):
"""
Return the genotype score for each genotype (as a float)
"""
scores = self._data["gt_score"].copy().astype("float")
scores[scores == MISSING_IDX] = np.nan
return scores
# Operations
# Note: genotypes are compared by first allele then second, using the order of alleles in the variant
# ----------
def _get_alleles_for_ops(self, other):
"""
Get the scalar allele values (Genotype or str)
or arrays of allele values (GenotypeArray)
or None (NotImplemented)
"""
if isinstance(other, Genotype) or isinstance(other, GenotypeArray):
# Get scalar values for alleles
allele_idxs = other.allele_idxs
# Ensure the comparison is using the same variant
if self.variant != other.variant:
return None
elif type(other) == str:
allele_idxs = tuple(
self.variant.get_idx_from_allele(a) for a in other.split("/")
)
else:
return None
return allele_idxs
def __eq__(self, other):
allele_idxs = self._get_alleles_for_ops(other)
if allele_idxs is None:
return NotImplemented
return (self.allele_idxs == allele_idxs).all(axis=1)
def __ne__(self, other):
allele_idxs = self._get_alleles_for_ops(other)
if allele_idxs is None:
return NotImplemented
return (self.allele_idxs != allele_idxs).any(axis=1)
def __lt__(self, other):
allele_idxs = self._get_alleles_for_ops(other)
if allele_idxs is None:
return NotImplemented
return (self.allele_idxs <= allele_idxs).all(axis=1) & (
self.allele_idxs < allele_idxs
).any(axis=1)
def __le__(self, other):
allele_idxs = self._get_alleles_for_ops(other)
if allele_idxs is None:
return NotImplemented
return (self.allele_idxs <= allele_idxs).all(axis=1)
def __gt__(self, other):
allele_idxs = self._get_alleles_for_ops(other)
if allele_idxs is None:
return NotImplemented
return (self.allele_idxs >= allele_idxs).all(axis=1) & (
self.allele_idxs > allele_idxs
).any(axis=1)
def __ge__(self, other):
allele_idxs = self._get_alleles_for_ops(other)
if allele_idxs is None:
return NotImplemented
return (self.allele_idxs >= allele_idxs).all(axis=1)
###################
# Utility Methods #
###################
def set_reference(self, allele: Union[str, int]) -> None:
"""
Change the reference allele (in-place) by specifying an allele index value or an allele string
Parameters
----------
allele: int or str
The allele that will be set as the reference allele.
Either the allele string, or the index into the variant allele list
Returns
-------
None
"""
# Get the allele as an integer and as a string
if type(allele) == str:
allele_idx = self.variant.get_idx_from_allele(allele, add=False)
allele_str = allele
elif type(allele) == int:
if not self.variant.is_valid_allele_idx(allele):
raise ValueError(
f"{allele} is not a valid allele index,"
f" the variant has {len(self.variant.alleles)} alleles."
)
allele_idx = allele
allele_str = self.variant.alleles[allele]
else:
raise ValueError(
f"The `allele` must be a str or int, not an instance of '{type(allele)}'"
)
if allele_idx == 0:
# Nothing to do, this is already the reference
return self
# Update the list of alleles
old_ref = self.variant.alleles[0]
# Replace existing value with the old ref
self.variant.alleles[allele_idx] = old_ref
# Add new ref to the beginning and remove the old ref
self.variant.alleles = [
allele_str,
] + self.variant.alleles[1:]
# Update stored alleles
was_ref = self._data["allele_idxs"] == 0
was_allele = self._data["allele_idxs"] == allele_idx
# What was the reference is now the new reference position
self._data["allele_idxs"][was_ref] = allele_idx
# What was the allele is now reference (0)
self._data["allele_idxs"][was_allele] = 0