"""
Typing
------
The code has to deal with many different alternative data types for what is essentially two basic
data types: 2D matrices and 1D vectors.
Specifically, we have pandas data frames and series, Scipy sparse matrices, and numpy
multi-dimensional arrays (not to mention the deprecated numpy matrix type).
Python has the great ability to "duck type", so in an ideal world, we could just pretend these are
just two data types and be done. In practice, this is hopelessly broken.
First, even operations that exists for all data types sometimes have different interfaces
(as in, ``np.foo(matrix, ...)`` vs. ``matrix.foo(...)``).
Second, operating on sparse and dense data often requires completely different code paths.
This makes it very easy to write code that works today and breaks tomorrow when someone passes a
pandas series to a function that expects a numpy array and it just *almost* works correctly (and god
help the poor soul that mixes up a numpy matrix with a numpy 2d array, or passes a categorical
pandas series to something that expects a series of strings).
"Eternal vigilance is the cost of freedom" - the solution here is to define a bunch of fake types,
which are almost entirely for the benefit of the ``mypy`` type checker (with some run-time
assertions as well).
This not only makes the code intent explicit ("explicit is better than implicit") but also allows us
to leverage ``mypy`` to catch errors such as applying a numpy operation on a sparse matrix, etc.
To put some order in this chaos, the following concepts are used:
* :py:const:`Shaped` is any 1d or 2d data in any format we can work with. :py:const:`Matrix` is
any 2d data, and :py:const:`Vector` is any 1d data.
* For 2D data, we allow multiple data types that we can't directly operate on:
most :py:class:`SparseMatrix` layouts, :py:class:`PandasFrame` and ``np.matrix`` have
strange quirks when it comes to directly operating on them and should be avoided, while CSR and
CSC :py:class:`CompressedMatrix` sparse matrices and properly-laid-out 2D numpy arrays
:py:const:`NumpyMatrix` are in general well-behaved. We therefore introduce the concept of
:py:const:`ProperMatrix` vs. :py:const:`ImproperMatrix` types, and provide functions that
manipulate whether the "proper" data is in row-major or column-major order.
* For 1D data, we just distinguish between :py:class:`PandasSeries` and 1D numpy
:py:const:`NumpyVector` arrays, as these are the only types we allow. In theory we could have also
allowed for sparse vectors but mercifully these are very uncommon so we can just ignore them.
Ironically, now that ``numpy`` added type annotations, the usefulness of the type hints added here
has decreased, since both :py:const:`NumpyVector` and :py:const:`NumpyMatrix` are aliases to the
same ``numpy.ndaarray`` type. Perhaps in the future numpy would allow for using ``Annotated`` types
(with explicit number of dimensions, or even - gasp - the element data type) to allow for more
useful type annotations. Or this could all be ported to Julia and avoid this whole mess.
"""
from abc import abstractmethod
from contextlib import contextmanager
from typing import Any
from typing import Collection
from typing import Iterator
from typing import Optional
from typing import Sized
from typing import Tuple
from typing import TypeVar
from typing import Union
import numpy as np
import pandas as pd # type: ignore
import scipy.sparse as sp # type: ignore
import metacells.utilities.documentation as utd
import metacells.utilities.timing as utm
try:
from typing_extensions import Protocol
except ModuleNotFoundError:
class Protocol: # type: ignore
"""
Placeholder if we don't have ``typing_extensions``.
"""
__all__ = [
"CPP_DATA_TYPES",
"Shaped",
"ProperShaped",
"ImproperShaped",
"Matrix",
"ProperMatrix",
"NumpyMatrix",
"CompressedMatrix",
"ImproperMatrix",
"SparseMatrix",
"PandasFrame",
"Vector",
"NumpyVector",
"ImproperVector",
"PandasSeries",
"is_1d",
"is_2d",
"maybe_numpy_vector",
"maybe_numpy_matrix",
"maybe_sparse_matrix",
"maybe_compressed_matrix",
"maybe_pandas_frame",
"maybe_pandas_series",
"mustbe_numpy_vector",
"mustbe_numpy_matrix",
"mustbe_sparse_matrix",
"mustbe_compressed_matrix",
"mustbe_pandas_frame",
"mustbe_pandas_series",
"to_proper_matrix",
"to_proper_matrices",
"to_pandas_series",
"to_pandas_frame",
"frozen",
"freeze",
"unfreeze",
"unfrozen",
"to_numpy_matrix",
"to_numpy_vector",
"DENSE_FAST_FLAG",
"SPARSE_FAST_FORMAT",
"SPARSE_SLOW_FORMAT",
"LAYOUT_OF_AXIS",
"PER_OF_AXIS",
"shaped_dtype",
"matrix_layout",
"is_layout",
"is_contiguous",
"to_contiguous",
"mustbe_canonical",
"is_canonical",
"eliminate_zeros",
"sort_indices",
"sum_duplicates",
"shaped_checksum",
]
#: Numpy 2-dimensional data.
#:
#: .. note::
#:
#: This is not to be confused with ``numpy.matrix`` which must not be used, but is returned
#: by the occasional function, and would wreak havoc on the semantics of some operations unless
#: immediately concerted to a proper ``NumpyMatrix``, which is a simple 2-dimensional
#: ``ndarray``.
NumpyMatrix = np.ndarray # Should be: Annotated[np.ndarray, NDim(2)]
#: Numpy 1-dimensional data.
NumpyVector = np.ndarray # Should be: Annotated[np.ndarray, NDim(1)]
# pylint: disable=missing-function-docstring
S = TypeVar("S", bound="ShapedProtocol")
class ShapedProtocol(Protocol):
"""
A ``mypy`` type for any shaped (1- or 2-dimensional, proper or improper) data.
"""
ndim: int
shape: Union[Tuple[int, int], Tuple[int]]
def __getitem__(self, key: Any) -> Any:
...
def __setitem__(self, key: Any, value: Any) -> Any:
...
def transpose(self: S) -> S:
...
SP = TypeVar("SP", bound="SparseMatrix")
[docs]
class SparseMatrix(ShapedProtocol, Protocol):
"""
A ``mypy`` type for sparse 2-dimensional data.
Should have been ``SparseMatrix = sp.base.spmatrix``.
"""
shape: Tuple[int, int]
nnz: int
def getformat(self) -> str:
...
def toarray(self, order: Optional[str] = None) -> NumpyMatrix:
...
def multiply(self: SP, other: ShapedProtocol) -> SP:
...
def getcol(self: SP, index: int) -> SP:
...
def getrow(self: SP, index: int) -> SP:
...
def sum(self, *, axis: int) -> NumpyVector:
...
def max(self, *, axis: int) -> NumpyVector:
...
def nanmax(self, *, axis: int) -> NumpyVector:
...
def min(self, *, axis: int) -> NumpyVector:
...
def nanmin(self, *, axis: int) -> NumpyVector:
...
def getnnz(self, *, axis: int) -> NumpyVector:
...
def argmax(self, *, axis: int) -> NumpyVector:
...
def maximum(self: SP, other: SP) -> SP:
...
def tocsr(self) -> "CompressedMatrix":
...
def tocsc(self) -> "CompressedMatrix":
...
def nonzero(self) -> Tuple[np.ndarray, np.ndarray]:
...
def mean(self: SP, *, axis: int) -> "np.ndarray":
...
def copy(self: SP) -> SP:
...
[docs]
class CompressedMatrix(SparseMatrix, Protocol):
"""
A ``mypy`` type for sparse CSR/CSC 2-dimensional data.
Should have been ``CompressedMatrix = sp..._cs_matrix``.
"""
indices: np.ndarray
indptr: np.ndarray
data: np.ndarray
has_sorted_indices: bool
has_canonical_format: bool
def sum_duplicates(self) -> None:
...
def eliminate_zeros(self) -> None:
...
def sort_indices(self) -> None:
...
class PandasIndex(ShapedProtocol, Collection, Sized, Protocol):
"""
A ``mypy`` type for a pandas index.
"""
values: NumpyVector
[docs]
class PandasFrame(ShapedProtocol, Protocol):
"""
A ``mypy`` type for pandas 2-dimensional data.
Should have been ``PandasFrame = pd.DataFrame``.
"""
shape: Tuple[int, int]
values: NumpyMatrix
index: PandasIndex
columns: PandasIndex
def __delitem__(self, key: Any) -> None:
...
def __constraints__(self, key: Any) -> bool:
...
[docs]
class PandasSeries(ShapedProtocol, Sized, Protocol):
"""
A ``mypy`` type for pandas 1-dimensional data.
Should have been ``PandasSeries = pd.Series``.
"""
size: int
shape: Tuple[int]
values: NumpyVector
index: PandasIndex
@abstractmethod
def sort_values(self, inplace: bool, ascending: bool) -> None:
...
# pylint: enable=missing-function-docstring
#: The data types supported by the C++ extensions code.
CPP_DATA_TYPES = ["float32", "float64", "int32", "int64", "uint32", "uint64"]
#: A ``mypy`` type for "proper" 2-dimensional data.
#:
#: "Proper" data allows for direct processing without having
#: to mess with its formatting.
ProperMatrix = Union[NumpyMatrix, CompressedMatrix]
#: A ``mypy`` type for "improper" 2-dimensional data.
#:
#: "Improper" data contains or can be converted to "proper" data.
ImproperMatrix = Union[PandasFrame, SparseMatrix]
#: A ``mypy`` type for any 2-dimensional data.
Matrix = Union[ProperMatrix, ImproperMatrix]
#: An "improper" 1-dimensional data.
ImproperVector = Union[Collection[int], Collection[float], PandasSeries]
#: A ``mypy`` type for any 1-dimensional data.
#:
#: .. todo::
#:
#: Is there any need for ``SparseVector``?
Vector = Union[NumpyVector, ImproperVector]
#: A "proper" 1- or 2-dimensional data.
ProperShaped = Union[ProperMatrix, NumpyVector]
#: An "improper" 1- or 2- dimensional data.
ImproperShaped = Union[ImproperMatrix, ImproperVector]
#: Shaped data of any of the types we can deal with.
Shaped = Union[ProperShaped, ImproperShaped]
#: Pandas data in various types.
PandasShaped = Union[PandasFrame, PandasSeries]
[docs]
def is_1d(shaped: Shaped) -> bool:
"""
Test whether the ``shaped`` is 1-dimensional.
"""
return hasattr(shaped, "ndim") and getattr(shaped, "ndim") == 1
[docs]
def is_2d(shaped: Shaped) -> bool:
"""
Test whether the ``shaped`` is 2-dimensional.
"""
return hasattr(shaped, "ndim") and getattr(shaped, "ndim") == 2
[docs]
def maybe_numpy_vector(shaped: Any) -> Optional[NumpyVector]:
"""
Return the ``shaped`` as a :py:const:`NumpyVector`, if it is one.
"""
if isinstance(shaped, np.ndarray) and shaped.ndim == 1:
return shaped
return None
[docs]
def maybe_numpy_matrix(shaped: Any) -> Optional[NumpyMatrix]:
"""
Return the ``shaped`` as a :py:const:`NumpyMatrix`, if it is one.
.. note::
This looks for a 2-dimensional ``numpy.ndarray`` which is **not** a ``numpy.matrix``. Do not
use ``numpy.matrix`` - it is deprecated and behaves subtly different to a 2-dimensional
``numpy.ndarray`` leading to hard-to-find bugs.
"""
if isinstance(shaped, np.ndarray) and shaped.ndim == 2 and not isinstance(shaped, np.matrix):
return shaped
return None
[docs]
def maybe_sparse_matrix(shaped: Any) -> Optional[SparseMatrix]:
"""
Return ``shap`` as a :py:const:`SparseMatrix`, if it is one.
.. note::
This will succeed for a :py:const:`CompressedMatrix` which is a sub-type of a
:py:const:`SparseMatrix`.
"""
if isinstance(shaped, sp.base.spmatrix):
return shaped
return None
[docs]
def maybe_compressed_matrix(shaped: Any) -> Optional[CompressedMatrix]:
"""
Return ``shaped`` as a :py:const:`CompressedMatrix`, if it is one.
"""
if isinstance(shaped, (sp.csr_matrix, sp.csc_matrix)):
return shaped
return None
[docs]
def maybe_pandas_series(shaped: Any) -> Optional[PandasSeries]:
"""
Return ``shaped`` as a :py:const:`PandasSeries`, if it is one.
"""
if isinstance(shaped, pd.Series):
return shaped
return None
[docs]
def maybe_pandas_frame(shaped: Any) -> Optional[PandasFrame]:
"""
Return ``shaped`` s a :py:const:`PandasFrame`, if it is one.
"""
if isinstance(shaped, pd.DataFrame):
return shaped
return None
[docs]
def mustbe_numpy_vector(shaped: Any) -> NumpyVector:
"""
Return ``shaped`` as a :py:const:`NumpyVector`, asserting it must be one.
"""
assert isinstance(shaped, np.ndarray) and shaped.ndim == 1
return shaped
[docs]
def mustbe_numpy_matrix(shaped: Any) -> NumpyMatrix:
"""
Return ``shaped`` as a :py:const:`NumpyMatrix`, asserting it must be one.
.. note::
This looks for a 2-dimensional ``numpy.ndarray`` which is **not** a ``numpy.matrix``. Do not
use ``numpy.matrix`` - it is deprecated and behaves subtly different to a 2-dimensional
``numpy.ndarray`` leading to hard-to-find bugs.
"""
assert isinstance(shaped, np.ndarray) and shaped.ndim == 2 and not isinstance(shaped, np.matrix)
return shaped
[docs]
def mustbe_sparse_matrix(shaped: Any) -> SparseMatrix:
"""
Return ``shaped`` as a :py:const:`SparseMatrix`, asserting it must be one.
.. note::
This will succeed for a :py:const:`CompressedMatrix` which is a sub-type of a
:py:const:`SparseMatrix`.
"""
assert isinstance(shaped, sp.base.spmatrix)
return shaped
[docs]
def mustbe_compressed_matrix(shaped: Any) -> CompressedMatrix:
"""
Return ``shaped`` as a :py:const:`CompressedMatrix`, asserting it must be one.
"""
assert isinstance(shaped, (sp.csr_matrix, sp.csc_matrix))
return shaped
[docs]
def mustbe_pandas_series(shaped: Any) -> PandasSeries:
"""
Return ``shaped`` as a :py:const:`PandasSeries`, asserting it must be one.
"""
assert isinstance(shaped, pd.Series)
return shaped
[docs]
def mustbe_pandas_frame(shaped: Any) -> PandasFrame:
"""
Return ``shaped`` as a :py:const:`PandasFrame`, asserting it must be one.
"""
assert isinstance(shaped, pd.DataFrame)
return shaped
[docs]
@utd.expand_doc()
def to_proper_matrix(matrix: Matrix, *, default_layout: str = "row_major") -> ProperMatrix:
"""
Given some 2D ``matrix``, return in in a :py:const:`ProperMatrix` format we can safely process.
If the data is in some strange sparse format, use ``default_layout`` (default: {default_layout})
to decide whether to return it in ``row_major`` (CSR) or ``column_major`` (CSC) layout.
"""
if matrix.ndim != 2:
raise ValueError(f"data is {matrix.ndim}-dimensional, " "expected 2-dimensional")
if default_layout not in LAYOUT_OF_AXIS:
raise ValueError(f"invalid default layout: {default_layout}")
frame = maybe_pandas_frame(matrix)
if frame is not None:
matrix = frame.values
if isinstance(matrix, pd.core.arrays.categorical.Categorical):
matrix = np.array(matrix)
compressed = maybe_compressed_matrix(matrix)
if compressed is not None:
return compressed
sparse = maybe_sparse_matrix(matrix)
if sparse is not None:
if default_layout == "column_major":
with utm.timed_step("matrix.tocsc"):
utm.timed_parameters(results=sparse.shape[1], elements=sparse.nnz / sparse.shape[1])
return sparse.tocsc()
with utm.timed_step("matrix.tocsr"):
utm.timed_parameters(results=sparse.shape[0], elements=sparse.nnz / sparse.shape[0])
return sparse.tocsr()
dense = maybe_numpy_matrix(matrix)
if dense is None:
dense = np.asarray(matrix)
return dense
[docs]
def to_proper_matrices(
matrix: Matrix, *, default_layout: str = "row_major"
) -> Tuple[ProperMatrix, Optional[NumpyMatrix], Optional[CompressedMatrix]]:
"""
Similar to :py:func:`to_proper_matrix` but return a tuple with the proper matrix and also its
:py:const:`NumpyMatrix` representation and its py:const:`CompressedMatrix` representation.
Exactly one of these two representations will be ``None``.
If the data is in some strange sparse format, use ``default_layout`` (default: {default_layout})
to decide whether to return it in ``row_major`` (CSR) or ``column_major`` (CSC) layout.
This is used to pick between dense and compressed code paths, and provides typed references so
``mypy`` can type-check each of these paths:
.. code:: python
proper, dense, compressed = to_proper_matrices(matrix)
... Common code path can use the proper matrix value ...
if dense is not None:
assert compressed is None
... Dense code path can use the dense matrix ...
else:
assert compressed is not None
... Compressed code path can use the compressed matrix value ...
if metacells.ut.matrix_layout(compressed) == 'row_major':
... CSR code path ...
else:
... CSC code path ...
"""
proper = to_proper_matrix(matrix, default_layout=default_layout)
dense = maybe_numpy_matrix(proper)
compressed = maybe_compressed_matrix(proper)
assert (dense is None) or (compressed is None)
assert (dense is None) != (compressed is None)
return (proper, dense, compressed)
[docs]
def to_pandas_series(
vector: Optional[Vector] = None,
*,
index: Optional[Vector] = None,
) -> PandasSeries:
"""
Construct a pandas series from any :py:const:`Vector`.
"""
if vector is None:
return pd.Series(index=index)
return pd.Series(to_numpy_vector(vector), index=index)
[docs]
def to_pandas_frame(
matrix: Optional[Matrix] = None, *, index: Optional[Vector] = None, columns: Optional[Vector] = None
) -> PandasFrame:
"""
Construct a pandas frame from any :py:const:`Matrix`.
"""
if matrix is None:
return pd.DataFrame(index=index, columns=columns)
sparse = maybe_sparse_matrix(matrix)
if sparse is not None:
return pd.DataFrame.sparse.from_spmatrix(sparse, index=index, columns=columns)
return pd.DataFrame(to_numpy_matrix(matrix, only_extract=True), index=index, columns=columns)
[docs]
def frozen(shaped: Union[ProperShaped, PandasShaped]) -> bool:
"""
Test whether the ``shaped`` data is protected against future modification.
"""
compressed = maybe_compressed_matrix(shaped)
if compressed is not None:
assert (
compressed.indices.flags.writeable == compressed.indptr.flags.writeable == compressed.data.flags.writeable
)
return not compressed.data.flags.writeable
if isinstance(shaped, (pd.DataFrame, pd.Series, pd.core.indexes.base.Index)):
shaped = shaped.values
if isinstance(shaped, pd.core.arrays.categorical.Categorical):
shaped = shaped.codes
if isinstance(shaped, np.ndarray):
return not shaped.flags.writeable
raise NotImplementedError(f"frozen of {shaped.__class__}")
[docs]
def freeze(shaped: Union[ProperShaped, PandasShaped]) -> None:
"""
Protect the ``shaped`` data against future modification.
"""
compressed = maybe_compressed_matrix(shaped)
if compressed is not None:
compressed.indices.setflags(write=False)
compressed.indptr.setflags(write=False)
compressed.data.setflags(write=False)
return
if isinstance(shaped, (pd.DataFrame, pd.Series, pd.core.indexes.base.Index)):
shaped = shaped.values
if isinstance(shaped, pd.core.arrays.categorical.Categorical):
shaped = shaped.codes
if isinstance(shaped, np.ndarray):
shaped.setflags(write=False)
return
raise NotImplementedError(f"freeze of {shaped.__class__}")
[docs]
def unfreeze(shaped: Union[ProperShaped, PandasShaped]) -> None:
"""
Permit future modification of some ``shaped`` data.
"""
compressed = maybe_compressed_matrix(shaped)
if compressed is not None:
compressed.indices.setflags(write=True)
compressed.indptr.setflags(write=True)
compressed.data.setflags(write=True)
return
if isinstance(shaped, (pd.DataFrame, pd.Series, pd.core.indexes.base.Index)):
shaped = shaped.values
if isinstance(shaped, pd.core.arrays.categorical.Categorical):
shaped = shaped.codes
if isinstance(shaped, np.ndarray):
shaped.setflags(write=True)
return
raise NotImplementedError(f"unfreeze of {shaped.__class__}")
[docs]
@contextmanager
def unfrozen(proper: ProperShaped) -> Iterator[None]:
"""
Execute some in-place modification, temporarily unfreezing the ``proper`` shaped data.
"""
is_frozen = frozen(proper)
if is_frozen:
unfreeze(proper)
try:
yield
finally:
if is_frozen:
freeze(proper)
[docs]
@utd.expand_doc()
def to_numpy_matrix(
matrix: Matrix,
*,
default_layout: str = "row_major",
copy: bool = False,
only_extract: bool = False,
) -> NumpyMatrix:
"""
Convert any :py:const:`Matrix` to a dense 2-dimensional :py:const:`NumpyMatrix`.
If ``copy`` (default: {copy}), a copy of the data is returned even if no conversion needed to be
done.
If ``only_extract`` (default: {only_extract}), then assert this only extracts the data inside
some pandas data.
If the data is in some strange sparse format, use ``default_layout`` (default: {default_layout})
to decide whether to return it in ``row_major`` (CSR) or ``column_major`` (CSC) layout.
"""
assert default_layout in ("row_major", "column_major")
sparse = maybe_sparse_matrix(matrix)
if sparse is not None:
assert not only_extract
with utm.timed_step("sparse.toarray"):
utm.timed_parameters(results=sparse.shape[0], elements=sparse.shape[1])
layout = matrix_layout(sparse) or default_layout
if layout == "row_major":
order = "C"
else:
order = "F"
dense = sparse.toarray(order=order)
else:
dense = mustbe_numpy_matrix(to_proper_matrix(matrix))
if copy and id(dense) == id(matrix):
dense = np.copy(dense)
return mustbe_numpy_matrix(dense)
[docs]
@utd.expand_doc()
def to_numpy_vector(
shaped: Shaped,
*,
copy: bool = False,
only_extract: bool = False,
) -> NumpyVector:
"""
Convert any :py:const:`Vector`, or a :py:const:`Matrix` where one of the dimensions has size
one, to a :py:const:`NumpyVector`.
If ``copy`` (default: {copy}), a copy of the data is returned even if no conversion needed to be
done.
If ``only_extract`` (default: {only_extract}), then assert this only extracts the data inside
some pandas data.
"""
if not hasattr(shaped, "ndim"):
dense = np.array(shaped)
elif is_1d(shaped):
if isinstance(shaped, (pd.DataFrame, pd.Series, pd.core.indexes.base.Index)):
shaped = shaped.values
if isinstance(shaped, pd.core.arrays.categorical.Categorical):
shaped = np.array(shaped)
dense = shaped
else:
assert is_2d(shaped)
assert shaped.shape[0] == 1 or shaped.shape[1] == 1 # type: ignore
dense = to_numpy_matrix(shaped, copy=copy, only_extract=only_extract) # type: ignore
dense = np.reshape(dense, -1)
if copy and id(dense) == id(shaped):
dense = np.copy(dense)
return mustbe_numpy_vector(dense)
#: Which flag indicates efficient 2D dense matrix layout.
DENSE_FAST_FLAG = {"column_major": "F_CONTIGUOUS", "row_major": "C_CONTIGUOUS"}
#: Which format indicates efficient 2D sparse matrix layout.
SPARSE_FAST_FORMAT = {"column_major": "csc", "row_major": "csr"}
#: Which format indicates inefficient 2D sparse matrix layout.
SPARSE_SLOW_FORMAT = {"column_major": "csr", "row_major": "csc"}
#: The layout by the ``axis`` parameter.
LAYOUT_OF_AXIS = ("row_major", "column_major")
#: When reducing data, get results ``per`` row or column (by the ``axis`` parameter).
PER_OF_AXIS = ("row", "column")
[docs]
def is_layout(matrix: Matrix, layout: Optional[str]) -> bool:
"""
Test whether the ``matrix`` is arranged according to the ``layout``.
This will always succeed if the ``layout`` is ``None``.
"""
if layout is None:
return True
if matrix.shape[0] == 1 or matrix.shape[1] == 1:
return True
sparse = maybe_sparse_matrix(matrix)
if sparse is not None:
return sparse.getformat() == SPARSE_FAST_FORMAT[layout]
dense = to_numpy_matrix(matrix, only_extract=True)
return dense.flags[DENSE_FAST_FLAG[layout]] # type: ignore
[docs]
def shaped_dtype(shaped: Shaped) -> str:
"""
Return the data type of the element of shaped data.
"""
frame = maybe_pandas_frame(shaped)
if frame is not None:
shaped = frame.values
compressed = maybe_compressed_matrix(shaped)
if compressed is not None:
shaped = compressed.data
series = maybe_pandas_series(shaped)
if series is not None:
shaped = series.values
if isinstance(shaped, pd.core.arrays.categorical.Categorical):
shaped = np.array(shaped)
if isinstance(shaped, np.ndarray):
return str(shaped.dtype)
raise AssertionError(f"unexpected shaped type: {shaped.__class__.__qualname__}")
[docs]
def matrix_layout(matrix: Matrix) -> Optional[str]:
"""
Return which layout the ``matrix`` is arranged by (``row_major`` or ``column_major``).
If the data is in some strange sparse format, returns ``None``.
"""
sparse = maybe_sparse_matrix(matrix)
if sparse is not None:
for layout, sparse_format in SPARSE_FAST_FORMAT.items():
if sparse.getformat() == sparse_format:
return layout
return None
dense = to_numpy_matrix(matrix, only_extract=True)
for layout, flag in DENSE_FAST_FLAG.items():
if dense.flags[flag]: # type: ignore
return layout
return None
[docs]
def is_contiguous(vector: Vector) -> bool:
"""
Return whether the ``vector`` is contiguous in memory.
This is only ``True`` for a dense vector.
"""
if isinstance(vector, (pd.DataFrame, pd.Series, pd.core.indexes.base.Index)):
vector = vector.values
if isinstance(vector, pd.core.arrays.categorical.Categorical):
vector = np.array(vector)
dense = maybe_numpy_vector(vector)
if dense is None:
return False
return dense.flags.c_contiguous and dense.flags.f_contiguous
[docs]
def to_contiguous(vector: Vector, *, copy: bool = False) -> NumpyVector:
"""
Return the ``vector`` in contiguous (dense) format.
If ``copy`` (default: {copy}), a copy of the data is returned even if no conversion needed to be
done.
"""
dense = to_numpy_vector(vector, copy=copy)
if not dense.flags.c_contiguous or not dense.flags.f_contiguous:
dense = np.copy(dense)
assert dense.flags.c_contiguous and dense.flags.f_contiguous
return dense
[docs]
def mustbe_canonical(shaped: Shaped) -> None:
"""
Assert that some data is in canonical format.
For numpy matrices or vectors, this means the data is contiguous (for matrices, in either
row-major or column-major order).
For sparse matrices, it means the data is in COO format, or compressed (CSC or CSR format), with
sorted indices and no duplicates.
In general, we'd like all the data stored in ``AnnData`` to be canonical.
"""
assert hasattr(shaped, "ndim"), "non-canonical shaped data: has no ndim attribute"
if is_1d(shaped):
is_contiguous_shape = is_contiguous(shaped) # type: ignore
assert is_contiguous_shape, "non-canonical vector: non-contiguous 1D data"
return
assert is_2d(shaped)
matrix: Matrix = shaped # type: ignore
sparse = maybe_sparse_matrix(matrix)
if sparse is None:
assert (
matrix_layout(matrix) is not None
), "non-canonical dense matrix: is not in row-major or column-major layout"
return
assert sparse.getformat() in ("coo", "csr", "csc"), "non-canonical sparse matrix: is not in COO, CSR or CSC format"
if hasattr(sparse, "has_canonical_format"):
assert getattr(sparse, "has_canonical_format"), "non-canonical sparse matrix: might have duplicate indices"
if hasattr(sparse, "has_sorted_indices"):
assert getattr(sparse, "has_sorted_indices"), "non-canonical sparse matrix: might have unsorted indices"
compressed = maybe_compressed_matrix(matrix)
if compressed is not None:
assert is_contiguous(compressed.indptr), "non-canonical CSC/CSR matrix: indptr array is not contiguous"
assert is_contiguous(compressed.data), "non-canonical CSC/CSR matrix: data array is not contiguous"
assert is_contiguous(compressed.indices), "non-canonical CSC/CSR matrix: indices array is not contiguous"
[docs]
def is_canonical(shaped: Shaped) -> bool: # pylint: disable=too-many-return-statements
"""
Return whether the data is in canonical format.
For numpy matrices or vectors, this means the data is contiguous (for matrices, in either
row-major or column-major order).
For sparse matrices, it means the data is in COO format, or compressed (CSC or CSR format), with
sorted indices and no duplicates.
In general, we'd like all the data stored in ``AnnData`` to be canonical.
"""
if not hasattr(shaped, "ndim"):
return False
if is_1d(shaped):
return is_contiguous(shaped) # type: ignore
assert is_2d(shaped)
matrix: Matrix = shaped # type: ignore
sparse = maybe_sparse_matrix(matrix)
if sparse is None:
return matrix_layout(matrix) is not None
if sparse.getformat() not in ("coo", "csr", "csc"):
return False
if hasattr(sparse, "has_canonical_format") and not getattr(sparse, "has_canonical_format"):
return False
if hasattr(sparse, "has_sorted_indices") and not getattr(sparse, "has_sorted_indices"):
return False
compressed = maybe_compressed_matrix(matrix)
if compressed is not None:
return (
compressed.indptr[-1] == compressed.data.size
and is_canonical(compressed.indptr)
and is_canonical(compressed.data)
and is_canonical(compressed.indices)
)
return True
[docs]
def eliminate_zeros(compressed: CompressedMatrix) -> None:
"""
Eliminate zeros in a compressed matrix.
"""
with utm.timed_step("sparse.eliminate_zeros"):
with unfrozen(compressed):
utm.timed_parameters(before=compressed.nnz)
compressed.eliminate_zeros()
utm.timed_parameters(after=compressed.nnz)
[docs]
def sort_indices(compressed: CompressedMatrix) -> None:
"""
Ensure the indices are sorted in each row/column.
"""
with utm.timed_step("sparse.sort_indices"):
with unfrozen(compressed):
utm.timed_parameters(before=compressed.nnz)
compressed.sort_indices()
utm.timed_parameters(after=compressed.nnz)
[docs]
def sum_duplicates(compressed: CompressedMatrix) -> None:
"""
Eliminate duplicates in a compressed matrix.
"""
with utm.timed_step("sparse.sum_duplicates"):
with unfrozen(compressed):
utm.timed_parameters(before=compressed.nnz)
compressed.sum_duplicates()
utm.timed_parameters(after=compressed.nnz)
[docs]
def shaped_checksum(shaped: Shaped) -> float:
"""
Return a checksum of the contents of ``shaped`` data (for debugging reproducibility).
"""
if is_1d(shaped):
values = to_numpy_vector(shaped)
else:
values = to_numpy_matrix(shaped).ravel() # type: ignore
return np.nansum(values.astype("float64") * (1 + np.arange(len(values))))