Source code for metacells.utilities.computation

"""
Computation
-----------

Most of the functions defined here are thin wrappers around built-in ``numpy`` or ``scipy``
functions, with others wrapping C++ extensions provided as part of the ``metacells`` package itself.

The key distinction of the functions here is that they provide a uniform interface for all the
supported :py:const:`metacells.utilities.typing.Matrix` and
:py:const:`metacells.utilities.typing.Vector` types, which makes them safe to use in our code
without worrying about the exact data type used. In theory, Python duck typing should provide this
out of the box, but in practice, without explicit types and interfaces, the behavior of the
different types diverges to the point where this just doesn't work.

All the functions here (optionally) also allow collecting timing information using
:py:mod:`metacells.utilities.timing`, to make it easier to locate the performance bottleneck of the
analysis pipeline.
"""

import re
import sys
from math import ceil
from math import floor
from math import sqrt
from re import Pattern
from typing import Any
from typing import Callable
from typing import Collection
from typing import List
from typing import Optional
from typing import Tuple
from typing import TypeVar
from typing import Union
from typing import overload
from warnings import catch_warnings
from warnings import filterwarnings
from warnings import warn

import cvxpy
import igraph as ig  # type: ignore
import numpy as np
import pandas as pd  # type: ignore
import scipy.sparse as sp  # type: ignore
import scipy.stats as ss  # type: ignore

import metacells.utilities.documentation as utd
import metacells.utilities.logging as utl
import metacells.utilities.parallel as utp
import metacells.utilities.timing as utm
import metacells.utilities.typing as utt

if "sphinx" not in sys.argv[0]:
    import metacells.extensions as xt  # type: ignore

__all__ = [
    "allow_inefficient_layout",
    "to_layout",
    "sort_compressed_indices",
    "corrcoef",
    "cross_corrcoef_rows",
    "pairs_corrcoef_rows",
    "logistics",
    "cross_logistics_rows",
    "pairs_logistics_rows",
    "log_data",
    "median_per",
    "mean_per",
    "nanmean_per",
    "geomean_per",
    "max_per",
    "nanmax_per",
    "min_per",
    "nanmin_per",
    "nnz_per",
    "sum_per",
    "sum_squared_per",
    "rank_per",
    "top_per",
    "prune_per",
    "quantile_per",
    "nanquantile_per",
    "scale_by",
    "fraction_by",
    "fraction_per",
    "stdev_per",
    "variance_per",
    "normalized_variance_per",
    "relative_variance_per",
    "sum_matrix",
    "nnz_matrix",
    "mean_matrix",
    "max_matrix",
    "min_matrix",
    "nanmean_matrix",
    "nanmax_matrix",
    "nanmin_matrix",
    "rank_matrix_by_layout",
    "bincount_vector",
    "most_frequent",
    "strongest",
    "highest_weight",
    "weighted_mean",
    "fraction_of_grouped",
    "downsample_matrix",
    "downsample_vector",
    "matrix_rows_folds_and_aurocs",
    "sliding_window_function",
    "patterns_matches",
    "compress_indices",
    "bin_pack",
    "bin_fill",
    "sum_groups",
    "shuffle_matrix",
    "cover_diameter",
    "cover_coordinates",
    "random_piles",
    "represent",
    "min_cut",
    "sparsify_matrix",
]


ALLOW_INEFFICIENT_LAYOUT: bool = True


def allow_inefficient_layout(allow: bool) -> bool:
    """
    Specify whether to allow processing using an inefficient layout. Returns the previous setting.

    This is ``True`` by default, which merely warns when an inefficient layout is used. Otherwise,
    processing an inefficient layout is treated as an error (raises an exception).
    """
    global ALLOW_INEFFICIENT_LAYOUT
    prev_allow = ALLOW_INEFFICIENT_LAYOUT
    ALLOW_INEFFICIENT_LAYOUT = allow
    return prev_allow
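
# Illustrative addition (not part of the original module): a minimal sketch of temporarily
# treating inefficient layouts as hard errors around a layout-sensitive step, then restoring
# the previous setting. The matrix here is synthetic; real code would pass actual per-cell data.
def _example_allow_inefficient_layout() -> None:
    column_major = np.asfortranarray(np.random.rand(100, 200))
    previous = allow_inefficient_layout(False)
    try:
        sum_per(column_major, per="column")  # fine: matches the column-major layout
        try:
            sum_per(column_major, per="row")  # raises instead of merely warning
            assert False, "expected a NotImplementedError"
        except NotImplementedError:
            pass
    finally:
        allow_inefficient_layout(previous)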
@overload
def to_layout(matrix: utt.CompressedMatrix, layout: str, *, symmetric: bool = False) -> utt.CompressedMatrix:
    ...


@overload
def to_layout(matrix: utt.NumpyMatrix, layout: str, *, symmetric: bool = False) -> utt.NumpyMatrix:
    ...


@overload
def to_layout(matrix: utt.ImproperMatrix, layout: str, *, symmetric: bool = False) -> utt.ProperMatrix:
    ...
[docs] @utm.timed_call() @utd.expand_doc() def to_layout(matrix: utt.Matrix, layout: str, *, symmetric: bool = False) -> utt.ProperMatrix: """ Return the ``matrix`` in a specific ``layout`` for efficient processing. That is, if ``layout`` is ``column_major``, re-layout the matrix for efficient per-column (variable, gene) slicing/processing. For sparse matrices, this is ``csc`` format; for dense matrices, this is Fortran (column-major) format. Similarly, if ``layout`` is ``row_major``, re-layout the matrix for efficient per-row (observation, cell) slicing/processing. For sparse matrices, this is ``csr`` format; for dense matrices, this is C (row-major) format. If the matrix is already in the correct layout, it is returned as-is. If the matrix is ``symmetric`` (default: {symmetric}), it must be square and is assumed to be equal to its own transpose. This allows converting it from one layout to another using the efficient (essentially zero-cost) transpose operation. Otherwise, a new copy is created in the proper layout. This is a costly operation as it needs to move all the data elements to their proper place. This uses a C++ extension to deal with compressed data (the builtin implementation is much slower). Even so this operation is costly; still, it makes the following processing **much** more efficient, so it is typically a net performance gain overall. """ assert layout in utt.LAYOUT_OF_AXIS proper, dense, compressed = utt.to_proper_matrices(matrix, default_layout=layout) utm.timed_parameters(rows=proper.shape[0], columns=proper.shape[1]) if utt.is_layout(proper, layout): utm.timed_parameters(method="none") return proper if symmetric: assert proper.shape[0] == proper.shape[1] proper = proper.transpose() assert utt.is_layout(proper, layout) utm.timed_parameters(method="transpose") return proper is_frozen = utt.frozen(proper) result: utt.ProperMatrix if dense is not None: # utm.timed_parameters(method="reshape") # order = utt.DENSE_FAST_FLAG[layout][0] # with utm.timed_step("numpy.ravel"): # utm.timed_parameters(rows=dense.shape[0], columns=dense.shape[1]) # result = np.reshape(np.ravel(dense, order=order), dense.shape, order=order) # type: ignore result = _relayout_dense(dense, layout, 1024 * 1024) else: utm.timed_parameters(method="compressed") assert compressed is not None to_format = utt.SPARSE_FAST_FORMAT[layout] from_format = utt.SPARSE_SLOW_FORMAT[layout] assert compressed.getformat() == from_format compressed = _relayout_compressed(compressed) assert compressed.getformat() == to_format result = compressed if is_frozen: utt.freeze(result) assert result.shape == matrix.shape return result
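
# Illustrative addition (not part of the original module): re-layout a CSR (row-major) sparse
# matrix into column-major order before per-column processing. The data here is synthetic.
def _example_to_layout() -> None:
    umis = sp.random(1000, 500, density=0.1, format="csr", dtype="float32")
    column_major = to_layout(umis, "column_major")  # efficient per-column (per-gene) access
    assert column_major.getformat() == "csc"
    totals_per_gene = sum_per(column_major, per="column")
    assert totals_per_gene.size == 500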
@utm.timed_call() def _relayout_dense(dense: utt.NumpyMatrix, layout: str, block_size: int) -> utt.NumpyMatrix: """ Efficient serial layout conversion of a ``dense`` matrix. """ block_elements = int(sqrt(block_size / dense.dtype.itemsize)) row_elements = dense.shape[0] column_elements = dense.shape[1] row_blocks = int(ceil(row_elements / block_elements)) column_blocks = int(ceil(column_elements / block_elements)) numpy_order = utt.DENSE_FAST_FLAG[layout][0] result = np.empty(dense.shape, dtype=dense.dtype, order=numpy_order) # type: ignore for row_block in range(row_blocks): start_row = int(round(row_block * row_elements / row_blocks)) stop_row = int(round((row_block + 1) * row_elements / row_blocks)) for column_block in range(column_blocks): start_column = int(round(column_block * column_elements / column_blocks)) stop_column = int(round((column_block + 1) * column_elements / column_blocks)) result[start_row:stop_row, start_column:stop_column] = dense[start_row:stop_row, start_column:stop_column] return result @utm.timed_call() def _relayout_compressed(compressed: utt.CompressedMatrix) -> utt.CompressedMatrix: """ Efficient parallel conversion of a CSR/CSC ``matrix`` to a CSC/CSR matrix. """ axis = ("csr", "csc").index(compressed.getformat()) if compressed.nnz < 2: if axis == 0: compressed = compressed.tocsc() else: compressed = compressed.tocsr() utt.sort_indices(compressed) else: matrix_bands_count = compressed.shape[axis] output_bands_count = matrix_elements_count = compressed.shape[1 - axis] nnz_elements_of_output_bands = bincount_vector(compressed.indices, minlength=matrix_elements_count) output_indptr = np.empty(output_bands_count + 1, dtype=compressed.indptr.dtype) output_indptr[0:2] = 0 with utm.timed_step("numpy.cumsum"): utm.timed_parameters(elements=nnz_elements_of_output_bands.size - 1) np.cumsum(nnz_elements_of_output_bands[:-1], out=output_indptr[2:]) output_indices = np.empty(compressed.nnz, dtype=compressed.indices.dtype) output_data = np.empty(compressed.nnz, dtype=compressed.data.dtype) extension_name = "collect_compressed_%s_t_%s_t_%s_t" % ( # pylint: disable=consider-using-f-string compressed.data.dtype, compressed.indices.dtype, compressed.indptr.dtype, ) extension = getattr(xt, extension_name) assert matrix_bands_count == compressed.indptr.size - 1 with utm.timed_step("extensions.collect_compressed"): utm.timed_parameters(results=output_bands_count, elements=matrix_bands_count) assert compressed.indptr[-1] == compressed.data.size extension( compressed.data, compressed.indices, compressed.indptr, output_data, output_indices, output_indptr[1:] ) assert output_indptr[-1] == compressed.indptr[-1] constructor = (sp.csr_matrix, sp.csc_matrix)[1 - axis] compressed = constructor((output_data, output_indices, output_indptr), shape=compressed.shape) sort_compressed_indices(compressed, force=True) compressed.has_canonical_format = True return compressed
[docs] def sort_compressed_indices(matrix: utt.CompressedMatrix, force: bool = False) -> None: """ Efficient parallel sort of indices in a CSR/CSC ``matrix``. This will skip sorting a matrix that is marked as sorted, unless ``force`` is specified. """ assert matrix.getformat() in ("csr", "csc") if matrix.has_sorted_indices and not force: return matrix_bands_count = matrix.indptr.size - 1 extension_name = "sort_compressed_indices_%s_t_%s_t_%s_t" % ( # pylint: disable=consider-using-f-string matrix.data.dtype, matrix.indices.dtype, matrix.indptr.dtype, ) extension = getattr(xt, extension_name) with utm.timed_step("extensions.sort_compressed_indices"): utm.timed_parameters(results=matrix_bands_count, elements=matrix.nnz / matrix_bands_count) extension(matrix.data, matrix.indices, matrix.indptr, matrix.shape[1]) matrix.has_sorted_indices = True
def _ensure_per(matrix: utt.Matrix, per: Optional[str]) -> str: if per is not None: assert per in ("row", "column") return per assert matrix.shape[0] == matrix.shape[1] layout = utt.matrix_layout(matrix) assert layout is not None return layout[:-6] def _ensure_layout_for(operation: str, matrix: utt.Matrix, per: Optional[str], allow_inefficient: bool = True) -> None: if utt.is_layout(matrix, f"{per}_major"): return if not ALLOW_INEFFICIENT_LAYOUT: allow_inefficient = False layout = utt.matrix_layout(matrix) if layout is not None: operating_on_matrix_of_wrong_layout = f"{operation} of {per}s of a matrix with {layout} layout" if not allow_inefficient: raise NotImplementedError(operating_on_matrix_of_wrong_layout) warn(operating_on_matrix_of_wrong_layout) return sparse = utt.maybe_sparse_matrix(matrix) if sparse is not None: operating_on_sparse_matrix_of_inefficient_format = ( f"{operation} of {per}s of a sparse matrix with inefficient format: {sparse.getformat()}" ) if not allow_inefficient: raise NotImplementedError(operating_on_sparse_matrix_of_inefficient_format) # warn(operating_on_sparse_matrix_of_inefficient_format) return dense = utt.to_numpy_matrix(matrix, only_extract=True) operating_on_numpy_matrix_of_inefficient_strides = ( f"{operation} of {per}s of a numpy matrix with inefficient strides: {dense.strides}" ) if not allow_inefficient: raise NotImplementedError(operating_on_numpy_matrix_of_inefficient_strides) # warn(operating_on_numpy_matrix_of_inefficient_strides) def _ensure_per_for(operation: str, matrix: utt.Matrix, per: Optional[str]) -> str: per = _ensure_per(matrix, per) _ensure_layout_for(operation, matrix, per) return per def _get_dense_for(operation: str, matrix: utt.Matrix, per: Optional[str]) -> Tuple[str, utt.NumpyMatrix]: per = _ensure_per(matrix, per) dense = utt.to_numpy_matrix(matrix, default_layout=f"{per}_major") _ensure_layout_for(operation, dense, per) return per, dense
@utm.timed_call()
def corrcoef(
    matrix: utt.Matrix,
    *,
    per: Optional[str],
    reproducible: bool,
) -> utt.NumpyMatrix:
    """
    Similar to ``numpy.corrcoef``, but also works for a sparse ``matrix``, and can be
    ``reproducible`` regardless of the number of cores used (at the cost of some slowdown). It only
    works for matrices with a float or double element data type.

    If ``reproducible``, a slower (still parallel) but reproducible algorithm will be used.

    Unlike ``numpy.corrcoef``, if given a row with identical values, instead of complaining about
    division by zero, this will report a zero correlation. This makes sense for the intended usage
    of computing similarities between cells/genes - an all-zero row has no data so we declare it to
    be "not similar" to anything else.

    If ``per`` is ``None``, the matrix must be square and is assumed to be symmetric, so the most
    efficient direction is used based on the matrix layout. Otherwise it must be one of ``row`` or
    ``column``, and the matrix must be in the appropriate layout (``row_major`` for operating on
    rows, ``column_major`` for operating on columns).

    .. note::

        The result is always dense, as even for sparse data, the correlation is rarely exactly
        zero.
    """
    per, dense = _get_dense_for("corrcoef", matrix, per)
    if not reproducible or str(dense.dtype) not in ("float", "double", "float32", "float64"):
        return _corrcoef_fast(dense, per)
    return _corrcoef_reproducible(dense, per)
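
# Illustrative addition (not part of the original module): correlate the rows of a small random
# float32 matrix and sanity-check against numpy. The data is synthetic and the tolerance is a
# guess reflecting the float32 arithmetic of the reproducible implementation.
def _example_corrcoef() -> None:
    np.random.seed(123456)
    dense = np.random.rand(20, 100).astype("float32")  # C-order, i.e. row-major
    similarity = corrcoef(dense, per="row", reproducible=True)
    assert similarity.shape == (20, 20)
    assert np.allclose(similarity, np.corrcoef(dense), atol=1e-3)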
@utm.timed_call(".irreproducible") def _corrcoef_fast( dense: utt.NumpyMatrix, per: str, ) -> utt.NumpyMatrix: axis = utt.PER_OF_AXIS.index(per) utm.timed_parameters(results=dense.shape[axis], elements=dense.shape[1 - axis]) with utt.unfrozen(dense): # Replication of numpy code: X = dense if per == "row" else dense.T row_averages = np.average(X, axis=1) X -= row_averages[:, None] result = np.matmul(X, X.T) result *= 1 / X.shape[1] X += row_averages[:, None] diagonal = np.diag(result) stddev = np.sqrt(diagonal) stddev[stddev == 0] = 1 result /= stddev[:, None] result /= stddev[None, :] np.clip(result, -1, 1, out=result) np.fill_diagonal(result, 1.0) return result @utm.timed_call(".reproducible") def _corrcoef_reproducible( dense: utt.NumpyMatrix, per: str, ) -> utt.NumpyMatrix: if per == "column": dense = dense.T utm.timed_parameters(results=dense.shape[0], elements=dense.shape[1]) extension_name = f"correlate_dense_{dense.dtype}_t" result = np.empty((dense.shape[0], dense.shape[0]), dtype="float32") extension = getattr(xt, extension_name) extension(dense, result) result[np.isnan(result)] = 0.0 np.fill_diagonal(result, 1.0) return result
@utm.timed_call()
def cross_corrcoef_rows(
    first_matrix: utt.NumpyMatrix,
    second_matrix: utt.NumpyMatrix,
    *,
    reproducible: bool,  # pylint: disable=unused-argument
) -> utt.NumpyMatrix:
    """
    Similar to ``numpy.corrcoef``, but computes the correlations between each row of the
    ``first_matrix`` and each row of the ``second_matrix``. The result matrix contains one row per
    row of the first matrix and one column per row of the second matrix.

    Both matrices must be dense, in row-major layout, have the same (float or double) element data
    type, and contain the same number of columns.

    If ``reproducible``, a slower (still parallel) but reproducible algorithm will be used.

    Unlike ``numpy.corrcoef``, if given a row with identical values, instead of complaining about
    division by zero, this will report a zero correlation. This makes sense for the intended usage
    of computing similarities between cells/genes - an all-zero row has no data so we declare it to
    be "not similar" to anything else.

    .. note::

        This only works for floating-point matrices.

    .. todo::

        Implement a fast algorithm for the non-reproducible case.
    """
    first_matrix = utt.mustbe_numpy_matrix(first_matrix)
    second_matrix = utt.mustbe_numpy_matrix(second_matrix)

    assert utt.is_layout(first_matrix, "row_major")
    assert utt.is_layout(second_matrix, "row_major")
    assert first_matrix.shape[1] == second_matrix.shape[1]
    assert first_matrix.dtype == second_matrix.dtype

    first_workaround = first_matrix.shape[0] == 1
    if first_workaround:
        first_matrix = np.concatenate([first_matrix, first_matrix])
    second_workaround = second_matrix.shape[0] == 1
    if second_workaround:
        second_matrix = np.concatenate([second_matrix, second_matrix])

    extension_name = f"cross_correlate_dense_{first_matrix.dtype}_t"
    result = np.empty((first_matrix.shape[0], second_matrix.shape[0]), dtype="float32")
    extension = getattr(xt, extension_name)
    extension(first_matrix, second_matrix, result)

    if first_workaround:
        result = result[0:1, :]
    if second_workaround:
        result = result[:, 0:1]

    return result
@utm.timed_call()
def pairs_corrcoef_rows(
    first_matrix: utt.NumpyMatrix,
    second_matrix: utt.NumpyMatrix,
    *,
    reproducible: bool,  # pylint: disable=unused-argument
) -> utt.NumpyVector:
    """
    Similar to ``numpy.corrcoef``, but computes the correlations between each row of the
    ``first_matrix`` and each matching row of the ``second_matrix``.

    Both matrices must be dense, in row-major layout, have the same (float or double) element data
    type, and the same shape.

    If ``reproducible``, a slower (still parallel) but reproducible algorithm will be used.

    Unlike ``numpy.corrcoef``, if given a row with identical values, instead of complaining about
    division by zero, this will report a zero correlation. This makes sense for the intended usage
    of computing similarities between cells/genes - an all-zero row has no data so we declare it to
    be "not similar" to anything else.

    .. note::

        This only works for floating-point matrices.

    .. todo::

        Implement a fast algorithm for the non-reproducible case.
    """
    first_matrix = utt.mustbe_numpy_matrix(first_matrix)
    second_matrix = utt.mustbe_numpy_matrix(second_matrix)

    assert utt.is_layout(first_matrix, "row_major")
    assert utt.is_layout(second_matrix, "row_major")
    assert first_matrix.shape == second_matrix.shape
    assert first_matrix.dtype == second_matrix.dtype

    is_single_row = first_matrix.shape[0] == 1
    if is_single_row:
        first_matrix = np.vstack([first_matrix, first_matrix])
        second_matrix = np.vstack([second_matrix, second_matrix])

    extension_name = f"pairs_correlate_dense_{first_matrix.dtype}_t"
    result = np.empty(first_matrix.shape[0], dtype="float32")
    extension = getattr(xt, extension_name)
    extension(first_matrix, second_matrix, result)

    if is_single_row:
        result = result[[0]]

    return result
[docs] @utm.timed_call() def logistics(matrix: utt.NumpyMatrix, *, location: float, slope: float, per: Optional[str]) -> utt.NumpyMatrix: """ Compute a matrix of distances between each pair of rows in a dense (float or double) matrix using the logistics function. The raw value of the logistics distance between a pair of vectors ``x`` and ``y`` is the mean of ``1/(1+exp(-slope*(abs(x[i]-y[i])-location)))``. This has a minimum of ``1/(1+exp(slope*location))`` for identical vectors and an (asymptotic) maximum of 1. We normalize this to a range between 0 and 1, to be useful as a distance measure (with a zero distance between identical vectors). If ``per`` is ``None``, the matrix must be square and is assumed to be symmetric, so the most efficient direction is used based on the matrix layout. Otherwise it must be one of ``row`` or ``column``, and the matrix must be in the appropriate layout (``row_major`` operating on rows, ``column_major`` for operating on columns). .. todo:: This always uses the dense implementation. Possibly a sparse implementation might be faster. .. note:: The result is always dense, as even for sparse data, the result is rarely exactly zero. """ per, dense = _get_dense_for("logistics", matrix, per) axis = utt.PER_OF_AXIS.index(per) utm.timed_parameters(results=dense.shape[axis], elements=dense.shape[1 - axis]) if per == "column": dense = dense.T extension_name = f"logistics_dense_{dense.dtype}_t" result = np.empty((dense.shape[0], dense.shape[0]), dtype="float32") extension = getattr(xt, extension_name) extension(dense, result, location, slope) return result
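
# Illustrative addition (not part of the original module): the raw logistics value for one pair of
# rows, spelled out with numpy, next to a call computing the full (normalized) distance matrix.
# The location/slope values are arbitrary and the data is synthetic.
def _example_logistics() -> None:
    np.random.seed(123456)
    dense = np.random.rand(10, 50).astype("float32")
    location, slope = 0.5, 2.0

    # Raw (un-normalized) value for rows 0 and 1, as described in the docstring above:
    raw = np.mean(1.0 / (1.0 + np.exp(-slope * (np.abs(dense[0, :] - dense[1, :]) - location))))
    assert 1.0 / (1.0 + np.exp(slope * location)) <= raw <= 1.0

    distances = logistics(dense, location=location, slope=slope, per="row")
    assert distances.shape == (10, 10)
    assert np.all(distances >= -1e-6) and np.all(distances <= 1 + 1e-6)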
@utm.timed_call()
def cross_logistics_rows(
    first_matrix: utt.NumpyMatrix,
    second_matrix: utt.NumpyMatrix,
    *,
    location: float,
    slope: float,
) -> utt.NumpyMatrix:
    """
    Similar to :py:func:`logistics`, but computes the distances between each row of the
    ``first_matrix`` and each row of the ``second_matrix``. The result matrix contains one row per
    row of the first matrix and one column per row of the second matrix.

    Both matrices must be dense, in row-major layout, have the same (float or double) element data
    type, and contain the same number of columns.
    """
    first_matrix = utt.mustbe_numpy_matrix(first_matrix)
    second_matrix = utt.mustbe_numpy_matrix(second_matrix)

    assert utt.is_layout(first_matrix, "row_major")
    assert utt.is_layout(second_matrix, "row_major")
    assert first_matrix.shape[1] == second_matrix.shape[1]
    assert first_matrix.dtype == second_matrix.dtype

    extension_name = f"cross_logistics_dense_{first_matrix.dtype}_t"
    result = np.empty((first_matrix.shape[0], second_matrix.shape[0]), dtype="float32")
    extension = getattr(xt, extension_name)
    extension(first_matrix, second_matrix, location, slope, result)

    return result
@utm.timed_call()
def pairs_logistics_rows(
    first_matrix: utt.NumpyMatrix,
    second_matrix: utt.NumpyMatrix,
    *,
    location: float,
    slope: float,
) -> utt.NumpyVector:
    """
    Similar to :py:func:`logistics`, but computes the distances between each row of the
    ``first_matrix`` and each matching row of the ``second_matrix``.

    Both matrices must be dense, in row-major layout, have the same (float or double) element data
    type, and the same shape.
    """
    first_matrix = utt.mustbe_numpy_matrix(first_matrix)
    second_matrix = utt.mustbe_numpy_matrix(second_matrix)

    assert utt.is_layout(first_matrix, "row_major")
    assert utt.is_layout(second_matrix, "row_major")
    assert first_matrix.shape == second_matrix.shape
    assert first_matrix.dtype == second_matrix.dtype

    extension_name = f"pairs_logistics_dense_{first_matrix.dtype}_t"
    result = np.empty(first_matrix.shape[0], dtype="float32")
    extension = getattr(xt, extension_name)
    extension(first_matrix, second_matrix, location, slope, result)

    return result
S = TypeVar("S", bound="utt.Shaped")
[docs] @utm.timed_call() @utd.expand_doc() def log_data( # pylint: disable=too-many-branches shaped: S, *, base: Optional[float] = None, normalization: float = 0, ) -> S: """ Return the log of the values in the ``shaped`` data. If ``base`` is specified (default: {base}), use this base log. Otherwise, use the natural logarithm. The ``normalization`` (default: {normalization}) specifies how to deal with zeros in the data: * If it is zero, an input zero will become an output ``NaN``. * If it is positive, it is added to the input before computing the log. * If it is negative, input zeros will become log(minimal positive value) + normalization, that is, the zeros will be given a value this much smaller than the minimal "real" log value. .. note:: The result is always dense, as even for sparse data, the log is rarely zero. """ dense: np.ndarray if shaped.ndim == 1: # type: ignore dense = utt.to_numpy_vector(shaped, copy=True) else: dense = utt.to_numpy_matrix(shaped, copy=True) # type: ignore if normalization > 0: dense += normalization else: where = dense > 0 if normalization < 0: dense[~where] = np.min(dense[where]) if base is None: log_function = np.log elif base == 2: log_function = np.log2 # type: ignore base = None elif base == 10: log_function = np.log10 # type: ignore base = None else: assert base > 0 log_function = np.log if normalization == 0: log_function(dense, out=dense, where=where) dense[~where] = None else: log_function(dense, out=dense) if base is not None: dense /= np.log(base) if normalization < 0: dense[~where] += normalization return dense # type: ignore
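
# Illustrative addition (not part of the original module): the three ``normalization`` modes of
# log_data on a small vector containing a zero. The values chosen are arbitrary.
def _example_log_data() -> None:
    values = np.array([0.0, 1.0, 4.0], dtype="float32")

    with_nans = log_data(values, base=2)  # zeros become NaN
    assert np.isnan(with_nans[0]) and with_nans[2] == 2.0

    regularized = log_data(values, base=2, normalization=1.0)  # log2(value + 1)
    assert np.allclose(regularized, [0.0, 1.0, np.log2(5.0)])

    below_min = log_data(values, base=2, normalization=-1.0)  # zeros become log2(min positive) - 1
    assert below_min[0] == -1.0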
[docs] @utm.timed_call() @utd.expand_doc() def downsample_matrix( matrix: utt.Matrix, *, per: str, samples: Union[int, utt.Vector], eliminate_zeros: bool = True, inplace: bool = False, random_seed: int, ) -> utt.ProperMatrix: """ Downsample the data ``per`` (one of ``row`` and ``column``) such that the sum of each one becomes ``samples``. If the matrix is sparse, and ``eliminate_zeros`` (default: {eliminate_zeros}), then perform a final phase of eliminating leftover zero values from the compressed format. This means the result will be in "canonical format" so further ``scipy`` sparse operations on it will be faster. If ``inplace`` (default: {inplace}), modify the matrix in-place, otherwise, return a modified copy. A non-zero ``random_seed`` will make the operation replicable. """ assert per in ("row", "column") if isinstance(samples, (int, float)): samples = np.full(matrix.shape[0], samples, dtype="int32") else: assert len(samples) == matrix.shape[0] samples = utt.to_numpy_vector(samples).astype("int32") _, dense, compressed = utt.to_proper_matrices(matrix) if dense is not None: return _downsample_dense_matrix(dense, per, samples, inplace, random_seed) assert compressed is not None return _downsample_compressed_matrix(compressed, per, samples, eliminate_zeros, inplace, random_seed)
def _downsample_dense_matrix(
    matrix: utt.NumpyMatrix, per: str, samples: utt.NumpyVector, inplace: bool, random_seed: int
) -> utt.NumpyMatrix:
    if inplace:
        output = matrix
    elif per == "row":
        output = np.empty(matrix.shape, dtype=matrix.dtype, order="C")
    else:
        output = np.empty(matrix.shape, dtype=matrix.dtype, order="F")
    assert output.shape == matrix.shape

    axis = utt.PER_OF_AXIS.index(per)
    results_count = matrix.shape[axis]
    elements_count = matrix.shape[1 - axis]

    if per == "row":
        sample_input_vector = matrix[0, :]
        sample_output_vector = output[0, :]
    else:
        sample_input_vector = matrix[:, 0]
        sample_output_vector = output[:, 0]

    input_is_contiguous = sample_input_vector.flags.c_contiguous and sample_input_vector.flags.f_contiguous
    if not input_is_contiguous:
        raise NotImplementedError(
            f"downsampling per-{per} of a matrix input matrix with inefficient strides: {matrix.strides}"
        )

    output_is_contiguous = sample_output_vector.flags.c_contiguous and sample_output_vector.flags.f_contiguous
    if not inplace and not output_is_contiguous:
        raise NotImplementedError(
            f"downsampling per-{per} of a matrix output matrix with inefficient strides: {output.strides}"
        )

    if per == "column":
        matrix = np.transpose(matrix)
        output = np.transpose(output)

    assert results_count == matrix.shape[0]
    assert elements_count == matrix.shape[1]

    if results_count == 1:
        input_array = utt.to_numpy_vector(matrix)
        output_array = utt.to_numpy_vector(output)
        extension_name = f"downsample_array_{input_array.dtype}_t_{output_array.dtype}_t"
        extension = getattr(xt, extension_name)
        with utm.timed_step("extensions.downsample_array"):
            utm.timed_parameters(elements=input_array.size, samples=np.mean(samples))
            extension(input_array, output_array, samples, random_seed)
    else:
        extension_name = f"downsample_dense_{matrix.dtype}_t_{output.dtype}_t"
        extension = getattr(xt, extension_name)
        with utm.timed_step("extensions.downsample_dense_matrix"):
            utm.timed_parameters(results=results_count, elements=elements_count, samples=np.mean(samples))
            extension(matrix, output, samples, random_seed)

    if per == "column":
        output = np.transpose(output)

    assert output.shape == matrix.shape
    return output


def _downsample_compressed_matrix(
    matrix: utt.CompressedMatrix,
    per: str,
    samples: utt.NumpyVector,
    eliminate_zeros: bool,
    inplace: bool,
    random_seed: int,
) -> utt.CompressedMatrix:
    axis = utt.PER_OF_AXIS.index(per)
    results_count = matrix.shape[axis]
    elements_count = matrix.shape[1 - axis]

    axis_format = ("csr", "csc")[axis]
    if matrix.getformat() != axis_format:
        raise NotImplementedError(
            f"downsample per-{per} "
            f"of a sparse matrix in the format: {matrix.getformat()} "
            f"instead of the efficient format: {axis_format}"
        )

    if inplace:
        assert not utt.frozen(matrix)
        output = matrix
    else:
        constructor = (sp.csr_matrix, sp.csc_matrix)[axis]
        output_data = np.empty_like(matrix.data)
        output = constructor((output_data, np.copy(matrix.indices), np.copy(matrix.indptr)), shape=matrix.shape)
        output.has_sorted_indices = matrix.has_sorted_indices
        output.has_canonical_format = matrix.has_canonical_format

    extension_name = "downsample_compressed_%s_t_%s_t_%s_t" % (  # pylint: disable=consider-using-f-string
        matrix.data.dtype,
        matrix.indptr.dtype,
        output.data.dtype,
    )
    extension = getattr(xt, extension_name)
    assert results_count == matrix.indptr.size - 1

    with utm.timed_step("extensions.downsample_sparse_matrix"):
        utm.timed_parameters(results=results_count, elements=elements_count, samples=np.mean(samples))
        extension(matrix.data, matrix.indptr, output.data, samples, random_seed)

    if eliminate_zeros:
        utt.eliminate_zeros(output)
    if matrix.has_sorted_indices:
        utt.sort_indices(output)
    if matrix.has_canonical_format:
        utt.sum_duplicates(output)

    return output
[docs] @utm.timed_call() @utd.expand_doc(data_types=",".join([f"``{data_type}``" for data_type in utt.CPP_DATA_TYPES])) def downsample_vector( vector: utt.Vector, samples: int, *, output: Optional[utt.NumpyVector] = None, random_seed: int, ) -> None: """ Downsample a vector of sample counters. **Input** * A numpy ``vector`` containing non-negative integer sample counts. * A desired total number of ``samples``. * An optional numpy array ``output`` to hold the results (otherwise, the input is overwritten). * A ``random_seed`` (non-zero for reproducible results). The arrays may have any of the data types: {data_types}. **Operation** If the total number of samples (sum of the array) is not higher than the required number of samples, the output is identical to the input. Otherwise, treat the input as if it was a set where each index appeared its value number of times. Randomly select the desired number of samples from this set (without repetition), and store in the output the number of times each index was chosen. """ array = utt.to_numpy_vector(vector) if output is None: assert id(array) == id(vector) output = array else: assert output.shape == array.shape extension_name = f"downsample_array_{array.dtype}_t_{output.dtype}_t" extension = getattr(xt, extension_name) with utm.timed_step("extensions.downsample_array"): utm.timed_parameters(elements=array.size, samples=samples) extension(array, output, samples, random_seed)
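
# Illustrative addition (not part of the original module): downsample an integer count vector and
# a per-row count matrix to a common number of samples. The seeds and sizes are arbitrary.
def _example_downsample() -> None:
    counts = np.array([10, 0, 5, 25], dtype="int32")
    downsample_vector(counts, 20, random_seed=123456)  # modifies ``counts`` in place
    assert counts.sum() == 20
    assert counts[1] == 0  # empty entries stay empty

    umis = sp.random(100, 200, density=0.1, format="csr", random_state=123456)
    umis.data = np.ceil(umis.data * 10).astype("int32")  # hypothetical small positive counts
    target = int(np.median(sum_per(umis, per="row")))
    downsampled = downsample_matrix(umis, per="row", samples=target, random_seed=123456)
    assert np.all(sum_per(downsampled, per="row") == np.minimum(target, sum_per(umis, per="row")))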
[docs] @utm.timed_call() def matrix_rows_folds_and_aurocs( matrix: utt.Matrix, *, columns_subset: utt.NumpyVector, columns_scale: Optional[utt.NumpyVector] = None, normalization: float, ) -> Tuple[utt.NumpyVector, utt.NumpyVector]: """ Given a matrix and a subset of the columns, return two vectors. The first contains, for each row, the mean column value in the subset divided by the mean column value outside the subset. The second contains for each row the area under the receiver operating characteristic (AUROC) for the row, that is, the probability that a random column in the subset would have a higher value in this row than a random column outside the subset. If ``columns_scale`` is specified, the data is divided by this scale before computing the AUROC. """ proper, dense, compressed = utt.to_proper_matrices(matrix) rows_count, columns_count = proper.shape if columns_scale is None: columns_scale = np.full(columns_count, 1.0, dtype="float32") else: columns_scale = columns_scale.astype("float32") assert columns_scale.size == columns_count rows_folds = np.empty(rows_count, dtype="float64") rows_auroc = np.empty(rows_count, dtype="float64") columns_subset = utt.to_numpy_vector(columns_subset) if columns_subset.dtype == "bool": assert columns_subset.size == columns_count else: mask: utt.NumpyVector = np.full(columns_count, False) mask[columns_subset] = True columns_subset = mask if dense is not None: extension_name = f"auroc_dense_matrix_{dense.dtype}_t" extension = getattr(xt, extension_name) extension(dense, columns_subset, columns_scale, normalization, rows_folds, rows_auroc) else: assert compressed is not None assert compressed.has_sorted_indices extension_name = "auroc_compressed_matrix_%s_t_%s_t_%s_t" % ( # pylint: disable=consider-using-f-string compressed.data.dtype, compressed.indices.dtype, compressed.indptr.dtype, ) extension = getattr(xt, extension_name) extension( compressed.data, compressed.indices, compressed.indptr, columns_count, columns_subset, columns_scale, normalization, rows_folds, rows_auroc, ) return (rows_folds, rows_auroc)
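
# Illustrative addition (not part of the original module): fold factors and AUROCs for rows of a
# synthetic matrix, given a column subset in which the first row is deliberately elevated.
def _example_matrix_rows_folds_and_aurocs() -> None:
    np.random.seed(123456)
    dense = np.random.rand(5, 100).astype("float32")
    in_subset = np.zeros(100, dtype="bool")
    in_subset[:50] = True
    dense[0, in_subset] += 10.0  # make row 0 clearly higher inside the subset

    folds, aurocs = matrix_rows_folds_and_aurocs(dense, columns_subset=in_subset, normalization=1e-5)
    assert folds.shape == aurocs.shape == (5,)
    assert aurocs[0] > 0.99  # row 0 separates the subset almost perfectly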
@utm.timed_call()
def median_per(matrix: utt.Matrix, *, per: Optional[str]) -> utt.NumpyVector:
    """
    Compute the median value ``per`` (``row`` or ``column``) of some ``matrix``.

    If ``per`` is ``None``, the matrix must be square and is assumed to be symmetric, so the most
    efficient direction is used based on the matrix layout. Otherwise it must be one of ``row`` or
    ``column``, and the matrix must be in the appropriate layout (``row_major`` for operating on
    rows, ``column_major`` for operating on columns).
    """
    per = _ensure_per_for("median", matrix, per)
    axis = 1 - utt.PER_OF_AXIS.index(per)

    sparse = utt.maybe_sparse_matrix(matrix)
    if sparse is not None:
        return _reduce_matrix("median", sparse, per, lambda sparse: _median_sparse(sparse, per))  # type: ignore

    dense = utt.to_numpy_matrix(matrix, only_extract=True)
    return _reduce_matrix("median", dense, per, lambda dense: np.median(dense, axis=axis))
@utm.timed_call() def _median_sparse(sparse: utt.SparseMatrix, per: str) -> utt.NumpyVector: """ Compute the median for each row or column of a sparse matrix. """ if per == "row": assert sparse.getformat() == "csr" else: assert sparse.getformat() == "csc" axis = utt.PER_OF_AXIS.index(per) output = np.empty(sparse.shape[axis], dtype=sparse.dtype) # type: ignore extension_name = "median_compressed_%s_t_%s_t_%s_t" % ( # pylint: disable=consider-using-f-string sparse.data.dtype, # type: ignore sparse.indices.dtype, # type: ignore sparse.indptr.dtype, # type: ignore ) extension = getattr(xt, extension_name) extension(sparse.data, sparse.indices, sparse.indptr, sparse.shape[1 - axis], output) # type: ignore return output
[docs] @utm.timed_call() def mean_per(matrix: utt.Matrix, *, per: Optional[str]) -> utt.NumpyVector: """ Compute the mean value ``per`` (``row`` or ``column``) of some ``matrix``. If ``per`` is ``None``, the matrix must be square and is assumed to be symmetric, so the most efficient direction is used based on the matrix layout. Otherwise it must be one of ``row`` or ``column``, and the matrix must be in the appropriate layout (``row_major`` operating on rows, ``column_major`` for operating on columns). """ per = _ensure_per_for("mean", matrix, per) axis = 1 - utt.PER_OF_AXIS.index(per) return sum_per(matrix, per=per) / matrix.shape[axis]
[docs] @utm.timed_call() def nanmean_per(matrix: utt.Matrix, *, per: Optional[str]) -> utt.NumpyVector: """ Compute the mean value ``per`` (``row`` or ``column``) of some ``matrix``, ignoring ``None`` values, if any. If ``per`` is ``None``, the matrix must be square and is assumed to be symmetric, so the most efficient direction is used based on the matrix layout. Otherwise it must be one of ``row`` or ``column``, and the matrix must be in the appropriate layout (``row_major`` operating on rows, ``column_major`` for operating on columns). """ per, dense = _get_dense_for("nanmean", matrix, per) axis = utt.PER_OF_AXIS.index(per) with catch_warnings(): filterwarnings("ignore", r"All-NaN (slice|axis) encountered") return _reduce_matrix("nanmean", dense, per, lambda dense: np.nanmean(dense, axis=1 - axis))
[docs] @utm.timed_call() def geomean_per(matrix: utt.NumpyMatrix, *, per: Optional[str]) -> utt.NumpyVector: """ Compute the geometric mean value ``per`` (``row`` or ``column``) of some (dense) ``matrix`` (of non-zero values). If ``per`` is ``None``, the matrix must be square and is assumed to be symmetric, so the most efficient direction is used based on the matrix layout. Otherwise it must be one of ``row`` or ``column``, and the matrix must be in the appropriate layout (``row_major`` operating on rows, ``column_major`` for operating on columns). """ per = _ensure_per_for("geomean", matrix, per) axis = utt.PER_OF_AXIS.index(per) sparse = utt.maybe_sparse_matrix(matrix) assert sparse is None dense = utt.to_numpy_matrix(matrix, only_extract=True) return _reduce_matrix("geomean", dense, per, lambda dense: ss.mstats.gmean(dense, axis=1 - axis))
[docs] @utm.timed_call() def max_per(matrix: utt.Matrix, *, per: Optional[str]) -> utt.NumpyVector: """ Compute the maximal value ``per`` (``row`` or ``column``) of some ``matrix``. If ``per`` is ``None``, the matrix must be square and is assumed to be symmetric, so the most efficient direction is used based on the matrix layout. Otherwise it must be one of ``row`` or ``column``, and the matrix must be in the appropriate layout (``row_major`` operating on rows, ``column_major`` for operating on columns). """ per = _ensure_per_for("max", matrix, per) axis = utt.PER_OF_AXIS.index(per) sparse = utt.maybe_sparse_matrix(matrix) if sparse is not None: return _reduce_matrix("max", sparse, per, lambda sparse: sparse.max(axis=1 - axis)) dense = utt.to_numpy_matrix(matrix, only_extract=True) return _reduce_matrix("max", dense, per, lambda dense: np.max(dense, axis=1 - axis))
[docs] @utm.timed_call() def nanmax_per(matrix: utt.Matrix, *, per: Optional[str]) -> utt.NumpyVector: """ Compute the maximal value ``per`` (``row`` or ``column``) of some ``matrix``, ignoring ``None`` values, if any. If ``per`` is ``None``, the matrix must be square and is assumed to be symmetric, so the most efficient direction is used based on the matrix layout. Otherwise it must be one of ``row`` or ``column``, and the matrix must be in the appropriate layout (``row_major`` operating on rows, ``column_major`` for operating on columns). """ per, dense = _get_dense_for("nanmax", matrix, per) axis = utt.PER_OF_AXIS.index(per) with catch_warnings(): filterwarnings("ignore", r"All-NaN (slice|axis) encountered") return _reduce_matrix("nanmax", dense, per, lambda dense: np.nanmax(dense, axis=1 - axis))
@utm.timed_call()
def min_per(matrix: utt.Matrix, *, per: Optional[str]) -> utt.NumpyVector:
    """
    Compute the minimal value ``per`` (``row`` or ``column``) of some ``matrix``.

    If ``per`` is ``None``, the matrix must be square and is assumed to be symmetric, so the most
    efficient direction is used based on the matrix layout. Otherwise it must be one of ``row`` or
    ``column``, and the matrix must be in the appropriate layout (``row_major`` for operating on
    rows, ``column_major`` for operating on columns).
    """
    per = _ensure_per_for("min", matrix, per)
    axis = utt.PER_OF_AXIS.index(per)

    sparse = utt.maybe_sparse_matrix(matrix)
    if sparse is not None:
        return _reduce_matrix("min", sparse, per, lambda sparse: sparse.min(axis=1 - axis))

    dense = utt.to_numpy_matrix(matrix, only_extract=True)
    return _reduce_matrix("min", dense, per, lambda dense: np.min(dense, axis=1 - axis))
@utm.timed_call()
def nanmin_per(matrix: utt.Matrix, *, per: Optional[str]) -> utt.NumpyVector:
    """
    Compute the minimal value ``per`` (``row`` or ``column``) of some ``matrix``, ignoring ``None``
    values, if any.

    If ``per`` is ``None``, the matrix must be square and is assumed to be symmetric, so the most
    efficient direction is used based on the matrix layout. Otherwise it must be one of ``row`` or
    ``column``, and the matrix must be in the appropriate layout (``row_major`` for operating on
    rows, ``column_major`` for operating on columns).
    """
    per, dense = _get_dense_for("nanmin", matrix, per)
    axis = utt.PER_OF_AXIS.index(per)

    with catch_warnings():
        filterwarnings("ignore", r"All-NaN (slice|axis) encountered")
        return _reduce_matrix("nanmin", dense, per, lambda dense: np.nanmin(dense, axis=1 - axis))
[docs] @utm.timed_call() def nnz_per(matrix: utt.Matrix, *, per: Optional[str]) -> utt.NumpyVector: """ Compute the number of non-zero values ``per`` (``row`` or ``column``) of some ``matrix``. If ``per`` is ``None``, the matrix must be square and is assumed to be symmetric, so the most efficient direction is used based on the matrix layout. Otherwise it must be one of ``row`` or ``column``, and the matrix must be in the appropriate layout (``row_major`` operating on rows, ``column_major`` for operating on columns). .. note:: If given a sparse matrix, this returns the number of **structural** non-zeros, that is, the number of entries we actually store data for, even if this data is zero. Use :py:func:`metacells.utilities.typing.eliminate_zeros` if you suspect the sparse matrix of containing structural zero data values. """ per = _ensure_per_for("nnz", matrix, per) axis = utt.PER_OF_AXIS.index(per) sparse = utt.maybe_sparse_matrix(matrix) if sparse is not None: return _reduce_matrix("nnz", sparse, per, lambda sparse: sparse.getnnz(axis=1 - axis)) dense = utt.to_numpy_matrix(matrix, only_extract=True) return _reduce_matrix( "nnz", dense, per, lambda dense: utt.mustbe_numpy_vector(np.count_nonzero(dense, axis=1 - axis)) )
[docs] @utm.timed_call() def sum_per(matrix: utt.Matrix, *, per: Optional[str]) -> utt.NumpyVector: """ Compute the total of the values ``per`` (``row`` or ``column``) of some ``matrix``. If ``per`` is ``None``, the matrix must be square and is assumed to be symmetric, so the most efficient direction is used based on the matrix layout. Otherwise it must be one of ``row`` or ``column``, and the matrix must be in the appropriate layout (``row_major`` operating on rows, ``column_major`` for operating on columns). """ per = _ensure_per_for("sum", matrix, per) axis = utt.PER_OF_AXIS.index(per) sparse = utt.maybe_sparse_matrix(matrix) if sparse is not None: return _reduce_matrix("sum", sparse, per, lambda sparse: sparse.sum(axis=1 - axis)) dense = utt.to_numpy_matrix(matrix, only_extract=True) return _reduce_matrix("sum", dense, per, lambda dense: utt.mustbe_numpy_vector(np.sum(dense, axis=1 - axis)))
[docs] @utm.timed_call() def sum_squared_per(matrix: utt.Matrix, *, per: Optional[str]) -> utt.NumpyVector: """ Compute the total of the squared values ``per`` (``row`` or ``column``) of some ``matrix``. If ``per`` is ``None``, the matrix must be square and is assumed to be symmetric, so the most efficient direction is used based on the matrix layout. Otherwise it must be one of ``row`` or ``column``, and the matrix must be in the appropriate layout (``row_major`` operating on rows, ``column_major`` for operating on columns). .. todo:: The :py:func:`sum_squared_per` implementation allocates and frees a complete copy of the matrix (to hold the squared values). An implementation that directly squares-and-adds the elements would be much more efficient and consume much less memory. """ per = _ensure_per_for("sum_squared", matrix, per) axis = utt.PER_OF_AXIS.index(per) sparse = utt.maybe_sparse_matrix(matrix) if sparse is not None: return _reduce_matrix("sum_squared", sparse, per, lambda sparse: sparse.multiply(sparse).sum(axis=1 - axis)) dense = utt.to_numpy_matrix(matrix, only_extract=True) return _reduce_matrix("sum_squared", dense, per, lambda dense: np.square(dense).sum(axis=1 - axis))
[docs] @utm.timed_call() def rank_per(matrix: utt.Matrix, rank: int, *, per: Optional[str]) -> utt.NumpyVector: """ Get the ``rank`` element ``per`` (``row`` or ``column``) of some ``matrix``. If ``per`` is ``None``, the matrix must be square and is assumed to be symmetric, so the most efficient direction is used based on the matrix layout. Otherwise it must be one of ``row`` or ``column``, and the matrix must be in the appropriate layout (``row_major`` operating on rows, ``column_major`` for operating on columns). .. todo:: This always uses the dense implementation. A sparse implementation should be faster. """ per, dense = _get_dense_for("ranking", matrix, per) if per == "column": dense = dense.transpose() output = np.empty(dense.shape[0], dtype=dense.dtype) extension_name = f"rank_rows_{dense.dtype}_t" extension = getattr(xt, extension_name) extension(dense, output, rank) return output
[docs] @utd.expand_doc() @utm.timed_call() def top_per(matrix: utt.Matrix, top: int, *, per: Optional[str], ranks: bool = False) -> utt.CompressedMatrix: """ Get the ``top`` elements ``per`` (``row`` or ``column``) of some ``matrix``, as a compressed ``per``-major matrix. If ``ranks`` (default: {ranks}), then fill the result with the rank of each element; Otherwise, just keep the original value. If ``per`` is ``None``, the matrix must be square and is assumed to be symmetric, so the most efficient direction is used based on the matrix layout. Otherwise it must be one of ``row`` or ``column``, and the matrix must be in the appropriate layout (``row_major`` operating on rows, ``column_major`` for operating on columns). .. todo:: This always uses the dense implementation. A sparse implementation should be faster. """ per, dense = _get_dense_for("ranking", matrix, per) if per == "column": dense = dense.transpose() size = dense.shape[0] assert top < dense.shape[1] indptr = np.arange(size + 1, dtype="int64") indptr *= top if size == 1: with utm.timed_step(".argpartition"): dense_vector = utt.to_numpy_vector(dense) partition_indices = np.argpartition(dense_vector, len(dense_vector) - top) indices = partition_indices[-top:].astype("int32") np.sort(indices) data = dense_vector[indices].astype("float32") else: indices = np.empty(top * size, dtype="int32") data = np.empty(top * size, dtype=dense.dtype) with utm.timed_step("extensions.collect_top"): utm.timed_parameters(size=size, keep=top, dtype=str(dense.dtype)) extension_name = f"collect_top_{dense.dtype}_t" extension = getattr(xt, extension_name) extension(top, dense, indices, data, ranks) top_data = sp.csr_matrix((data, indices, indptr), shape=dense.shape) top_data.has_sorted_indices = True top_data.has_canonical_format = True if per == "column": top_data = top_data.transpose() return top_data
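
# Illustrative addition (not part of the original module): keep the 3 largest entries in each row
# of a dense similarity matrix as a compressed row-major result. The sizes are arbitrary.
def _example_top_per() -> None:
    np.random.seed(123456)
    similarity = np.random.rand(50, 50).astype("float32")
    top_similarity = top_per(similarity, 3, per="row")
    assert top_similarity.getformat() == "csr"
    assert np.all(nnz_per(top_similarity, per="row") == 3)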
[docs] @utm.timed_call() def prune_per(compressed: utt.CompressedMatrix, top: int) -> utt.CompressedMatrix: """ Keep just the ``top`` elements of some ``compressed`` matrix, per row for CSR and per column for CSC. """ layout = utt.matrix_layout(compressed) if layout == "row_major": size = compressed.shape[0] else: assert layout == "column_major" size = compressed.shape[1] data_array = np.empty(size * top, dtype=compressed.data.dtype) indices_array = np.empty(size * top, dtype=compressed.indices.dtype) indptr_array = np.empty(1 + size, dtype=compressed.indptr.dtype) with utm.timed_step("extensions.collect_pruned"): utm.timed_parameters(size=size, nnz=compressed.nnz, keep=top) extension_name = "collect_pruned_%s_t_%s_t_%s_t" % ( # pylint: disable=consider-using-f-string compressed.data.dtype, compressed.indices.dtype, compressed.indptr.dtype, ) extension = getattr(xt, extension_name) extension(top, compressed.data, compressed.indices, compressed.indptr, data_array, indices_array, indptr_array) if layout == "row_major": constructor = sp.csr_matrix else: constructor = sp.csc_matrix pruned = constructor( (data_array[: indptr_array[-1]], indices_array[: indptr_array[-1]], indptr_array), shape=compressed.shape ) pruned.has_sorted_indices = True pruned.has_canonical_format = True return pruned
[docs] @utm.timed_call() def quantile_per(matrix: utt.Matrix, quantile: float, *, per: Optional[str]) -> utt.NumpyVector: """ Get the ``quantile`` element ``per`` (``row`` or ``column``) of some ``matrix``. If ``per`` is ``None``, the matrix must be square and is assumed to be symmetric, so the most efficient direction is used based on the matrix layout. Otherwise it must be one of ``row`` or ``column``, and the matrix must be in the appropriate layout (``row_major`` operating on rows, ``column_major`` for operating on columns). .. todo:: This always uses the dense implementation. Possibly a sparse implementation might be faster. """ per, dense = _get_dense_for("quantile", matrix, per) axis = 1 - utt.PER_OF_AXIS.index(per) assert 0 <= quantile <= 1 return np.nanquantile(dense, quantile, axis)
[docs] @utm.timed_call() def nanquantile_per(matrix: utt.Matrix, quantile: float, *, per: Optional[str]) -> utt.NumpyVector: """ Get the ``quantile`` element ``per`` (``row`` or ``column``) of some ``matrix``, ignoring ``None`` values. If ``per`` is ``None``, the matrix must be square and is assumed to be symmetric, so the most efficient direction is used based on the matrix layout. Otherwise it must be one of ``row`` or ``column``, and the matrix must be in the appropriate layout (``row_major`` operating on rows, ``column_major`` for operating on columns). .. todo:: This always uses the dense implementation. A sparse implementation should be more efficient. """ per, dense = _get_dense_for("nanquantile", matrix, per) axis = 1 - utt.PER_OF_AXIS.index(per) assert 0 <= quantile <= 1 return np.nanquantile(dense, quantile, axis)
[docs] @utm.timed_call() def scale_by(matrix: utt.Matrix, scale: utt.Vector, *, by: str) -> utt.ProperMatrix: """ Return a ``matrix`` where each ``by`` (``row`` or ``column``) is scaled by the matching value of the ``vector``. """ axis = utt.PER_OF_AXIS.index(by) assert len(scale) == matrix.shape[axis] scale = utt.to_numpy_vector(scale) _, dense, compressed = utt.to_proper_matrices(matrix, default_layout=f"{by}_major") if compressed is not None: scale_matrix = sp.spdiags(scale, 0, len(scale), len(scale)) if by == "row": result = utt.mustbe_compressed_matrix(scale_matrix * matrix) else: result = utt.mustbe_compressed_matrix(matrix * scale_matrix) utt.sum_duplicates(result) utt.eliminate_zeros(result) assert utt.matrix_layout(result) == utt.matrix_layout(compressed) return result assert dense is not None if by == "row": return dense * scale[:, None] return dense * scale[None, :]
[docs] @utm.timed_call() def fraction_by(matrix: utt.Matrix, *, sums: Optional[utt.Vector] = None, by: str) -> utt.ProperMatrix: """ Return a matrix containing, in each entry, the fraction of the original data out of the total ``by`` (``row`` or ``column``). That is, the sum of ``by`` in the result will be 1. However, if ``sums`` is specified, it is used instead of the sum of each ``by``, so the sum of the results may be different. .. note:: This assumes all the data values are non-negative. """ proper = utt.to_proper_matrix(matrix, default_layout=f"{by}_major") if sums is None: sums = sum_per(proper, per=by) else: sums = utt.to_numpy_vector(sums) zeros_mask = sums == 0 scale = np.reciprocal(sums, where=~zeros_mask) scale[zeros_mask] = 0 return scale_by(proper, scale, by=by)
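
# Illustrative addition (not part of the original module): convert per-cell UMI counts into
# per-cell fractions so each row sums to one; rows that are all zeros stay all zeros.
def _example_fraction_by() -> None:
    umis = np.array([[4.0, 0.0, 4.0], [0.0, 0.0, 0.0], [1.0, 1.0, 2.0]], dtype="float32")
    fractions = fraction_by(umis, by="row")
    assert np.allclose(sum_per(fractions, per="row"), [1.0, 0.0, 1.0])
    assert np.allclose(fractions[0, :], [0.5, 0.0, 0.5])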
[docs] @utm.timed_call() def fraction_per(matrix: utt.Matrix, *, per: Optional[str]) -> utt.NumpyVector: """ Get the fraction ``per`` (``row`` or ``column``) out of the total of some ``matrix``. If ``per`` is ``None``, the matrix must be square and is assumed to be symmetric, so the most efficient direction is used based on the matrix layout. Otherwise it must be one of ``row`` or ``column``, and the matrix must be in the appropriate layout (``row_major`` operating on rows, ``column_major`` for operating on columns). """ total_per_element = sum_per(matrix, per=per) return total_per_element / total_per_element.sum()
@utm.timed_call()
def stdev_per(matrix: utt.Matrix, *, per: Optional[str]) -> utt.NumpyVector:
    """
    Get the standard deviation ``per`` (``row`` or ``column``) of some ``matrix``.

    If ``per`` is ``None``, the matrix must be square and is assumed to be symmetric, so the most
    efficient direction is used based on the matrix layout. Otherwise it must be one of ``row`` or
    ``column``, and the matrix must be in the appropriate layout (``row_major`` for operating on
    rows, ``column_major`` for operating on columns).
    """
    per = _ensure_per_for("stdev", matrix, per)
    result = variance_per(matrix, per=per)
    np.sqrt(result, out=result)
    return result
[docs] @utm.timed_call() def variance_per(matrix: utt.Matrix, *, per: Optional[str]) -> utt.NumpyVector: """ Get the variance ``per`` (``row`` or ``column``) of some ``matrix``. If ``per`` is ``None``, the matrix must be square and is assumed to be symmetric, so the most efficient direction is used based on the matrix layout. Otherwise it must be one of ``row`` or ``column``, and the matrix must be in the appropriate layout (``row_major`` operating on rows, ``column_major`` for operating on columns). """ per = _ensure_per_for("variance", matrix, per) sum_per_element = sum_per(matrix, per=per) sum_squared_per_element = sum_squared_per(matrix, per=per) axis = 1 - utt.PER_OF_AXIS.index(per) size = matrix.shape[axis] result = np.square(sum_per_element).astype(float) result /= -size result += sum_squared_per_element result /= size result[result < 0] = 0 return result
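
# Illustrative addition (not part of the original module): variance_per computes the population
# variance E[x^2] - E[x]^2, so it should match numpy.var (ddof=0) on dense data.
def _example_variance_per() -> None:
    np.random.seed(123456)
    dense = np.random.rand(10, 40).astype("float64")
    variances = variance_per(dense, per="row")
    assert np.allclose(variances, np.var(dense, axis=1))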
[docs] @utm.timed_call() def normalized_variance_per(matrix: utt.Matrix, *, per: Optional[str], zero_value: float = 1.0) -> utt.NumpyVector: """ Get the normalized variance (variance / mean) ``per`` (``row`` or ``column``) of some ``matrix``. If ``per`` is ``None``, the matrix must be square and is assumed to be symmetric, so the most efficient direction is used based on the matrix layout. Otherwise it must be one of ``row`` or ``column``, and the matrix must be in the appropriate layout (``row_major`` operating on rows, ``column_major`` for operating on columns). If all the values are zero, writes the ``zero_value`` (default: {zero_value}) into the result. """ variance_per_element = variance_per(matrix, per=per) mean_per_element = mean_per(matrix, per=per) zeros_mask = mean_per_element == 0 result = np.reciprocal(mean_per_element, where=~zeros_mask) result[zeros_mask] = 0 result *= variance_per_element result[zeros_mask] = zero_value return result
[docs] @utm.timed_call() def relative_variance_per( matrix: utt.Matrix, *, per: Optional[str], window_size: int, ) -> utt.NumpyVector: """ Return the (log2(normalized_variance) - median(log2(normalized_variance_of_similar)) of the values ``per`` (``row`` or ``column``) of some ``matrix``. If ``per`` is ``None``, the matrix must be square and is assumed to be symmetric, so the most efficient direction is used based on the matrix layout. Otherwise it must be one of ``row`` or ``column``, and the matrix must be in the appropriate layout (``row_major`` operating on rows, ``column_major`` for operating on columns). """ normalized_variance_per_element = normalized_variance_per(matrix, per=per) np.log2(normalized_variance_per_element, out=normalized_variance_per_element) mean_per_element = mean_per(matrix, per=per) median_variance_per_element = sliding_window_function( normalized_variance_per_element, function="median", window_size=window_size, order_by=mean_per_element ) return normalized_variance_per_element - median_variance_per_element
@utm.timed_call()
def sum_matrix(matrix: utt.Matrix) -> Any:
    """
    Compute the sum of all the values in a ``matrix``.
    """
    return np.sum(matrix)  # type: ignore
@utm.timed_call()
def nnz_matrix(matrix: utt.Matrix) -> Any:
    """
    Compute the number of non-zero entries in a ``matrix``.

    .. note::

        If given a sparse matrix, this returns the number of **structural** non-zeros, that is, the
        number of entries we actually store data for, even if this data is zero. Use
        :py:func:`metacells.utilities.typing.eliminate_zeros` if you suspect the sparse matrix of
        containing structural zero data values.
    """
    sparse = utt.maybe_sparse_matrix(matrix)
    if sparse is not None:
        return sparse.nnz
    return np.sum(matrix != 0)
@utm.timed_call()
def mean_matrix(matrix: utt.Matrix) -> Any:
    """
    Compute the mean of all the values in a ``matrix``.
    """
    return np.mean(matrix)  # type: ignore
@utm.timed_call()
def max_matrix(matrix: utt.Matrix) -> Any:
    """
    Compute the maximum of all the values in a ``matrix``.
    """
    return np.max(matrix)  # type: ignore
@utm.timed_call()
def min_matrix(matrix: utt.Matrix) -> Any:
    """
    Compute the minimum of all the values in a ``matrix``.
    """
    return np.min(matrix)  # type: ignore
@utm.timed_call()
def nanmean_matrix(matrix: utt.Matrix) -> Any:
    """
    Compute the mean of all the non-NaN values in a ``matrix``.
    """
    with catch_warnings():
        filterwarnings("ignore", r"All-NaN (slice|axis) encountered")
        _, dense, compressed = utt.to_proper_matrices(matrix)
        if compressed is not None:
            return np.nansum(compressed.data) / (compressed.shape[0] * compressed.shape[1])
        assert dense is not None
        return np.nanmean(dense)
@utm.timed_call()
def nanmax_matrix(matrix: utt.Matrix) -> Any:
    """
    Compute the maximum of all the non-NaN values in a ``matrix``.
    """
    with catch_warnings():
        filterwarnings("ignore", r"All-NaN (slice|axis) encountered")
        _, dense, compressed = utt.to_proper_matrices(matrix)
        if compressed is not None:
            # Note: this considers only the structurally stored entries of the sparse matrix.
            return np.nanmax(compressed.data)
        assert dense is not None
        return np.nanmax(dense)
[docs] @utm.timed_call() def nanmin_matrix(matrix: utt.Matrix) -> Any: """ Compute the minimum of all the non-NaN values in a ``matrix``. """ with catch_warnings(): filterwarnings("ignore", r"All-NaN (slice|axis) encountered") _, dense, compressed = utt.to_proper_matrices(matrix) if compressed is not None: return np.nanmin(compressed.data) # type: ignore assert dense is not None return np.nanmin(dense)
[docs] @utm.timed_call() def rank_matrix_by_layout(matrix: utt.NumpyMatrix, ascending: bool) -> Any: """ Replace each element of the matrix with its rank (in row for ``row_major``, in column for ``column_major``). If ``ascending`` then rank 1 is the minimal element. Otherwise, rank 1 is the maximal element. """ layout = utt.matrix_layout(matrix) assert layout is not None extension_name = f"rank_matrix_{matrix.dtype}_t" extension = getattr(xt, extension_name) extension(matrix, ascending) return matrix
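The C++ extension replaces the data in place; conceptually, an ascending per-row rank (rank 1 for the minimal element) can be sketched in pure ``numpy`` as a double ``argsort``, ignoring how ties are broken (negate the matrix first to get descending ranks)::

    import numpy as np

    matrix = np.array([[30, 10, 20], [5, 50, 40]])
    ranks = np.argsort(np.argsort(matrix, axis=1), axis=1) + 1
    print(ranks)
    # [[3 1 2]
    #  [1 3 2]]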
M = TypeVar("M", bound=utt.Matrix) def _reduce_matrix( _name: str, matrix: M, per: str, reducer: Callable[[M], utt.NumpyVector], ) -> utt.NumpyVector: assert matrix.ndim == 2 axis = utt.PER_OF_AXIS.index(per) results_count = matrix.shape[1 - axis] _, dense, compressed = utt.to_proper_matrices(matrix, default_layout=utt.LAYOUT_OF_AXIS[axis]) elements_count: float if dense is not None: elements_count = dense.shape[axis] axis_flag = (dense.flags.c_contiguous, dense.flags.f_contiguous)[axis] if axis_flag: timed_step = ".dense-efficient" else: timed_step = ".dense-inefficient" else: assert compressed is not None elements_count = compressed.nnz / results_count axis_format = ("csr", "csc")[axis] if compressed.getformat() == axis_format: timed_step = ".compressed-efficient" else: timed_step = ".compressed-inefficient" with utm.timed_step(timed_step): utm.timed_parameters(results=results_count, elements=elements_count) return utt.to_numpy_vector(reducer(matrix))
[docs] @utm.timed_call() def bincount_vector( vector: utt.Vector, *, minlength: int = 0, ) -> utt.NumpyVector: """ Drop-in replacement for ``numpy.bincount``, which is timed and works for any ``vector`` data. """ dense = utt.to_numpy_vector(vector) result = np.bincount(dense, minlength=minlength) utm.timed_parameters(size=dense.size, bins=result.size) return result
[docs] @utm.timed_call() def most_frequent(vector: utt.Vector) -> Any: """ Return the most frequent value in a ``vector``. This is useful for :py:func:`metacells.tools.convey.convey_obs_to_group`. """ unique, positions = np.unique(utt.to_numpy_vector(vector), return_inverse=True) counts = np.bincount(positions) maxpos = np.argmax(counts) return unique[maxpos]
[docs] @utm.timed_call() def strongest(vector: utt.Vector) -> Any: """ Return the strongest (maximal absolute) value in a ``vector``. This is useful for :py:func:`metacells.tools.convey.convey_obs_to_group`. """ vector = utt.to_numpy_vector(vector) return vector[np.argmax(np.abs(vector))]
[docs] @utm.timed_call() def highest_weight(weights: utt.Vector, vector: utt.Vector) -> Any: """ Return the value with the highest total ``weights`` in a ``vector``. This is useful for :py:func:`metacells.tools.project.convey_atlas_to_query`. """ unique, positions = np.unique(utt.to_numpy_vector(vector), return_inverse=True) counts = np.bincount(positions, weights=weights) # type: ignore maxpos = np.argmax(counts) return unique[maxpos]
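The mechanism behind both :py:func:`most_frequent` and :py:func:`highest_weight` is a (possibly weighted) ``bincount`` over the positions of the unique values; a standalone sketch with made-up data::

    import numpy as np

    vector = np.array([2, 7, 7, 2, 2])
    weights = np.array([0.5, 3.0, 3.0, 0.5, 0.5])
    unique, positions = np.unique(vector, return_inverse=True)
    totals = np.bincount(positions, weights=weights)
    print(unique[np.argmax(totals)])  # 7 -- total weight 6.0, versus 1.5 for the value 2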
[docs] @utm.timed_call() def weighted_mean(weights: utt.Vector, vector: utt.Vector) -> Any: """ Return the weighted mean (using the ``weights`` and the values in the ``vector``). This is useful for :py:func:`metacells.tools.project.convey_atlas_to_query`. """ return np.average(vector, weights=weights) # type: ignore
[docs] @utm.timed_call() def fraction_of_grouped(value: Any) -> Callable[[utt.Vector], Any]: """ Return a function that takes a vector and returns the fraction of elements of the vector which are equal to a specific ``value``. """ def compute(vector: utt.Vector) -> Any: return np.sum(utt.to_numpy_vector(vector) == value) / len(vector) return compute
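For illustration, a minimal usage sketch (assuming the ``metacells`` package is installed; the data is made up)::

    import numpy as np
    from metacells.utilities.computation import fraction_of_grouped

    vector = np.array([0, 1, 1, 1])
    compute = fraction_of_grouped(1)
    print(compute(vector))  # 0.75 -- three of the four entries equal 1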
ROLLING_FUNCTIONS = { "mean": pd.core.window.Rolling.mean, "median": pd.core.window.Rolling.median, "std": pd.core.window.Rolling.std, "var": pd.core.window.Rolling.var, } FULL_FUNCTIONS = { "mean": np.mean, "median": np.median, "std": np.std, "var": np.var, }
[docs] @utd.expand_doc(functions=", ".join([f"``{function}``" for function in ROLLING_FUNCTIONS])) @utm.timed_call() def sliding_window_function( vector: utt.Vector, *, function: str, window_size: int, order_by: Optional[utt.NumpyVector] = None, ) -> utt.NumpyVector: """ Return a vector of the same size as the input ``vector``, where each entry is the result of applying the ``function`` (one of {functions}) to a sliding window of size ``window_size`` centered on the entry. If ``order_by`` is specified, the ``vector`` is first sorted by this order, and the end result is unsorted back to the original order. That is, the sliding window centered at each position will contain the ``window_size`` entries which have the nearest ``order_by`` values to the center entry. .. note:: The window size should be an odd positive integer. If an even value is specified, it is automatically increased by one. """ dense = utt.to_numpy_vector(vector) if window_size % 2 == 0: window_size += 1 half_window_size = (window_size - 1) // 2 utm.timed_parameters(function=function, size=dense.size, window=window_size) if window_size >= len(vector): value = FULL_FUNCTIONS[function](vector) # type: ignore return np.full(len(vector), value) if order_by is not None: assert dense.size == order_by.size order_indices = np.argsort(order_by) reverse_order_indices = np.argsort(order_indices) else: reverse_order_indices = order_indices = np.arange(dense.size) extended_order_indices = np.concatenate( [ order_indices[np.arange(window_size - half_window_size, window_size)], order_indices, order_indices[np.arange(len(vector) - window_size, len(vector) - window_size + half_window_size)], ] ) extended_series = utt.to_pandas_series(dense[extended_order_indices]) rolling_windows = extended_series.rolling(window_size) # type: ignore compute = ROLLING_FUNCTIONS[function] computed = compute(rolling_windows).values reordered = computed[window_size - 1 :] assert reordered.size == dense.size if order_by is not None: reordered = reordered[reverse_order_indices] return reordered
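When ``order_by`` is not specified, the result is close in spirit to a plain centered rolling window; a standalone ``pandas`` sketch (not identical to the function above, especially at the vector boundaries)::

    import numpy as np
    import pandas as pd

    vector = np.array([1.0, 9.0, 2.0, 8.0, 3.0, 7.0, 4.0])
    rolling_median = pd.Series(vector).rolling(window=3, center=True, min_periods=1).median()
    print(rolling_median.values)  # [5.  2.  8.  3.  7.  4.  5.5]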
[docs] @utm.timed_call() def patterns_matches( patterns: Union[str, Pattern, Collection[Union[str, Pattern]]], strings: Collection[str], invert: bool = False, ) -> utt.NumpyVector: """ Given a collection of (case-insensitive) ``strings``, return a boolean mask specifying which of them match the given regular expression ``patterns``. If ``invert`` (default: {invert}), invert the mask. """ if isinstance(patterns, (str, Pattern)): patterns = [patterns] utm.timed_parameters(patterns=len(patterns), strings=len(strings)) if len(patterns) == 0: mask = np.full(len(strings), False, dtype="bool") else: unified_pattern = ( "(" + ")|(".join( [alternative if isinstance(alternative, str) else alternative.pattern for alternative in patterns] ) + ")" ) pattern: Pattern = re.compile(unified_pattern, re.IGNORECASE) mask = np.array([bool(pattern.fullmatch(string)) for string in strings]) if invert: mask = ~mask return mask
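For example, to mark names matching any of several patterns, case-insensitively (a standalone sketch using plain ``re`` and the same unified-pattern approach as above; the names and patterns are made up)::

    import re
    import numpy as np

    patterns = ["mt-.*", "ercc-.*"]
    strings = ["MT-CO1", "ACTB", "ERCC-00002"]
    unified = re.compile("(" + ")|(".join(patterns) + ")", re.IGNORECASE)
    mask = np.array([bool(unified.fullmatch(string)) for string in strings])
    print(mask)  # [ True False  True]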
[docs] @utm.timed_call() def compress_indices(indices: utt.Vector) -> utt.NumpyVector: """ Given a vector of group ``indices`` per element, return a vector where the group indices are consecutive. If the group indices contain ``-1`` ("outliers"), these are preserved as ``-1`` in the result. """ indices = utt.to_numpy_vector(indices) unique, consecutive = np.unique(indices, return_inverse=True) consecutive += min(unique[0], 0) return consecutive.astype(indices.dtype)
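For example, group indices ``[-1, 3, 3, 7, -1, 0]`` are compressed to ``[-1, 1, 1, 2, -1, 0]``; the ``numpy`` trick used above can be reproduced standalone::

    import numpy as np

    indices = np.array([-1, 3, 3, 7, -1, 0])
    unique, consecutive = np.unique(indices, return_inverse=True)
    consecutive += min(unique[0], 0)  # shift down by one only when -1 ("outliers") is present
    print(consecutive)  # [-1  1  1  2 -1  0]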
[docs] @utm.timed_call() def bin_pack(element_sizes: utt.Vector, max_bin_size: float) -> utt.NumpyVector: """ Given a vector of ``element_sizes`` return a vector containing the bin number for each element, such that the total size of each bin is at most, and as close to as possible, to the ``max_bin_size``. This uses the first-fit decreasing algorithm for finding an initial solution and then moves elements around to minimize the l2 norm of the wasted space in each bin. """ size_of_bins: List[float] = [] element_sizes = utt.to_numpy_vector(element_sizes) descending_size_indices = np.argsort(element_sizes)[::-1] bin_of_elements = np.empty(element_sizes.size, dtype="int") for element_index in descending_size_indices: element_size = element_sizes[element_index] assigned = False for bin_index in range(len(size_of_bins)): # pylint: disable=consider-using-enumerate if size_of_bins[bin_index] + element_size <= max_bin_size: bin_of_elements[element_index] = bin_index size_of_bins[bin_index] += element_size assigned = True break if not assigned: bin_of_elements[element_index] = len(size_of_bins) size_of_bins.append(element_size) did_improve = True while did_improve: did_improve = False for element_index in descending_size_indices: element_size = element_sizes[element_index] if element_size > max_bin_size: continue current_bin_index = bin_of_elements[element_index] current_bin_space = max_bin_size - size_of_bins[current_bin_index] assert current_bin_space >= 0 remove_loss = (element_size + current_bin_space) ** 2 - current_bin_space**2 for bin_index in range(len(size_of_bins)): # pylint: disable=consider-using-enumerate if bin_index == current_bin_index: continue bin_space = max_bin_size - size_of_bins[bin_index] if bin_space < element_size: continue insert_gain = bin_space**2 - (bin_space - element_size) ** 2 if insert_gain > remove_loss: size_of_bins[current_bin_index] -= element_size current_bin_index = bin_of_elements[element_index] = bin_index size_of_bins[bin_index] += element_size remove_loss = insert_gain did_improve = True utm.timed_parameters(elements=element_sizes.size, bins=len(size_of_bins)) return bin_of_elements
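For illustration, a minimal usage sketch (assuming the ``metacells`` package is installed; the sizes are made up, and the exact assignment depends on the heuristic)::

    import numpy as np
    from metacells.utilities.computation import bin_pack

    element_sizes = np.array([7.0, 5.0, 4.0, 3.0, 2.0, 1.0])
    bin_of_elements = bin_pack(element_sizes, max_bin_size=8.0)
    for bin_index in range(np.max(bin_of_elements) + 1):
        assert element_sizes[bin_of_elements == bin_index].sum() <= 8.0  # no bin exceeds max_bin_size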
[docs] @utm.timed_call() def bin_fill( # pylint: disable=too-many-statements,too-many-branches element_sizes: utt.Vector, min_bin_size: float ) -> utt.NumpyVector: """ Given a vector of ``element_sizes`` return a vector containing the bin number for each element, such that the total size of each bin is at least, and as close to as possible, to the ``min_bin_size``. This uses the first-fit decreasing algorithm for finding an initial solution and then moves elements around to minimize the l2 norm of the wasted space in each bin. """ element_sizes = utt.to_numpy_vector(element_sizes) total_size = np.sum(element_sizes) assert min_bin_size > 0 size_of_bins = [0.0] max_bins_count = int(total_size // min_bin_size) + 1 while min(size_of_bins) < min_bin_size: max_bins_count -= 1 if max_bins_count < 2: return np.zeros(element_sizes.size, dtype="int") size_of_bins = [] descending_size_indices = np.argsort(element_sizes)[::-1] bin_of_elements = np.empty(element_sizes.size, dtype="int") for element_index in descending_size_indices: element_size = element_sizes[element_index] assigned = False for bin_index in range(len(size_of_bins)): # pylint: disable=consider-using-enumerate if size_of_bins[bin_index] + element_size <= min_bin_size: bin_of_elements[element_index] = bin_index size_of_bins[bin_index] += element_size assigned = True break if assigned: continue if len(size_of_bins) < max_bins_count: bin_of_elements[element_index] = len(size_of_bins) size_of_bins.append(element_size) continue best_bin_index = -1 best_bin_waste = -1 for bin_index in range(len(size_of_bins)): # pylint: disable=consider-using-enumerate bin_waste = size_of_bins[bin_index] - min_bin_size + element_size assert bin_waste > 0 if best_bin_index < 0 or bin_waste < best_bin_waste: best_bin_index = bin_index best_bin_waste = bin_waste assert best_bin_index >= 0 assert best_bin_waste > 0 bin_of_elements[element_index] = best_bin_index size_of_bins[best_bin_index] += element_size did_improve = True while did_improve: did_improve = False for element_index in descending_size_indices: element_size = element_sizes[element_index] current_bin_index = bin_of_elements[element_index] current_bin_waste = size_of_bins[current_bin_index] - min_bin_size assert current_bin_waste >= 0 if current_bin_waste < element_size: continue remove_gain = current_bin_waste**2 - (current_bin_waste - element_size) ** 2 for bin_index in range(len(size_of_bins)): # pylint: disable=consider-using-enumerate if bin_index == current_bin_index: continue bin_waste = size_of_bins[bin_index] - min_bin_size insert_loss = (bin_waste + element_size) ** 2 - bin_waste**2 if insert_loss < remove_gain: size_of_bins[current_bin_index] -= element_size current_bin_index = bin_of_elements[element_index] = bin_index size_of_bins[bin_index] += element_size remove_gain = insert_loss did_improve = True utm.timed_parameters(elements=element_sizes.size, bins=len(size_of_bins)) return bin_of_elements
[docs] @utm.timed_call() def sum_groups( matrix: utt.ProperMatrix, groups: utt.Vector, *, per: Optional[str], transform: Optional[Callable[[utt.Matrix], utt.Matrix]] = None, ) -> Optional[Tuple[utt.NumpyMatrix, utt.NumpyVector]]: """ Given a ``matrix``, and a vector of ``groups`` ``per`` column or row, return a matrix with a column or row ``per`` group, containing the sum of the groups columns or rows, and a vector of sizes (the number of summed columns or rows) ``per`` group. Negative group indices ("outliers") are ignored and their data is not included in the result. If there are no non-negative group indices, returns ``None``. If ``per`` is ``None``, the matrix must be square and is assumed to be symmetric, so the most efficient direction is used based on the matrix layout. Otherwise it must be one of ``row`` or ``column``, and the matrix must be in the appropriate layout (``row_major`` operating on rows, ``column_major`` for operating on columns). If ``transform`` is not ``None``, it is applied to the data before summing it. """ per = _ensure_per_for("sum_groups", matrix, per) groups = utt.to_numpy_vector(groups) groups_count = np.max(groups) + 1 if groups_count == 0: return None efficient_layout = per + "_major" sparse = utt.maybe_sparse_matrix(matrix) if sparse is not None: if utt.is_layout(matrix, efficient_layout): timed_step = ".compressed-efficient" else: timed_step = ".compressed-inefficient" else: matrix = utt.to_numpy_matrix(matrix, only_extract=True) if utt.is_layout(matrix, efficient_layout): timed_step = ".dense-efficient" else: timed_step = ".dense-inefficient" if per == "column": matrix = matrix.transpose() with utm.timed_step(timed_step): utm.timed_parameters(groups=groups_count, entities=matrix.shape[0], elements=matrix.shape[1]) @utm.timed_call() def _sum_group(group_index: int) -> Tuple[utt.NumpyVector, int]: with utl.log_step( "- group", group_index, formatter=lambda group_index: utl.progress_description(groups_count, group_index, "group"), ): group_mask = groups == group_index utl.log_calc("group_mask", group_mask) group_matrix = matrix[group_mask, :] if transform is not None: group_matrix = transform(group_matrix) group_size = group_matrix.shape[0] assert group_size > 0 group_sum = utt.to_numpy_vector(group_matrix.sum(axis=0)) utl.log_calc("group_sum", group_sum, formatter=utl.sizes_description) return (group_sum, group_size) results = utp.parallel_map(_sum_group, groups_count) size_per_group = np.array([size_of_group for _, size_of_group in results]) sum_per_group = np.vstack([sum_of_group for sum_of_group, _ in results]) if per == "column": sum_per_group = sum_per_group.transpose() return sum_per_group, size_per_group
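Conceptually, summing rows per group amounts to the following pure-``numpy`` sketch (with made-up data; rows with a negative group index are simply dropped)::

    import numpy as np

    matrix = np.array([[1, 2], [3, 4], [5, 6], [7, 8]])
    groups = np.array([0, 1, 0, -1])  # the last row is an "outlier"
    groups_count = np.max(groups) + 1
    sum_per_group = np.vstack(
        [matrix[groups == group_index, :].sum(axis=0) for group_index in range(groups_count)]
    )
    size_per_group = np.array([np.sum(groups == group_index) for group_index in range(groups_count)])
    print(sum_per_group)   # [[6 8]
                           #  [3 4]]
    print(size_per_group)  # [2 1]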
[docs] @utm.timed_call() def shuffle_matrix( matrix: utt.Matrix, *, per: str, random_seed: int, ) -> None: """ Shuffle (in-place) the ``matrix`` data ``per`` column or row. The matrix must be in the appropriate layout (``row_major`` for shuffling data in each row, ``column_major`` for shuffling data in each column). A non-zero ``random_seed`` (non-zero for reproducible results) will make the operation replicable. """ _ensure_layout_for("shuffle", matrix, per, allow_inefficient=False) axis = utt.PER_OF_AXIS.index(per) _, dense, compressed = utt.to_proper_matrices(matrix) if compressed is not None: extension_name = "shuffle_compressed_%s_t_%s_t_%s_t" % ( # pylint: disable=consider-using-f-string compressed.data.dtype, compressed.indices.dtype, compressed.indptr.dtype, ) extension = getattr(xt, extension_name) extension(compressed.data, compressed.indices, compressed.indptr, compressed.shape[1 - axis], random_seed) else: assert dense is not None if per == "column": dense = dense.transpose() extension_name = f"shuffle_dense_{dense.dtype}_t" extension = getattr(xt, extension_name) extension(dense, random_seed)
[docs] @utm.timed_call() def cover_diameter(*, points_count: int, area: float, cover_fraction: float) -> float: """ Return the diameter to give to each point so that the total area of ``points_count`` points will be a ``cover_fraction`` of the total ``area``. """ return xt.cover_diameter(points_count, area, cover_fraction)
[docs] @utm.timed_call() @utd.expand_doc() def cover_coordinates( x_coordinates: utt.Vector, y_coordinates: utt.Vector, *, cover_fraction: float = 1 / 3, noise_fraction: float = 1.0, random_seed: int, ) -> Tuple[utt.NumpyVector, utt.NumpyVector]: """ Given x/y coordinates of points, move them so that the total area covered by them is ``cover_fraction`` (default: {cover_fraction}) of the total area of their bounding box, assuming each has the diameter of their minimal distance. The points are jiggled around by the ``noise_fraction`` of their minimal distance using the ``random_seed`` (non-zero for reproducible results). Returns new x/y coordinates vectors. """ x_coordinates = utt.to_numpy_vector(x_coordinates) y_coordinates = utt.to_numpy_vector(y_coordinates) points_count = len(x_coordinates) assert x_coordinates.dtype == y_coordinates.dtype x_coordinates = utt.to_numpy_vector(x_coordinates, copy=True) y_coordinates = utt.to_numpy_vector(y_coordinates, copy=True) spaced_x_coordinates = np.full(points_count, -0.1, dtype=x_coordinates.dtype) spaced_y_coordinates = np.full(points_count, -0.2, dtype=y_coordinates.dtype) extension_name = f"cover_coordinates_{x_coordinates.dtype}_t" extension = getattr(xt, extension_name) extension( x_coordinates, y_coordinates, spaced_x_coordinates, spaced_y_coordinates, cover_fraction, noise_fraction, random_seed, ) return spaced_x_coordinates, spaced_y_coordinates
[docs] @utm.timed_call() def random_piles( elements_count: int, target_pile_size: int, *, random_seed: int, ) -> utt.NumpyVector: """ Split ``elements_count`` elements into piles of a size roughly equal to ``target_pile_size``. Return a vector specifying the pile index of each element. Specify a non-zero ``random_seed`` to make this replicable. """ assert elements_count > 0 assert target_pile_size > 0 piles_count = elements_count / target_pile_size few_piles_count = max(floor(piles_count), 1) many_piles_count = ceil(piles_count) if few_piles_count == many_piles_count: piles_count = few_piles_count else: few_piles_size = elements_count / few_piles_count many_piles_size = elements_count / many_piles_count few_piles_factor = few_piles_size / target_pile_size many_piles_factor = target_pile_size / many_piles_size assert few_piles_factor >= 1 assert many_piles_factor >= 1 if few_piles_factor < many_piles_factor: piles_count = few_piles_count else: piles_count = many_piles_count pile_of_elements_list: List[utt.NumpyVector] = [] minimal_pile_size = floor(elements_count / piles_count) extra_elements = elements_count - minimal_pile_size * piles_count assert 0 <= extra_elements < piles_count if extra_elements > 0: pile_of_elements_list.append(np.arange(extra_elements)) for pile_index in range(piles_count): pile_of_elements_list.append(np.full(minimal_pile_size, pile_index)) pile_of_elements = np.concatenate(pile_of_elements_list) assert pile_of_elements.size == elements_count np.random.seed(random_seed) return np.random.permutation(pile_of_elements)
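For illustration, a minimal usage sketch (assuming the ``metacells`` package is installed)::

    import numpy as np
    from metacells.utilities.computation import random_piles

    pile_of_elements = random_piles(1000, target_pile_size=160, random_seed=123456)
    sizes = np.bincount(pile_of_elements)
    print(len(sizes), sizes.min(), sizes.max())  # 6 166 167 -- six piles of nearly equal size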
[docs] @utm.timed_call() def represent( goal: utt.NumpyVector, basis: utt.NumpyMatrix, ) -> Optional[Tuple[float, utt.NumpyVector]]: """ Represent a ``goal`` vector as a weighted average of the row vectors of some ``basis`` matrix. This computes a non-negative weight for each matrix row, such that the sum of weights is 1, minimizing the distance (L2 norm) between the goal vector and the weighted average of the basis vectors. This is a convex quadratic problem subject to a linear constraint, so ``cvxpy`` solves it efficiently. The return value is a tuple with the score of the weights vector, and the weights vector itself. """ rows_count, columns_count = basis.shape assert columns_count == len(goal) variables = cvxpy.Variable(rows_count, nonneg=True) constraints = [cvxpy.sum(variables) == 1] objective = cvxpy.norm(goal - variables @ basis, 2) problem = cvxpy.Problem(cvxpy.Minimize(objective), constraints) try: result = problem.solve(solver="SCS") except cvxpy.error.SolverError: try: result = problem.solve(solver="ECOS") except cvxpy.error.SolverError: result = problem.solve(solver="OSQP") if result is None: return None return (float(result), utt.to_numpy_vector(variables.value))
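The optimization itself is small enough to sketch standalone with ``cvxpy`` (the numbers are made up; the function above adds solver fallbacks and timing on top of this)::

    import cvxpy
    import numpy as np

    goal = np.array([1.0, 2.0, 3.0])
    basis = np.array([[1.0, 0.0, 0.0], [0.0, 2.0, 4.0]])  # one row per candidate vector
    weights = cvxpy.Variable(basis.shape[0], nonneg=True)
    objective = cvxpy.norm(goal - weights @ basis, 2)
    problem = cvxpy.Problem(cvxpy.Minimize(objective), [cvxpy.sum(weights) == 1])
    score = problem.solve(solver="SCS")
    print(score, weights.value)  # distance of the best convex combination, and its weights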
[docs] @utm.timed_call() def min_cut( # pylint: disable=too-many-branches,too-many-statements weights: utt.Matrix, ) -> Tuple[ig.Cut, Optional[float]]: """ Find the minimal cut that will split an undirected graph (with a symmetrical ``weights`` matrix). Returns the ``igraph.Cut`` object describing the cut, and the scale-invariant strength of the cut edges. This strength is the ratio between the mean weight of an edge connecting a random node in each partition and the mean weight of an edge connecting two random nodes inside a random partition. If either of the partitions contains no edges (e.g. contains a single node), the strength will be ``None``. """ assert weights.shape[0] == weights.shape[1] nodes_count = weights.shape[0] assert nodes_count > 1 edges: List[Tuple[int, int]] = [] weight_of_edges: List[float] = [] _, dense, compressed = utt.to_proper_matrices(weights) if dense is not None: for source in range(nodes_count): for target in range(source): weight = dense[source, target] if weight > 0: edges.append((source, target)) weight_of_edges.append(weight) else: assert compressed is not None for source in range(nodes_count): offsets = range(compressed.indptr[source], compressed.indptr[source + 1]) for target, weight in zip(compressed.indices[offsets], compressed.data[offsets]): if target >= source: break edges.append((source, target)) weight_of_edges.append(weight) graph = ig.Graph(n=nodes_count, edges=edges) cut = graph.mincut(capacity=weight_of_edges) is_second = np.zeros(nodes_count, dtype="bool") is_second[cut.partition[1]] = True first_size = len(cut.partition[0]) second_size = len(cut.partition[1]) assert first_size + second_size == nodes_count cut_total_weight = 0.0 first_total_weight = 0.0 second_total_weight = 0.0 for (source, target), weight in zip(edges, weight_of_edges): if is_second[source]: if is_second[target]: second_total_weight += weight else: cut_total_weight += weight else: if is_second[target]: cut_total_weight += weight else: first_total_weight += weight if cut_total_weight == 0: return (cut, 0.0) if first_total_weight > 0 and second_total_weight > 0: cut_edges_count = first_size * second_size first_edges_count = (first_size * (first_size - 1)) / 2 second_edges_count = (second_size * (second_size - 1)) / 2 total_edges_count = (nodes_count * (nodes_count - 1)) / 2 assert cut_edges_count + first_edges_count + second_edges_count == total_edges_count cut_mean_weight = cut_total_weight / cut_edges_count first_mean_weight = first_total_weight / first_edges_count second_mean_weight = second_total_weight / second_edges_count inner_mean_weight = sqrt(first_mean_weight * second_mean_weight) cut_strength: Optional[float] = cut_mean_weight / inner_mean_weight else: cut_strength = None return (cut, cut_strength)
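For illustration, a small standalone ``igraph`` sketch of the underlying minimum-cut call (two triangles joined by a single weak edge; the weights are made up)::

    import igraph as ig

    edges = [(0, 1), (0, 2), (1, 2), (3, 4), (3, 5), (4, 5), (2, 3)]
    weights = [5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 0.1]  # the (2, 3) edge is the weak link
    graph = ig.Graph(n=6, edges=edges)
    cut = graph.mincut(capacity=weights)
    print(cut.value)      # 0.1 -- total weight of the cut edges
    print(cut.partition)  # [[0, 1, 2], [3, 4, 5]] (up to ordering)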
[docs] @utm.timed_call() def sparsify_matrix( full: utt.ProperMatrix, min_column_max_value: float, min_entry_value: float, abs_values: bool, ) -> utt.CompressedMatrix: """ Given a full matrix, return a sparse matrix such that all non-zero entries are at least ``min_entry_value``, and columns that have no value above ``min_column_max_value`` are set to all-zero. If ``abs_values`` consider the absolute values when comparing to the thresholds. """ assert 0 <= min_entry_value <= min_column_max_value if abs_values: comparable = np.abs(full) # type: ignore else: comparable = full # type: ignore max_per_column = max_per(comparable, per="column") low_columns = max_per_column < min_column_max_value sparse_comparable = utt.maybe_compressed_matrix(comparable) if sparse_comparable is None: low_entries = comparable < min_entry_value else: low_entries = ~utt.to_numpy_matrix(comparable >= min_entry_value) lil = sp.lil_matrix(full) lil[:, low_columns] = 0 lil[low_entries] = 0 return sp.csr_matrix(lil)