from __future__ import annotations

import asyncio
import collections
import sys
import threading
import time as timemod
from collections.abc import Callable, Hashable, Iterator
from contextlib import contextmanager
from contextvars import ContextVar
from dataclasses import dataclass
from functools import wraps
from math import nan
from typing import Literal

import psutil

from distributed.compatibility import WINDOWS

_empty_namedtuple = collections.namedtuple("_empty_namedtuple", ())


def _psutil_caller(method_name, default=_empty_namedtuple):
    """Return a function calling the given psutil *method_name*, or returning
    *default* if psutil fails.
    """
    meth = getattr(psutil, method_name)

    @wraps(meth)
    def wrapper():  # pragma: no cover
        try:
            return meth()
        except RuntimeError:
            # This can happen on some systems (e.g. no physical disk in worker)
            return default()

    return wrapper


disk_io_counters = _psutil_caller("disk_io_counters")
net_io_counters = _psutil_caller("net_io_counters")


class _WindowsTime:
    """Combine time.time() or time.monotonic() with time.perf_counter() to get an
    absolute clock with fine resolution.
    """

    base_timer: Callable[[], float]  # coarse absolute clock: time.time or time.monotonic
    delta: float  # offset added to perf_counter() readings; computed by resync()
    previous: float | None  # last returned value, or None when not enforcing monotonicity
    next_resync: float
    resync_every: float

    def __init__(
        self, base: Callable[[], float], is_monotonic: bool, resync_every: float = 600.0
    ):
        self.base_timer = base
        # -inf means "accept the first reading as-is" for monotonic clocks;
        # None disables monotonicity enforcement entirely.
        self.previous = float("-inf") if is_monotonic else None
        self.next_resync = float("-inf")
        self.resync_every = resync_every

    def time(self) -> float:
        """Return a fine-resolution absolute timestamp, periodically re-anchored
        to the coarse base timer.
        """
        now = timemod.perf_counter()
        if now > self.next_resync:
            self.resync()
            self.next_resync = now + self.resync_every
        now += self.delta
        if self.previous is not None:
            # Monotonic timer: never go backwards; nudge forward by 1ns instead
            if now <= self.previous:
                now = self.previous + 1e-9
            self.previous = now
        return now

    def resync(self) -> None:
        """Recompute ``delta``, the offset between the base timer and
        ``perf_counter``, by sampling both until a stable reading is found.
        """
        base_clock = self.base_timer
        fine_clock = timemod.perf_counter
        min_samples = 5
        while True:
            samples = [(base_clock(), fine_clock()) for _ in range(min_samples * 2)]
            # Pick the most common coarse-clock reading among the samples
            counts = collections.Counter(abs_t for abs_t, _ in samples)
            first, nfirst = counts.most_common()[0]
            if nfirst < min_samples:
                # System too noisy? Start again
                continue

            # Drop the last fine reading, which may straddle a coarse-clock tick
            fine_times = [fine_t for abs_t, fine_t in samples if abs_t == first][:-1]
            assert len(fine_times) >= min_samples - 1, fine_times
            self.delta = first - sum(fine_times) / len(fine_times)
            break


# A high-resolution wall clock timer measuring the seconds since Unix epoch
if WINDOWS and sys.version_info < (3, 13):
    time = _WindowsTime(timemod.time, is_monotonic=False).time
    monotonic = _WindowsTime(timemod.monotonic, is_monotonic=True).time
else:
    # Under modern Unixes, time.time() and time.monotonic() should be good enough
    time = timemod.time
    monotonic = timemod.monotonic

process_time = timemod.process_time

# Get a per-thread CPU timer function if possible, otherwise
# use a per-process CPU timer function.
try:
    # thread_time is not supported on all platforms
    thread_time = timemod.thread_time
except (AttributeError, OSError):  # pragma: no cover
    thread_time = process_time


@dataclass
class MeterOutput:
    """Output of :func:`meter`: the func() readings at context enter and exit,
    plus their (possibly floored) difference.
    """

    start: float
    stop: float
    delta: float

    __slots__ = tuple(__annotations__)


@contextmanager
def meter(
    func: Callable[[], float] = timemod.perf_counter,
    floor: float | Literal[False] = 0.0,
) -> Iterator[MeterOutput]:
    """Convenience context manager which calls func() before and after the wrapped
    code and calculates the delta.

    Parameters
    ----------
    func: callable
        function to call before and after, which must return a number. Besides time,
        it could return e.g. cumulative network traffic or disk usage.
        Default: :func:`timemod.perf_counter`
    floor: float or False, optional
        Floor the delta to the given value (default: 0). This is useful for strictly
        cumulative functions that can occasionally glitch and go backwards.
        Set to False to disable.

    Yields
    ------
    :class:`MeterOutput` where the ``start`` attribute is populated straight away,
    while ``stop`` and ``delta`` are nan until context exit.
    """
    out = MeterOutput(func(), nan, nan)
    try:
        yield out
    finally:
        # Populate stop/delta even if the wrapped code raised
        out.stop = func()
        out.delta = out.stop - out.start
        if floor is not False:
            out.delta = max(floor, out.delta)


class ContextMeter:
    """Context-based general purpose meter.

    Usage
    -----
    1. In high level code, call :meth:`add_callback` to install a hook that defines
       an activity
    2. In low level code, typically many stack levels below, log quantitative events
       (e.g. elapsed time, transferred bytes, etc.) so that they will be attributed
       to the high-level code calling it, either with :meth:`meter`,
       :meth:`meter_function`, or :meth:`digest_metric`.

    Examples
    --------
    In the code that e.g. sends a Python object from A to B over the network:

    >>> from distributed.metrics import context_meter
    >>> with context_meter.add_callback(partial(print, "A->B comms:")):
    ...     await send_over_the_network(obj)

    In the serialization utilities, called many stack levels below:

    >>> with context_meter.meter("dumps"):
    ...     pik = pickle.dumps(obj)
    >>> with context_meter.meter("compress"):
    ...     pik = lz4.compress(pik)

    And finally, elsewhere, deep into the TCP stack:

    >>> with context_meter.meter("network-write"):
    ...     await comm.write(frames)

    When you call the top-level code, you'll get::

        A->B comms: dumps 0.012 seconds
        A->B comms: compress 0.034 seconds
        A->B comms: network-write 0.567 seconds
    """

    # key -> f(label, value, unit); held in a ContextVar so that callbacks are
    # scoped to the current task / copied context
    _callbacks: ContextVar[dict[Hashable, Callable[[Hashable, float, str], None]]]

    def __init__(self):
        self._callbacks = ContextVar(f"MetricHook<{id(self)}>._callbacks", default={})

    def __reduce__(self):
        # Pickling must return the process-wide singleton, never a copy
        assert self is context_meter, "Found copy of singleton"
        return self._unpickle_singleton, ()

    @staticmethod
    def _unpickle_singleton():
        return context_meter

    @contextmanager
    def add_callback(
        self,
        callback: Callable[[Hashable, float, str], None],
        *,
        key: Hashable | None = None,
        allow_offload: bool = False,
    ) -> Iterator[None]:
        """Add a callback when entering the context and remove it when exiting it.
        The callback must accept the same parameters as :meth:`digest_metric`.

        Parameters
        ----------
        callback: Callable
            ``f(label, value, unit)`` to be executed
        key: Hashable, optional
            Unique key for the callback. If two nested calls to ``add_callback``
            use the same key, suppress the outermost callback.
        allow_offload: bool, optional
            If set to True, this context must be executed inside a running asyncio
            event loop. If a call to :meth:`digest_metric` is performed from a
            different thread, e.g. from inside :func:`distributed.utils.offload`,
            ensure that the callback is executed in the event loop's thread instead.
        """
        if allow_offload:
            loop = asyncio.get_running_loop()
            tid = threading.get_ident()

            def safe_cb(label: Hashable, value: float, unit: str, /) -> None:
                if threading.get_ident() == tid:
                    callback(label, value, unit)
                else:
                    # We're inside offload(); hop back to the event loop's thread
                    loop.call_soon_threadsafe(callback, label, value, unit)

        else:
            safe_cb = callback

        if key is None:
            key = object()
        # Copy-on-write so that sibling contexts don't see this callback
        registered = dict(self._callbacks.get())
        registered[key] = safe_cb
        tok = self._callbacks.set(registered)
        try:
            yield
        finally:
            self._callbacks.reset(tok)

    @contextmanager
    def clear_callbacks(self) -> Iterator[None]:
        """Do not trigger any callbacks set outside of this context"""
        tok = self._callbacks.set({})
        try:
            yield
        finally:
            self._callbacks.reset(tok)

    def digest_metric(self, label: Hashable, value: float, unit: str) -> None:
        """Invoke the currently set context callbacks for an arbitrary quantitative
        metric.
        """
        for cb in self._callbacks.get().values():
            cb(label, value, unit)

    @contextmanager
    def meter(
        self,
        label: Hashable,
        unit: str = "seconds",
        func: Callable[[], float] = timemod.perf_counter,
        floor: float | Literal[False] = 0.0,
    ) -> Iterator[MeterOutput]:
        """Convenience context manager or decorator which calls func() before and
        after the wrapped code, calculates the delta, and finally calls
        :meth:`digest_metric`.

        If unit=='seconds', it also subtracts any other calls to :meth:`meter` or
        :meth:`digest_metric` with the same unit performed within the context, so
        that the total is strictly additive.

        Parameters
        ----------
        label: Hashable
            label to pass to the callback
        unit: str, optional
            unit to pass to the callback. Default: seconds
        func: callable
            see :func:`meter`
        floor: bool, optional
            see :func:`meter`

        Yields
        ------
        :class:`MeterOutput` where the ``start`` attribute is populated straight
        away, while ``stop`` and ``delta`` are nan until context exit.
        In case of multiple nested calls to :meth:`meter`, then delta (for seconds
        only) is reduced by the inner metrics, to a minimum of ``floor``.
        """
        if unit != "seconds":
            try:
                with meter(func, floor=floor) as mo:
                    yield mo
            finally:
                self.digest_metric(label, mo.delta, unit)
            return

        # If unit=="seconds", subtract time metered from the sub-contexts
        inner_totals: list[float] = []

        def on_inner_metric(label2: Hashable, value2: float, unit2: str) -> None:
            if unit2 == unit:
                # This must be threadsafe to support callbacks invoked from
                # distributed.utils.offload; list.append is atomic, whereas
                # '+=' on a float would not be!
                inner_totals.append(value2)

        try:
            with self.add_callback(on_inner_metric), meter(func, floor=False) as mo:
                yield mo
        finally:
            delta = mo.delta - sum(inner_totals)
            if floor is not False:
                delta = max(floor, delta)
            mo.delta = delta
            self.digest_metric(label, delta, unit)


context_meter = ContextMeter()


class DelayedMetricsLedger:
    """Add-on to :class:`ContextMeter` that helps in the case where:

    - The code to be metered is not easily expressed as a self-contained code block
      e.g. you want to measure latency in the asyncio event loop before and after
      running a task
    - You want to alter the metrics depending on how the code ends; e.g. you want to
      post them differently in case of failure.

    Examples
    --------
    >>> ledger = DelayedMetricsLedger()  # Metering starts here

    >>> async def wrapper():
    ...     with ledger.record():
    ...         return await metered_function()

    >>> task = asyncio.create_task(wrapper())

    >>> # (later, elsewhere)
    >>> try:
    ...     await task
    ...     coarse_time = False
    ... except Exception:
    ...     coarse_time = "failed"
    ...     raise
    ... finally:
    ...     # Metering stops here
    ...     for label, value, unit in ledger.finalize(coarse_time):
    ...         ...  # actually log metrics
    """

    func: Callable[[], float]  # clock sampled at construction and at finalize()
    start: float
    metrics: list[tuple[Hashable, float, str]]  # (label, value, unit)

    def __init__(self, func: Callable[[], float] = timemod.perf_counter):
        self.func = func
        self.start = func()
        self.metrics = []

    def _callback(self, label: Hashable, value: float, unit: str) -> None:
        # Accumulate instead of forwarding; the metrics are published later by
        # finalize(), once the outcome of the metered code is known
        self.metrics.append((label, value, unit))

    @contextmanager
    def record(self, *, key: Hashable | None = None) -> Iterator[None]:
        """Ingest metrics logged with :meth:`ContextMeter.digest_metric` or
        :meth:`ContextMeter.meter` and temporarily store them in ``metrics``.

        Parameters
        ----------
        key: Hashable, optional
            See :meth:`ContextMeter.add_callback`
        """
        with context_meter.add_callback(self._callback, key=key):
            yield

    def finalize(
        self,
        coarse_time: str | Literal[False] = False,
        floor: float | Literal[False] = 0.0,
    ) -> Iterator[tuple[Hashable, float, str]]:
        """The metered code is terminated, and we now know how to log it.

        Parameters
        ----------
        coarse_time: str | False, optional
            False
                Yield all acquired metrics, plus an extra time metric, labelled
                "other", which is the time between creating the
                DelayedMetricsLedger and calling this method, minus any time
                logged in the metrics.
            label
                Yield all acquired non-time metrics, plus a single time metric
                labelled ``coarse_time``, which is the time between creating the
                DelayedMetricsLedger and calling this method.
        floor: float | False, optional
            Floor either the "other" or the ``coarse_time`` metric to this value
            (default: 0). Set to False to disable.
        """
        stop = self.func()
        delta = stop - self.start
        for label, value, unit in self.metrics:
            if unit == "seconds":
                if coarse_time:
                    # Dropped: replaced by the single coarse time metric below
                    continue
                delta -= value
            yield label, value, unit
        if floor is not False:
            delta = max(floor, delta)
        yield coarse_time or "other", delta, "seconds"