Source code for metacells.utilities.timing

"""
Timing
------

The first step in achieving reasonable performance is identifying where most of the time is being
spent. The functions in this module make it easy to collect timing information about the relevant
functions, or steps within functions, in a controlled way and with low overhead. This is in
contrast to profiling all functions, which has a higher overhead and produces mountains of mostly
irrelevant data.
"""

import os
from contextlib import contextmanager
from functools import wraps
from threading import current_thread
from threading import local as thread_local
from time import perf_counter_ns
from time import process_time_ns
from typing import IO
from typing import Any
from typing import Callable
from typing import Dict
from typing import Iterator
from typing import List
from typing import NamedTuple
from typing import Optional
from typing import TypeVar

import metacells.utilities.documentation as utd
import metacells.utilities.logging as utl

__all__ = [
    "collect_timing",
    "flush_timing",
    "in_parallel_map",
    "log_steps",
    "timed_step",
    "timed_call",
    "timed_parameters",
    "context",
    "current_step",
    "StepTiming",
    "Counters",
]

COLLECT_TIMING = False

TIMING_PATH = "timing.csv"
TIMING_MODE = "a"
TIMING_BUFFERING = 1
TIMING_FILE: Optional[IO] = None

LOG_ALL_STEPS = False

THREAD_LOCAL = thread_local()
COUNTED_THREADS = 0


@utd.expand_doc()
def collect_timing(
    collect: bool,
    path: str = TIMING_PATH,  # pylint: disable=used-prior-global-declaration
    mode: str = TIMING_MODE,  # pylint: disable=used-prior-global-declaration
    *,
    buffering: int = TIMING_BUFFERING,  # pylint: disable=used-prior-global-declaration
) -> None:
    """
    Specify whether, where and how to collect timing information.

    By default, we do not. Override this by setting the ``METACELLS_COLLECT_TIMING`` environment
    variable to ``true``, or by invoking this function from the main thread.

    By default, the data is written to ``path`` (default: {path}), which is opened with ``mode``
    (default: {mode}) and ``buffering`` (default: {buffering}). Override these defaults by setting
    the ``METACELL_TIMING_CSV``, ``METACELL_TIMING_MODE`` and/or ``METACELL_TIMING_BUFFERING``
    environment variables, or by invoking this function from the main thread.

    This will flush and close the previous timing file, if any.

    The file is written in CSV format (without headers). The first three fields are:

    * The invocation context (a ``.``-separated path of "relevant" function/step names).

    * The elapsed time (in nanoseconds) in this context (not counting nested contexts).

    * The CPU time (in nanoseconds) in this context (not counting nested contexts).

    This may be followed by a series of ``name,value`` pairs describing parameters of interest for
    this context, such as data sizes and layouts, to help understand the performance of the code.
    """
    global TIMING_PATH
    global TIMING_MODE
    global TIMING_BUFFERING
    global TIMING_FILE
    global COLLECT_TIMING

    if not path.endswith(".csv"):
        raise ValueError(f"The METACELL_TIMING_CSV: {path} does not end with: .csv")

    TIMING_PATH = path
    TIMING_MODE = mode
    TIMING_BUFFERING = buffering

    if TIMING_FILE is not None:
        TIMING_FILE.flush()
        TIMING_FILE.close()
        TIMING_FILE = None

    if collect:
        TIMING_FILE = open(  # pylint: disable=consider-using-with
            TIMING_PATH, TIMING_MODE, buffering=TIMING_BUFFERING, encoding="utf8"
        )

    COLLECT_TIMING = collect
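

# A usage sketch (not part of the original module): enabling collection from
# the main thread before running timed code; ``"my_timing.csv"`` is an
# arbitrary example path.
#
#     import metacells.utilities.timing as utm
#
#     utm.collect_timing(True, path="my_timing.csv")  # opens the CSV file
#     ...                                             # run timed code
#     utm.collect_timing(False)                       # flushes and closes it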


def flush_timing() -> None:
    """
    Flush the timing information, if we are collecting it.
    """
    if TIMING_FILE is not None:
        TIMING_FILE.flush()


def in_parallel_map(map_index: int, process_index: int) -> None:
    """
    Reconfigure timing collection when running in a parallel sub-process via
    :py:func:`metacells.utilities.parallel.parallel_map`.

    This will redirect the timing information from ``<timing>.csv`` to
    ``<timing>.<map>.<process>.csv`` (where ``<timing>`` is from the original path, ``<map>`` is
    the serial number of the :py:func:`metacells.utilities.parallel.parallel_map` invocation, and
    ``<process>`` is the serial number of the process in the map).

    Collecting the timing of separate sub-processes in separate files allows us to freely write
    to them without locks and synchronization, which reduces the overhead of collecting timing
    information. You can simply concatenate the files when the run is complete, or use a tool
    which automatically collects the data from all the files, such as
    :py:mod:`metacells.scripts.timing`.
    """
    if COLLECT_TIMING:
        assert TIMING_PATH.endswith(".csv")
        collect_timing(True, f"{TIMING_PATH[:-4]}.{map_index}.{process_index}.csv")
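

# A minimal merge sketch (an assumption, not the original tooling): concatenate
# the per-process CSV files after a run. Plain concatenation is safe because
# every line is self-describing; the merged file name is chosen so it does not
# match the ``timing.*.csv`` pattern.
#
#     from glob import glob
#
#     with open("merged_timing.csv", "w", encoding="utf8") as merged:
#         for part in sorted(glob("timing.*.csv")):
#             with open(part, encoding="utf8") as chunk:
#                 merged.write(chunk.read())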


def log_steps(log: bool) -> None:
    """
    Whether to log every step invocation.

    By default, we do not. Override this by setting the ``METACELLS_LOG_ALL_STEPS`` environment
    variable to ``true`` or by invoking this function from the main thread.

    .. note::

        This only works if :py:func:`collect_timing` was set. It is a crude instrument to hunt
        for deadlocks, very-long-running numpy functions, and the like. Basically, if the program
        is taking 100% CPU and you have no idea what it is doing, turning this on and looking at
        the last logged step name would give you some idea of where it is stuck.
    """
    global LOG_ALL_STEPS
    LOG_ALL_STEPS = log
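

# A sketch of hunting for a stuck step (assumes the application configured
# Python logging to show DEBUG messages):
#
#     import metacells.utilities.timing as utm
#
#     utm.collect_timing(True)  # step logging only works when collecting
#     utm.log_steps(True)       # now every step entry/exit is logged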


TIMING_PATH = os.environ.get("METACELL_TIMING_CSV", TIMING_PATH)
TIMING_MODE = os.environ.get("METACELL_TIMING_MODE", TIMING_MODE)
TIMING_BUFFERING = int(os.environ.get("METACELL_TIMING_BUFFERING", str(TIMING_BUFFERING)))

collect_timing(
    {"true": True, "false": False}[os.environ.get("METACELLS_COLLECT_TIMING", str(COLLECT_TIMING)).lower()]
)
log_steps(
    {"true": True, "false": False}[os.environ.get("METACELLS_LOG_ALL_STEPS", str(LOG_ALL_STEPS)).lower()]
)
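

# Note that the bootstrap above reads the environment only once, at import
# time, so the variables must be set before this module is first imported.
# A sketch:
#
#     import os
#
#     os.environ["METACELLS_COLLECT_TIMING"] = "true"
#     os.environ["METACELL_TIMING_CSV"] = "run_timing.csv"  # example path
#
#     import metacells.utilities.timing as utm  # picks up the settings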


class Counters:
    """
    The counters for the execution times.
    """

    def __init__(self, *, elapsed_ns: int = 0, cpu_ns: int = 0) -> None:
        self.elapsed_ns = elapsed_ns  #: Elapsed time counter.
        self.cpu_ns = cpu_ns  #: CPU time counter.

    @staticmethod
    def now() -> "Counters":
        """
        Return the current value of the counters.
        """
        return Counters(elapsed_ns=perf_counter_ns(), cpu_ns=process_time_ns())

    def __add__(self, other: "Counters") -> "Counters":
        return Counters(elapsed_ns=self.elapsed_ns + other.elapsed_ns, cpu_ns=self.cpu_ns + other.cpu_ns)

    def __iadd__(self, other: "Counters") -> "Counters":
        self.elapsed_ns += other.elapsed_ns
        self.cpu_ns += other.cpu_ns
        return self

    def __sub__(self, other: "Counters") -> "Counters":
        return Counters(elapsed_ns=self.elapsed_ns - other.elapsed_ns, cpu_ns=self.cpu_ns - other.cpu_ns)

    def __isub__(self, other: "Counters") -> "Counters":
        self.elapsed_ns -= other.elapsed_ns
        self.cpu_ns -= other.cpu_ns
        return self
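

# A small sketch of using ``Counters`` directly to measure a region of code,
# the same way ``timed_step`` does internally (snapshot, run, subtract):
#
#     start = Counters.now()
#     sum(range(1_000_000))  # some computation
#     spent = Counters.now() - start
#     print(spent.elapsed_ns, spent.cpu_ns)  # both in nanoseconds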


class StepTiming:
    """
    Timing information for some named processing step.
    """

    def __init__(self, name: str, parent: Optional["StepTiming"]) -> None:
        """
        Start collecting time for a named processing step.
        """
        #: The parent step, if any.
        self.parent = parent

        if name[0] != ".":
            name = ";" + name
        if parent is None:
            assert not name[0] == "."
            name = name[1:]

        #: The full context of the processing step.
        self.context: str = name if parent is None else parent.context + name

        #: Parameters of interest of the processing step.
        self.parameters: List[str] = []

        #: The thread the step was invoked in.
        self.thread_name = current_thread().name

        #: The amount of CPU used in nested steps in the same thread.
        self.total_nested = Counters()
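

# How contexts compose (per the constructor above): a top-level step
# contributes its plain name, nested steps are joined with ``;``, and sub-step
# names starting with ``.`` are appended verbatim. For example:
#
#     with timed_step("foo"):
#         with timed_step("bar"):
#             with timed_step(".loop"):
#                 ...  # the context here is "foo;bar.loop"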


class GcStep(NamedTuple):
    """
    Data about a GC collection step.
    """

    #: The counters when we started the GC step.
    start: Counters

    #: The counters when we ended the GC step.
    stop: Counters


if COLLECT_TIMING:
    import gc

    GC_STEPS: List[GcStep] = []

    GC_START_POINT: Optional[Counters] = None

    def _time_gc(phase: str, info: Dict[str, Any]) -> None:
        steps_stack = getattr(THREAD_LOCAL, "steps_stack", None)
        if not steps_stack:
            return

        global GC_START_POINT

        if phase == "start":
            assert GC_START_POINT is None
            GC_START_POINT = Counters.now()
            return

        assert phase == "stop"
        assert GC_START_POINT is not None

        gc_step = GcStep(start=GC_START_POINT, stop=Counters.now())
        GC_STEPS.append(gc_step)
        GC_START_POINT = None

        gc_parameters = []
        for name, value in info.items():
            gc_parameters.append(name)
            gc_parameters.append(str(value))

        _print_timing("__gc__", gc_step.stop - gc_step.start, gc_parameters)

    gc.callbacks.append(_time_gc)
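

# The hook above uses the standard ``gc.callbacks`` protocol: each callback is
# invoked with a ``phase`` of "start" or "stop" and an ``info`` dict (with keys
# such as "generation", "collected" and "uncollectable"). A standalone sketch
# of the same technique:
#
#     import gc
#
#     def report_gc(phase: str, info: dict) -> None:
#         print("gc", phase, info)
#
#     gc.callbacks.append(report_gc)
#     gc.collect()  # invokes the callback for "start", then for "stop"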


@contextmanager
def timed_step(name: str) -> Iterator[None]:  # pylint: disable=too-many-branches
    """
    Collect timing information for a computation step.

    Expected usage is:

    .. code:: python

        with ut.timed_step("foo"):
            some_computation()

    If we are collecting timing information, then for every invocation, the program will append a
    line similar to:

    .. code:: text

        foo,elapsed_ns,123,cpu_ns,456

    to a timing log file (default: ``timing.csv``). Additional fields can be appended to the line
    using the :py:func:`metacells.utilities.timing.timed_parameters` function.

    If the ``name`` starts with a ``.`` or a ``_``, then it is prefixed with the name of the
    innermost surrounding step (which must exist). This is commonly used to time sub-steps of a
    function.
    """
    if not COLLECT_TIMING:
        yield None
        return

    steps_stack = getattr(THREAD_LOCAL, "steps_stack", None)
    if steps_stack is None:
        steps_stack = THREAD_LOCAL.steps_stack = []

    parent_timing: Optional[StepTiming] = None
    if len(steps_stack) > 0:
        parent_timing = steps_stack[-1]
    if name[0] == "_":
        name = f".{name[1:]}"
    if name[0] == ".":
        assert parent_timing is not None

    step_timing = StepTiming(name, parent_timing)
    steps_stack.append(step_timing)

    if LOG_ALL_STEPS:
        utl.logger().debug(f"{{[( {step_timing.context}")  # pylint: disable=logging-fstring-interpolation

    yield_point = Counters.now()
    try:
        yield None
    finally:
        back_point = Counters.now()
        total_times = back_point - yield_point

        global GC_STEPS
        gc_steps: List[GcStep] = []
        for gc_step in GC_STEPS:
            if yield_point.elapsed_ns <= gc_step.start.elapsed_ns and gc_step.stop.elapsed_ns <= back_point.elapsed_ns:
                total_times -= gc_step.stop - gc_step.start
            else:
                gc_steps.append(gc_step)
        GC_STEPS = gc_steps

        assert total_times.elapsed_ns >= 0
        assert total_times.cpu_ns >= 0

        if LOG_ALL_STEPS:
            utl.logger().debug("}]) %s", step_timing.context)

        steps_stack.pop()

        if parent_timing is not None:
            parent_timing.total_nested += total_times
        total_times -= step_timing.total_nested
        _print_timing(step_timing.context, total_times, step_timing.parameters)

        assert total_times.elapsed_ns >= 0
        assert total_times.cpu_ns >= 0
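

# A sketch of timing a sub-step: the leading ``.`` (or ``_``) nests the name
# under the innermost surrounding step, producing two separate CSV lines
# (for ``load`` and ``load.parse``), each excluding the time of the other:
#
#     with timed_step("load"):
#         raw = read_file(path)    # hypothetical helper
#         with timed_step(".parse"):
#             data = parse(raw)    # hypothetical helper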


def _print_timing(
    invocation_context: str,
    total_times: Counters,
    step_parameters: Optional[List[str]] = None,
) -> None:
    gc_enabled = gc.isenabled()
    gc.disable()
    try:
        global TIMING_FILE
        if TIMING_FILE is None:
            TIMING_FILE = open(  # pylint: disable=consider-using-with
                TIMING_PATH, "a", buffering=TIMING_BUFFERING, encoding="utf8"
            )
        text = [invocation_context, "elapsed_ns", str(total_times.elapsed_ns), "cpu_ns", str(total_times.cpu_ns)]
        if step_parameters:
            text.extend(step_parameters)
        TIMING_FILE.write(",".join(text) + "\n")
    finally:
        if gc_enabled:
            gc.enable()


def timed_parameters(**kwargs: Any) -> None:
    """
    Associate relevant timing parameters to the innermost
    :py:func:`metacells.utilities.timing.timed_step`.

    The specified arguments are appended at the end of the generated ``timing.csv`` line. For
    example, ``timed_parameters(foo=2, bar=3)`` would add ``foo,2,bar,3`` to the line in
    ``timing.csv``.

    This allows tracking parameters which affect the invocation time (such as array sizes), to
    help identify the causes of long-running operations.
    """
    step_timing = current_step()
    if step_timing is not None:
        for name, value in kwargs.items():
            step_timing.parameters.append(name)
            step_timing.parameters.append(str(value))
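

# A sketch of attaching parameters to the innermost step; the extra fields
# land at the end of that step's CSV line (e.g. ``...,rows,1000,columns,20``):
#
#     with timed_step("normalize"):
#         timed_parameters(rows=matrix.shape[0], columns=matrix.shape[1])
#         normalize(matrix)  # hypothetical helper

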
CALLABLE = TypeVar("CALLABLE")


def timed_call(name: Optional[str] = None) -> Callable[[CALLABLE], CALLABLE]:
    """
    Automatically wrap each invocation of the decorated function with
    :py:func:`metacells.utilities.timing.timed_step` using the ``name`` (by default, the
    function's ``__qualname__``).

    Expected usage is:

    .. code:: python

        @ut.timed_call()
        def some_function(...):
            ...
    """
    if COLLECT_TIMING:

        def wrap(function: Callable) -> Callable:
            @wraps(function)
            def timed(*args: Any, **kwargs: Any) -> Any:
                with timed_step(name or function.__qualname__):
                    return function(*args, **kwargs)

            timed.__is_timed__ = True  # type: ignore
            return timed

    else:

        def wrap(function: Callable) -> Callable:
            function.__is_timed__ = True  # type: ignore
            return function

    return wrap  # type: ignore
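

# A sketch of the decorator with an explicit name (instead of the default
# ``__qualname__``), combined with a timed sub-step inside the function:
#
#     @timed_call("compute")
#     def compute(data):
#         intermediate = prepare(data)  # hypothetical helper
#         with timed_step(".finalize"):
#             return finalize(intermediate)  # hypothetical helper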


def context() -> str:
    """
    Return the full current context (path of
    :py:func:`metacells.utilities.timing.timed_step`-s leading to the current point).

    .. note::

        The context will be the empty string unless we are actually collecting timing.
    """
    steps_stack = getattr(THREAD_LOCAL, "steps_stack", None)
    if not steps_stack:
        return ""
    return steps_stack[-1].context
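

# A sketch of inspecting the current context, e.g. to tag log messages:
#
#     with timed_step("analysis"):
#         print(context())  # "analysis" when collecting timing, "" otherwise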


def current_step() -> Optional[StepTiming]:
    """
    The timing collector for the innermost (current)
    :py:func:`metacells.utilities.timing.timed_step`, if any.
    """
    if not COLLECT_TIMING:
        return None

    steps_stack = getattr(THREAD_LOCAL, "steps_stack", None)
    if steps_stack is None or len(steps_stack) == 0:
        return None
    return steps_stack[-1]


# This is a circular dependency, so having it at the end allows the exported symbols to be seen.