Source code for pylops_mpi.utils.benchmark

# Public API of this module: the decorator and the in-function checkpoint.
__all__ = ["benchmark", "mark"]

import functools
import logging
import os
import time
from typing import Callable, Optional, List
from mpi4py import MPI

from pylops.utils import deps as pylops_deps  # avoid namespace crashes with pylops_mpi.utils
from pylops_mpi.utils import deps

cupy_message = pylops_deps.cupy_import("the benchmark module")
nccl_message = deps.nccl_import("the benchmark module")

if cupy_message is not None or nccl_message is not None:
    # At least one import check returned a message (presumably meaning CuPy or
    # NCCL is not usable here): fall back to a synchronization hook that does
    # nothing.
    def _nccl_sync():
        pass
else:
    from pylops_mpi.utils._nccl import _nccl_sync

# Benchmarking is on by default; it is disabled when the BENCH_PYLOPS_MPI
# environment variable holds an integer other than 1 (a non-integer value makes
# int() raise ValueError at import time).
ENABLE_BENCHMARK = 1 == int(os.getenv("BENCH_PYLOPS_MPI", "1"))

# Closures pushed by active benchmark wrappers, innermost last; mark() always
# dispatches to the top entry, which is what makes nested benchmarks work.
_mark_func_stack: List[Callable] = []
# Flat list of (label, value, level) records appended by benchmark wrappers
# (headers) and by mark() calls (timestamps).
_markers: List[tuple] = []


def _parse_output_tree(markers: List[str]):
    """This function parses the list of strings gathered during the benchmark call and output them
    as one properly formatted string. The format of output string follows the hierarchy of function calls
    i.e., the nested funtion calls are indented.

    Parameters
    ----------
    markers: :obj:`list`, optional
        A list of markers/labels generated from the benchmark call
    """
    global _markers
    output = []
    stack = []
    i = 0
    while i < len(markers):
        label, time, level = markers[i]
        if label.startswith("[decorator]"):
            indent = "\t" * (level - 1)
            output.append(f"{indent}{label}: total runtime: {time:6f} s\n")
        else:
            if stack:
                prev_label, prev_time, prev_level = stack[-1]
                if prev_level == level:
                    indent = "\t" * level
                    output.append(f"{indent}{prev_label}-->{label}: {time - prev_time:6f} s\n")
                    stack.pop()

            # Push to the stack only if it is going deeper or still at the same level
            if i + 1 <= len(markers) - 1:
                _, _ , next_level = markers[i + 1]
                if next_level >= level:
                    stack.append(markers[i])
        i += 1
    # reset markers, allowing other benchmarked function to start fresh
    _markers = []
    return output


def _sync():
    """Run the device synchronization hook, then wait at an MPI barrier.

    The hook is a no-op when the CuPy/NCCL import checks at module load did
    not both succeed; otherwise it is ``_nccl_sync`` from
    ``pylops_mpi.utils._nccl``.
    """
    _nccl_sync()
    world = MPI.COMM_WORLD
    world.Barrier()


def mark(label: str):
    """Record a timing checkpoint inside a benchmarked function.

    This lets users measure the time taken by arbitrary lines of a function
    decorated with :func:`benchmark`. It does nothing when benchmarking is
    disabled.

    Parameters
    ----------
    label : :obj:`str`
        A label of the mark. It signifies both 1) the end of the previous mark
        and 2) the beginning of the new mark.

    Raises
    ------
    RuntimeError
        If benchmarking is enabled and no benchmarked function is currently
        running.
    """
    if ENABLE_BENCHMARK:
        if _mark_func_stack:
            # The innermost active benchmark wrapper owns this checkpoint.
            _mark_func_stack[-1](label)
        else:
            raise RuntimeError("mark() called outside of a benchmarked region")
def benchmark(func: Optional[Callable] = None,
              description: Optional[str] = "",
              logger: Optional[logging.Logger] = None,
              ):
    """A wrapper for code injection for time measurement.

    This wrapper measures the start-to-end time of the wrapped function when
    decorated without any argument. It also allows users to put a call to
    :func:`mark` anywhere inside the wrapped function for fine-grain time
    benchmark. The wrapper defines ``local_mark()`` and pushes it to
    ``_mark_func_stack`` for isolation in case of nested calls. The
    user-facing :func:`mark` always calls the function at the top of
    ``_mark_func_stack``.

    The report is emitted by rank 0 once the outermost benchmarked call
    returns, and the collected records are then discarded on every rank. If
    the wrapped function raises, its records are discarded, the mark stack is
    restored and the exception propagates unchanged.

    Parameters
    ----------
    func : :obj:`callable`, optional
        Function to be decorated. Defaults to ``None``.
    description : :obj:`str`, optional
        Description for the output text. Defaults to ``''``, in which case the
        function name is used.
    logger: :obj:`logging.Logger`, optional
        A `logging.Logger` object for logging the benchmark text output.
        This logger must be set up before being passed to this function,
        either to write output to a file or to log to stdout. If `logger` is
        not provided, the output is printed to stdout.

    Returns
    -------
    :obj:`callable`
        The decorated function when ``func`` is given, otherwise a decorator.
    """
    def noop_decorator(func):
        @functools.wraps(func)
        def wrapped(*args, **kwargs):
            return func(*args, **kwargs)
        return wrapped

    def decorator(func):
        # wraps() must be applied to the returned wrapper (not to this
        # factory) so the benchmarked function keeps its name and docstring.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            header_label = f"[decorator]{description or func.__name__}"
            level = len(_mark_func_stack) + 1
            # The header is needed for later tree parsing. Here it is
            # allocating its spot; the tuple at this index is replaced once
            # the elapsed time is known.
            _markers.append((header_label, None, level))
            header_index = len(_markers) - 1

            def local_mark(label):
                _markers.append((label, time.perf_counter(), level))

            # mark() called in the wrapped function now calls local_mark.
            _mark_func_stack.append(local_mark)
            try:
                _sync()
                start_time = time.perf_counter()
                result = func(*args, **kwargs)
                _sync()
                elapsed = time.perf_counter() - start_time
            except BaseException:
                # Restore the caller's closure and drop this call's records:
                # its header still holds None (which the report cannot
                # format), and keeping it would also break the report of a
                # benchmarked caller that handles the exception.
                _mark_func_stack.pop()
                if _mark_func_stack:
                    del _markers[header_index:]
                else:
                    _markers.clear()
                raise

            _markers[header_index] = (header_label, elapsed, level)
            # In case of nesting, the callee pops its closure so that the
            # caller operates on its own closure (and level) again.
            _mark_func_stack.pop()

            # All benchmarked calls have finished: report on rank 0, then
            # reset on every rank so records do not accumulate.
            if not _mark_func_stack:
                try:
                    if MPI.COMM_WORLD.Get_rank() == 0:
                        report = "".join(_parse_output_tree(_markers))
                        if logger:
                            logger.info(report)
                        else:
                            print(report)
                finally:
                    _markers.clear()
            return result
        return wrapper

    # The code still has to return a decorator so that the in-place decorator
    # with arguments like @benchmark(logger=logger) does not throw an error
    # and can be kept untouched.
    if not ENABLE_BENCHMARK:
        return noop_decorator if func is None else noop_decorator(func)
    return decorator if func is None else decorator(func)