"""
K-Nearest-Neighbors Graph
-------------------------
"""
from typing import Optional
from typing import Union
import numpy as np
import scipy.sparse as sp # type: ignore
from anndata import AnnData # type: ignore
import metacells.parameters as pr
import metacells.utilities as ut
__all__ = [
"compute_obs_obs_knn_graph",
"compute_var_var_knn_graph",
]
[docs]
@ut.logged()
@ut.timed_call()
@ut.expand_doc()
def compute_obs_obs_knn_graph(
adata: AnnData,
what: Union[str, ut.Matrix] = "obs_similarity",
*,
k: int,
balanced_ranks_factor: float = pr.knn_balanced_ranks_factor,
incoming_degree_factor: float = pr.knn_incoming_degree_factor,
outgoing_degree_factor: float = pr.knn_outgoing_degree_factor,
min_outgoing_degree: int = pr.knn_min_outgoing_degree,
inplace: bool = True,
) -> Optional[ut.PandasFrame]:
"""
Compute a directed K-Nearest-Neighbors graph based on ``what`` (default: what) similarity data
for each pair of observations (cells).
**Input**
Annotated ``adata``, where the observations are cells and the variables are genes, where
``what`` is a per-observation-per-observation matrix or the name of a
per-observation-per-observation annotation containing such a matrix.
**Returns**
Observations-Pair Annotations
``obs_outgoing_weights``
A sparse square matrix where each non-zero entry is the weight of an edge between a pair
of cells or genes, where the sum of the weights of the outgoing edges for each element
is 1 (there is always at least one such edge).
If ``inplace`` (default: {inplace}), this is written to the data, and the function returns
``None``. Otherwise this is returned as a pandas data frame (indexed by the observation names).
**Computation Parameters**
1. Use the ``obs_similarity`` and convert it to ranks (in descending order).
This gives us a dense asymmetric ``<elements>_outgoing_ranks`` matrix.
2. Convert the asymmetric outgoing ranks matrix into a symmetric ``obs_balanced_ranks`` matrix
by element-wise multiplying it with its transpose and taking the square root. That is, for
each edge to be high-balanced-rank, the geomean of its outgoing rank has to be high in both
nodes it connects.
.. note::
This can drastically reduce the degree of the nodes, since to survive an edge needs to
have been in the top ranks for both its nodes (as multiplying with zero drops the edge).
This is why the ``balanced_ranks_factor`` needs to be large-ish.
3. Keeping only balanced ranks of geomean of up to ``k * balanced_ranks_factor`` (default:
{balanced_ranks_factor}). This does a preliminary pruning of low-quality edges.
4. Prune the edges, keeping only the ``k * incoming_degree_factor`` (default: k * {incoming_degree_factor})
highest-ranked incoming edges for each node, and then only the ``k * outgoing_degree_factor`` (default:
{outgoing_degree_factor}) highest-ranked outgoing edges for each node, while ensuring that the
highest-balanced-ranked outgoing edge of each node is preserved. This gives us an asymmetric ``obs_pruned_ranks``
matrix, which has the structure we want, but not the correct edge weights yet.
.. note::
Balancing the ranks, and then pruning the incoming edges, ensures that "hub" nodes, that
is nodes that many other nodes prefer to connect with, end up connected to a limited
number of such "spoke" nodes.
5. If there is any node which is left with an out degree of less than ``min_outgoing_degree`` (default:
{min_outgoing_degree}), increase K by 10% and repeat steps 2-4.
6. Normalize the outgoing edge weights by dividing them with the sum of their balanced ranks,
such that the sum of the outgoing edge weights for each node is 1. Note that there is always
at least one outgoing edge for each node. This gives us the ``obs_outgoing_weights`` for our
directed K-Nearest-Neighbors graph.
.. note::
Ensuring each node has at least one outgoing edge allows us to always have at least one
candidate grouping to add it to. This of course doesn't protect the node from being
rejected by its group as deviant.
"""
return _compute_elements_knn_graph(
adata,
"obs",
what,
k=k,
balanced_ranks_factor=balanced_ranks_factor,
incoming_degree_factor=incoming_degree_factor,
outgoing_degree_factor=outgoing_degree_factor,
min_outgoing_degree=min_outgoing_degree,
inplace=inplace,
)
[docs]
@ut.logged()
@ut.timed_call()
@ut.expand_doc()
def compute_var_var_knn_graph(
adata: AnnData,
what: Union[str, ut.Matrix] = "var_similarity",
*,
k: int,
balanced_ranks_factor: float = pr.knn_balanced_ranks_factor,
incoming_degree_factor: float = pr.knn_incoming_degree_factor,
outgoing_degree_factor: float = pr.knn_outgoing_degree_factor,
min_outgoing_degree: int = pr.knn_min_outgoing_degree,
inplace: bool = True,
) -> Optional[ut.PandasFrame]:
"""
Compute a directed K-Nearest-Neighbors graph based on ``what`` (default: what) similarity data
for each pair of variables (genes).
**Input**
Annotated ``adata``, where the observations are cells and the variables are genes, where
``what`` is a per-variable-per-variable matrix or the name of a per-variable-per-variable
annotation containing such a matrix.
**Returns**
Variables-Pair Annotations
``var_outgoing_weights``
A sparse square matrix where each non-zero entry is the weight of an edge between a pair
of cells or genes, where the sum of the weights of the outgoing edges for each element
is 1 (there is always at least one such edge).
If ``inplace`` (default: {inplace}), this is written to the data, and the function returns
``None``. Otherwise this is returned as a pandas data frame (indexed by the variable names).
**Computation Parameters**
1. Use the ``var_similarity`` and convert it to ranks (in descending order).
This gives us a dense asymmetric ``<elements>_outgoing_ranks`` matrix.
2. Convert the asymmetric outgoing ranks matrix into a symmetric ``var_balanced_ranks`` matrix
by element-wise multiplying it with its transpose and taking the square root. That is, for
each edge to be high-balanced-rank, the geomean of its outgoing rank has to be high in both
nodes it connects.
3. Keeping only balanced ranks of up to ``k * k * balanced_ranks_factor`` (default:
{balanced_ranks_factor}). This does a preliminary pruning of low-quality edges.
4. Prune the edges, keeping only the ``k * incoming_degree_factor`` (default: k * {incoming_degree_factor})
highest-ranked incoming edges for each node, and then only the ``k * outgoing_degree_factor`` (default:
{outgoing_degree_factor}) highest-ranked outgoing edges for each node, while ensuring that the
highest-balanced-ranked outgoing edge of each node is preserved. This gives us an asymmetric ``var_pruned_ranks``
matrix, which has the structure we want, but not the correct edge weights yet.
.. note::
Balancing the ranks, and then pruning the incoming edges, ensures that "hub" nodes, that
is nodes that many other nodes prefer to connect with, end up connected to a limited
number of such "spoke" nodes.
5. If there is any node which is left with an out degree of less than ``min_outgoing_degree`` (default:
{min_outgoing_degree}), increase K by 10% and repeat steps 2-4.
6. Normalize the outgoing edge weights by dividing them with the sum of their balanced ranks,
such that the sum of the outgoing edge weights for each node is 1. Note that there is always
at least one outgoing edge for each node. This gives us the ``var_outgoing_weights`` for our
directed K-Nearest-Neighbors graph.
.. note::
Ensuring each node has at least one outgoing edge allows us to always have at least one
candidate grouping to add it to. This of course doesn't protect the node from being
rejected by its group as deviant.
"""
return _compute_elements_knn_graph(
adata,
"var",
what,
k=k,
balanced_ranks_factor=balanced_ranks_factor,
incoming_degree_factor=incoming_degree_factor,
outgoing_degree_factor=outgoing_degree_factor,
min_outgoing_degree=min_outgoing_degree,
inplace=inplace,
)
def _compute_elements_knn_graph(
adata: AnnData,
elements: str,
what: Union[str, ut.Matrix] = "__x__",
*,
k: int,
balanced_ranks_factor: float,
incoming_degree_factor: float,
outgoing_degree_factor: float,
min_outgoing_degree: int,
inplace: bool = True,
) -> Optional[ut.PandasFrame]:
assert elements in ("obs", "var")
assert balanced_ranks_factor > 0.0
assert incoming_degree_factor > 0.0
assert outgoing_degree_factor > 0.0
assert min_outgoing_degree >= 1
if elements == "obs":
get_data = ut.get_oo_proper
set_data = ut.set_oo_data
else:
get_data = ut.get_vv_proper
set_data = ut.set_vv_data
def store_matrix(matrix: ut.CompressedMatrix, name: str, when: bool) -> None: #
if when:
name = elements + "_" + name
set_data(
adata,
name,
matrix,
formatter=lambda matrix: ut.ratio_description(
matrix.shape[0] * matrix.shape[1], "element", matrix.nnz, "nonzero"
),
)
elif ut.logging_calc():
ut.log_calc(
f"{elements}_{name}",
ut.ratio_description(matrix.shape[0] * matrix.shape[1], "element", matrix.nnz, "nonzero"),
)
similarity = ut.to_proper_matrix(get_data(adata, what))
similarity = ut.to_layout(similarity, "row_major", symmetric=True)
similarity = ut.to_numpy_matrix(similarity)
ut.log_calc("similarity", similarity)
while True:
outgoing_ranks = _rank_outgoing(similarity)
balanced_ranks = _balance_ranks(outgoing_ranks, k, balanced_ranks_factor)
store_matrix(balanced_ranks, "balanced_ranks", True)
pruned_ranks = _prune_ranks(balanced_ranks, k, incoming_degree_factor, outgoing_degree_factor)
store_matrix(pruned_ranks, "pruned_ranks", True)
outgoing_ranks = ut.nnz_per(pruned_ranks, per="row")
outgoing_degree = np.min(outgoing_ranks)
ut.log_calc("outgoing_degree", outgoing_degree)
if outgoing_degree >= min_outgoing_degree:
break
k += max(1, int(k / 10))
ut.log_calc("k", k)
outgoing_weights = _weigh_edges(pruned_ranks)
store_matrix(outgoing_weights, "outgoing_weights", inplace)
if inplace:
return None
if elements == "obs":
names = adata.obs_names
else:
names = adata.var_names
return ut.to_pandas_frame(outgoing_weights, index=names, columns=names)
@ut.timed_call()
def _rank_outgoing(similarity: ut.NumpyMatrix) -> ut.NumpyMatrix:
size = similarity.shape[0]
assert similarity.shape == (size, size)
similarity = np.copy(similarity)
min_similarity = ut.min_matrix(similarity)
np.fill_diagonal(similarity, min_similarity - 1)
assert ut.is_layout(similarity, "row_major")
outgoing_ranks = ut.rank_matrix_by_layout(similarity, ascending=False)
assert np.sum(np.diagonal(outgoing_ranks) == size) == size
return outgoing_ranks
@ut.timed_call()
def _balance_ranks(outgoing_ranks: ut.NumpyMatrix, k: int, balanced_ranks_factor: float) -> ut.CompressedMatrix:
size = outgoing_ranks.shape[0]
with ut.timed_step(".multiply"):
ut.timed_parameters(size=size)
dense_balanced_ranks = outgoing_ranks
assert np.sum(np.diagonal(dense_balanced_ranks) == size) == size
dense_balanced_ranks *= outgoing_ranks.transpose()
with ut.timed_step(".sqrt"):
np.sqrt(dense_balanced_ranks, out=dense_balanced_ranks)
max_rank = k * balanced_ranks_factor
ut.log_calc("max_rank", max_rank)
dense_balanced_ranks *= -1
dense_balanced_ranks += 2**21
with ut.timed_step("numpy.argmax"):
ut.timed_parameters(size=size)
max_index_of_each = ut.to_numpy_vector(dense_balanced_ranks.argmax(axis=1)) #
dense_balanced_ranks += max_rank + 1 - 2**21
preserved_row_indices = np.arange(size)
preserved_column_indices = max_index_of_each
preserved_balanced_ranks = ut.to_numpy_vector(dense_balanced_ranks[preserved_row_indices, preserved_column_indices])
preserved_balanced_ranks[preserved_balanced_ranks < 1] = 1
dense_balanced_ranks[dense_balanced_ranks < 0] = 0
np.fill_diagonal(dense_balanced_ranks, 0)
dense_balanced_ranks[preserved_row_indices, preserved_column_indices] = preserved_balanced_ranks
assert np.sum(np.diagonal(dense_balanced_ranks) == 0) == size
sparse_balanced_ranks = sp.csr_matrix(dense_balanced_ranks)
_assert_proper_compressed(sparse_balanced_ranks, "csr")
return sparse_balanced_ranks
@ut.timed_call()
def _prune_ranks(
balanced_ranks: ut.CompressedMatrix, k: int, incoming_degree_factor: float, outgoing_degree_factor: float
) -> ut.CompressedMatrix:
size = balanced_ranks.shape[0]
incoming_degree = int(round(k * incoming_degree_factor))
incoming_degree = min(incoming_degree, size - 1)
ut.log_calc("incoming_degree", incoming_degree)
outgoing_degree = int(round(k * outgoing_degree_factor))
outgoing_degree = min(outgoing_degree, size - 1)
ut.log_calc("outgoing_degree", outgoing_degree)
all_indices = np.arange(size)
with ut.timed_step("numpy.argmax"):
ut.timed_parameters(results=size, elements=balanced_ranks.nnz / size)
max_index_of_each = ut.to_numpy_vector(balanced_ranks.argmax(axis=1))
preserved_row_indices = all_indices
preserved_column_indices = max_index_of_each
preserved_balanced_ranks = ut.to_numpy_vector(balanced_ranks[preserved_row_indices, preserved_column_indices])
assert np.min(preserved_balanced_ranks) > 0
preserved_matrix = sp.coo_matrix(
(preserved_balanced_ranks, (preserved_row_indices, preserved_column_indices)), shape=balanced_ranks.shape
)
preserved_matrix.has_canonical_format = True
pruned_ranks = ut.mustbe_compressed_matrix(ut.to_layout(balanced_ranks, "column_major", symmetric=True))
_assert_proper_compressed(pruned_ranks, "csc")
pruned_ranks = ut.prune_per(pruned_ranks, incoming_degree)
_assert_proper_compressed(pruned_ranks, "csc")
pruned_ranks = ut.mustbe_compressed_matrix(ut.to_layout(pruned_ranks, "row_major"))
_assert_proper_compressed(pruned_ranks, "csr")
pruned_ranks = ut.prune_per(pruned_ranks, outgoing_degree)
_assert_proper_compressed(pruned_ranks, "csr")
with ut.timed_step("sparse.maximum"):
ut.timed_parameters(collected=pruned_ranks.nnz, preserved=preserved_matrix.nnz)
pruned_ranks = pruned_ranks.maximum(preserved_matrix)
pruned_ranks = pruned_ranks.maximum(preserved_matrix.transpose())
ut.sort_compressed_indices(pruned_ranks)
pruned_ranks = ut.mustbe_compressed_matrix(pruned_ranks)
_assert_proper_compressed(pruned_ranks, "csr")
return pruned_ranks
@ut.timed_call()
def _weigh_edges(pruned_ranks: ut.CompressedMatrix) -> ut.CompressedMatrix:
size = pruned_ranks.shape[0]
total_ranks_per_row = ut.sum_per(pruned_ranks, per="row")
ut.timed_parameters(size=size)
scale_per_row = np.reciprocal(total_ranks_per_row, out=total_ranks_per_row)
edge_weights = pruned_ranks.multiply(scale_per_row[:, None])
edge_weights = ut.to_layout(edge_weights, "row_major")
_assert_proper_compressed(edge_weights, "csr")
return edge_weights
def _assert_proper_compressed(matrix: ut.CompressedMatrix, layout: str) -> None:
assert sp.issparse(matrix)
assert ut.shaped_dtype(matrix) == "float32"
assert matrix.getformat() == layout
assert matrix.has_sorted_indices
assert matrix.has_canonical_format