Source code for metacells.tools.dissolve
"""
Dissolve
--------
"""
from math import floor
from typing import Optional
from typing import Union
import numpy as np
from anndata import AnnData # type: ignore
import metacells.parameters as pr
import metacells.utilities as ut
__all__ = [
"dissolve_metacells",
]
[docs]
@ut.logged()
@ut.expand_doc()
@ut.timed_call()
def dissolve_metacells(
adata: AnnData,
what: Union[str, ut.Matrix] = "__x__",
*,
candidates: Union[str, ut.Vector] = "candidate",
deviants: ut.Vector,
target_metacell_size: int = pr.target_metacell_size,
min_metacell_size: int = pr.min_metacell_size,
target_metacell_umis: int = pr.target_metacell_umis,
cell_umis: Optional[ut.NumpyVector] = pr.cell_umis,
min_robust_size_factor: float = pr.dissolve_min_robust_size_factor,
min_convincing_gene_fold_factor: Optional[float] = pr.dissolve_min_convincing_gene_fold_factor,
) -> None:
"""
Dissolve too-small metacells based on ``what`` (default: {what}) data.
**Input**
Annotated ``adata``, where the observations are cells and the variables are genes, where
``what`` is a per-variable-per-observation matrix or the name of a per-variable-per-observation
annotation containing such a matrix.
**Returns**
Sets the following in ``adata``:
Observation (Cell) Annotations
``metacell``
The integer index of the metacell each cell belongs to. The metacells are in no
particular order. Cells with no metacell assignment are given a metacell index of
``-1``.
``dissolved``
A boolean mask of the cells which were in a dissolved metacell.
**Computation Parameters**
0. If ``cell_umis`` is not specified, use the sum of the ``what`` data for each cell.
1. Mark all ``deviants`` cells "outliers". This can be the name of a per-observation (cell) annotation, or an
explicit boolean mask of cells, or a or ``None`` if there are no deviant cells to mark.
2. Any metacell which has less cells than the ``min_metacell_size`` is dissolved into outlier cells.
3. If ``min_convincing_gene_fold_factor`` is not ``None``, preserve everything else. Otherwise:
4. We are trying to create metacells of size ``target_metacell_size`` cells and ``target_metacell_umis`` UMIs each.
Compute the UMIs of the metacells by summing the ``cell_umis``.
5. Using ``min_robust_size_factor`` (default: {min_robust_size_factor}), any
metacell whose total size is at least ``target_metacell_size * min_robust_size_factor``
or whose total UMIs are at least ``target_metacell_umis * min_robust_size_factor`` is preserved.
6. Using ``min_convincing_gene_fold_factor``, preserve any remaining metacells which have at least one gene whose
fold factor (log2((actual + 1) / (expected_by_overall_population + 1))) is at least this high.
6. Dissolve the remaining metacells into outlier cells.
"""
dissolved_of_cells = np.zeros(adata.n_obs, dtype="bool")
candidate_of_cells = ut.get_o_numpy(adata, candidates, formatter=ut.groups_description)
candidate_of_cells = np.copy(candidate_of_cells)
deviant_of_cells = ut.maybe_o_numpy(adata, deviants, formatter=ut.mask_description)
if deviant_of_cells is not None:
deviant_of_cells = deviant_of_cells > 0
if cell_umis is None:
cell_umis = ut.get_o_numpy(adata, what, sum=True, formatter=ut.sizes_description).astype("float32")
else:
assert cell_umis.dtype == "float32"
assert isinstance(cell_umis, ut.NumpyVector)
if deviant_of_cells is not None:
candidate_of_cells[deviant_of_cells > 0] = -1
candidate_of_cells = ut.compress_indices(candidate_of_cells)
candidates_count = np.max(candidate_of_cells) + 1
data = ut.get_vo_proper(adata, what, layout="column_major")
fraction_of_genes = ut.fraction_per(data, per="column")
min_robust_size = int(floor(target_metacell_size * min_robust_size_factor))
min_robust_umis = int(floor(target_metacell_umis * min_robust_size_factor))
ut.log_calc("min_robust_size", min_robust_size)
ut.log_calc("min_robust_umis", min_robust_umis)
did_dissolve = False
for candidate_index in range(candidates_count):
candidate_cell_indices = np.where(candidate_of_cells == candidate_index)[0]
if not _keep_candidate(
candidate_index,
data=data,
cell_umis=cell_umis,
fraction_of_genes=fraction_of_genes,
min_metacell_size=min_metacell_size,
min_robust_size=min_robust_size,
min_robust_umis=min_robust_umis,
min_convincing_gene_fold_factor=min_convincing_gene_fold_factor,
candidates_count=candidates_count,
candidate_cell_indices=candidate_cell_indices,
):
dissolved_of_cells[candidate_cell_indices] = True
candidate_of_cells[candidate_cell_indices] = -1
did_dissolve = True
if did_dissolve:
metacell_of_cells = ut.compress_indices(candidate_of_cells)
else:
metacell_of_cells = candidate_of_cells
ut.set_o_data(adata, "dissolved", dissolved_of_cells)
ut.set_o_data(adata, "metacell", metacell_of_cells, formatter=ut.groups_description)
def _keep_candidate(
candidate_index: int,
*,
data: ut.ProperMatrix,
cell_umis: ut.NumpyVector,
fraction_of_genes: ut.NumpyVector,
min_metacell_size: int,
min_robust_size: int,
min_robust_umis: int,
min_convincing_gene_fold_factor: Optional[float],
candidates_count: int,
candidate_cell_indices: ut.NumpyVector,
) -> bool:
metacell_size = candidate_cell_indices.size
metacell_umis = np.sum(cell_umis[candidate_cell_indices])
if metacell_size < min_metacell_size:
if ut.logging_calc():
ut.log_calc(
f'- candidate: {ut.progress_description(candidates_count, candidate_index, "candidate")} '
f"size: {metacell_size} "
f"umis: {metacell_umis:g} "
f"has: less than minimal size"
)
return False
if metacell_size >= min_robust_size:
if ut.logging_calc():
ut.log_calc(
f'- candidate: {ut.progress_description(candidates_count, candidate_index, "candidate")} '
f"size: {metacell_size} "
f"umis: {metacell_umis:g} "
f"has: robust size"
)
return True
if metacell_umis >= min_robust_umis:
if ut.logging_calc():
ut.log_calc(
f'- candidate: {ut.progress_description(candidates_count, candidate_index, "candidate")} '
f"size: {metacell_size} "
f"umis: {metacell_umis:g} "
f"has: robust umis"
)
return True
if min_convincing_gene_fold_factor is None:
if ut.logging_calc():
ut.log_calc(
f'- candidate: {ut.progress_description(candidates_count, candidate_index, "candidate")} '
f"size: {metacell_size} "
f"umis: {metacell_umis:g} "
f"has: minimal size"
)
return True
genes_count = data.shape[1]
candidate_data = data[candidate_cell_indices, :]
candidate_data_of_genes = ut.to_numpy_vector(candidate_data.sum(axis=0))
assert candidate_data_of_genes.size == genes_count
candidate_total = np.sum(candidate_data_of_genes)
candidate_expected_of_genes = fraction_of_genes * candidate_total
candidate_expected_of_genes += 1
candidate_data_of_genes += 1
candidate_data_of_genes /= candidate_expected_of_genes
np.log2(candidate_data_of_genes, out=candidate_data_of_genes)
convincing_genes_mask = np.abs(candidate_data_of_genes) >= min_convincing_gene_fold_factor
keep_candidate = bool(np.any(convincing_genes_mask))
if ut.logging_calc():
if keep_candidate:
ut.log_calc(
f'- candidate: {ut.progress_description(candidates_count, candidate_index, "candidate")} '
f"size: {metacell_size} "
f"umis: {metacell_umis:g} "
f"has: convincing genes"
)
else:
ut.log_calc(
f'- candidate: {ut.progress_description(candidates_count, candidate_index, "candidate")} '
f"size: {metacell_size} "
f"umis: {metacell_umis:g} "
f"has: no convincing genes"
)
return keep_candidate