Source code for pylops_mpi.basicoperators.BlockDiag

import numpy as np
from scipy.sparse.linalg._interface import _get_dtype
from mpi4py import MPI
from typing import Optional, Sequence

from pylops import LinearOperator
from pylops.utils import DTypeLike
from pylops.utils.backend import get_module

from pylops_mpi import MPILinearOperator, MPIStackedLinearOperator
from pylops_mpi import DistributedArray, StackedDistributedArray
from pylops_mpi.utils.decorators import reshaped


class MPIBlockDiag(MPILinearOperator):
    r"""MPI Block-diagonal operator.

    Create a block-diagonal operator from a set of linear operators using MPI.
    Each rank must initialize this operator by providing one or more linear
    operators which will be computed within that rank.

    Both model and data vectors must be of :class:`pylops_mpi.DistributedArray`
    type and partitioned between ranks according to the shapes of the
    different linear operators.

    Parameters
    ----------
    ops : :obj:`list`
        One or more :class:`pylops.LinearOperator` to be stacked.
    base_comm : :obj:`mpi4py.MPI.Comm`, optional
        Base MPI Communicator. Defaults to ``mpi4py.MPI.COMM_WORLD``.
    dtype : :obj:`str`, optional
        Type of elements in input array.

    Attributes
    ----------
    shape : :obj:`tuple`
        Operator shape

    Notes
    -----
    An MPI Block-diagonal operator is composed of N linear operators,
    represented by **L**. Each rank holds one or more
    :class:`pylops.LinearOperator`, represented here compactly as
    :math:`\mathbf{L}_i` for rank :math:`i`. In forward mode, each operator is
    applied to its corresponding portion of the model vector **m**, a
    :class:`pylops_mpi.DistributedArray` partitioned at each rank in such a way
    that its local shapes agree with those of the corresponding linear
    operators. The forward output of each operator is then collected from all
    ranks into a DistributedArray, referred to as **d**:

    .. math::
      \begin{bmatrix}
        \mathbf{d}_1 \\
        \mathbf{d}_2 \\
        \vdots \\
        \mathbf{d}_n
      \end{bmatrix} =
      \begin{bmatrix}
        \mathbf{L}_1 & \mathbf{0}   & \ldots & \mathbf{0} \\
        \mathbf{0}   & \mathbf{L}_2 & \ldots & \mathbf{0} \\
        \vdots       & \vdots       & \ddots & \vdots     \\
        \mathbf{0}   & \mathbf{0}   & \ldots & \mathbf{L}_n
      \end{bmatrix}
      \begin{bmatrix}
        \mathbf{m}_1 \\
        \mathbf{m}_2 \\
        \vdots \\
        \mathbf{m}_n
      \end{bmatrix}

    Likewise, in adjoint mode each operator applies its adjoint to the local
    portion of the data vector, and the results are collected from all ranks
    into a DistributedArray, again referred to as **d**:

    .. math::
      \begin{bmatrix}
        \mathbf{d}_1 \\
        \mathbf{d}_2 \\
        \vdots \\
        \mathbf{d}_n
      \end{bmatrix} =
      \begin{bmatrix}
        \mathbf{L}_1^H & \mathbf{0}     & \ldots & \mathbf{0} \\
        \mathbf{0}     & \mathbf{L}_2^H & \ldots & \mathbf{0} \\
        \vdots         & \vdots         & \ddots & \vdots     \\
        \mathbf{0}     & \mathbf{0}     & \ldots & \mathbf{L}_n^H
      \end{bmatrix}
      \begin{bmatrix}
        \mathbf{m}_1 \\
        \mathbf{m}_2 \\
        \vdots \\
        \mathbf{m}_n
      \end{bmatrix}

    """

    def __init__(self, ops: Sequence[LinearOperator],
                 base_comm: MPI.Comm = MPI.COMM_WORLD,
                 dtype: Optional[DTypeLike] = None):
        self.ops = ops
        mops = np.zeros(len(self.ops), dtype=np.int64)
        nops = np.zeros(len(self.ops), dtype=np.int64)
        for iop, oper in enumerate(self.ops):
            nops[iop] = oper.shape[0]
            mops[iop] = oper.shape[1]
        self.mops = mops.sum()
        self.local_shapes_m = base_comm.allgather((self.mops, ))
        self.nops = nops.sum()
        self.local_shapes_n = base_comm.allgather((self.nops, ))
        self.nnops = np.insert(np.cumsum(nops), 0, 0)
        self.mmops = np.insert(np.cumsum(mops), 0, 0)
        shape = (base_comm.allreduce(self.nops),
                 base_comm.allreduce(self.mops))
        dtype = _get_dtype(ops) if dtype is None else np.dtype(dtype)
        super().__init__(shape=shape, dtype=dtype, base_comm=base_comm)

    @reshaped(forward=True, stacking=True)
    def _matvec(self, x: DistributedArray) -> DistributedArray:
        ncp = get_module(x.engine)
        y = DistributedArray(global_shape=self.shape[0],
                             local_shapes=self.local_shapes_n,
                             engine=x.engine, dtype=self.dtype)
        y1 = []
        for iop, oper in enumerate(self.ops):
            y1.append(oper.matvec(x.local_array[self.mmops[iop]:
                                                self.mmops[iop + 1]]))
        y[:] = ncp.concatenate(y1)
        return y

    @reshaped(forward=False, stacking=True)
    def _rmatvec(self, x: DistributedArray) -> DistributedArray:
        ncp = get_module(x.engine)
        y = DistributedArray(global_shape=self.shape[1],
                             local_shapes=self.local_shapes_m,
                             engine=x.engine, dtype=self.dtype)
        y1 = []
        for iop, oper in enumerate(self.ops):
            y1.append(oper.rmatvec(x.local_array[self.nnops[iop]:
                                                 self.nnops[iop + 1]]))
        y[:] = ncp.concatenate(y1)
        return y
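
# ---------------------------------------------------------------------------
# Usage sketch (not part of this module): each rank wraps its own
# pylops.LinearOperator in an MPIBlockDiag and applies it to the local chunk
# of a DistributedArray. The operator choice and sizes are illustrative only;
# run with e.g. ``mpiexec -n 2 python example.py``.
# ---------------------------------------------------------------------------
# import numpy as np
# from mpi4py import MPI
# import pylops
# import pylops_mpi
#
# rank = MPI.COMM_WORLD.Get_rank()
# size = MPI.COMM_WORLD.Get_size()
# n = 4
#
# # each rank contributes one (n x n) operator to the block diagonal
# Dop = pylops.MatrixMult((rank + 1) * np.eye(n))
# BDiag = pylops_mpi.MPIBlockDiag(ops=[Dop, ])
#
# # model: DistributedArray whose local size (n) matches the local operator
# x = pylops_mpi.DistributedArray(global_shape=n * size)
# x[:] = np.ones(n)
#
# y = BDiag.matvec(x)       # forward: rank i applies L_i to m_i
# xadj = BDiag.rmatvec(y)   # adjoint: rank i applies L_i^H to d_i
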

class MPIStackedBlockDiag(MPIStackedLinearOperator):
    r"""MPI Stacked BlockDiag Operator

    Create a diagonal stack of :class:`pylops_mpi.MPILinearOperator` operators.

    Parameters
    ----------
    ops : :obj:`list`
        One or more :class:`pylops_mpi.MPILinearOperator` to be stacked.
    base_comm : :obj:`mpi4py.MPI.Comm`, optional
        Base MPI Communicator. Defaults to ``mpi4py.MPI.COMM_WORLD``.
    dtype : :obj:`str`, optional
        Type of elements in input array.

    Attributes
    ----------
    shape : :obj:`tuple`
        Operator shape

    Notes
    -----
    An MPIStackedBlockDiag operator is composed of N
    :class:`pylops_mpi.MPILinearOperator` instances stacked along the diagonal.
    These MPI operators are applied sequentially, with distributed computations
    performed within each operator.

    """

    def __init__(self, ops: Sequence[MPILinearOperator],
                 base_comm: MPI.Comm = MPI.COMM_WORLD,
                 dtype: Optional[DTypeLike] = None):
        self.ops = ops
        dtype = _get_dtype(self.ops) if dtype is None else np.dtype(dtype)
        shape = (int(np.sum([op.shape[0] for op in ops])),
                 int(np.sum([op.shape[1] for op in ops])))
        super().__init__(shape=shape, dtype=dtype, base_comm=base_comm)

    def _matvec(self, x: StackedDistributedArray) -> StackedDistributedArray:
        y1 = []
        for xx, oper in zip(x, self.ops):
            y1.append(oper.matvec(xx))
        return StackedDistributedArray(y1)

    def _rmatvec(self, x: StackedDistributedArray) -> StackedDistributedArray:
        y1 = []
        for xx, oper in zip(x, self.ops):
            y1.append(oper.rmatvec(xx))
        return StackedDistributedArray(y1)
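
# ---------------------------------------------------------------------------
# Usage sketch (not part of this module): two MPIBlockDiag operators are
# stacked along the diagonal with MPIStackedBlockDiag and applied to a
# StackedDistributedArray whose components match their shapes. Names and
# sizes are illustrative only; run with e.g. ``mpiexec -n 2 python example.py``.
# ---------------------------------------------------------------------------
# import numpy as np
# from mpi4py import MPI
# import pylops
# import pylops_mpi
#
# size = MPI.COMM_WORLD.Get_size()
# n = 4
#
# # two distributed block-diagonal operators, one per component of the stack
# Op1 = pylops_mpi.MPIBlockDiag(ops=[pylops.MatrixMult(np.eye(n)), ])
# Op2 = pylops_mpi.MPIBlockDiag(ops=[pylops.MatrixMult(2 * np.eye(n)), ])
# StackOp = pylops_mpi.MPIStackedBlockDiag(ops=[Op1, Op2])
#
# # stacked model: one DistributedArray per stacked operator
# x1 = pylops_mpi.DistributedArray(global_shape=n * size)
# x2 = pylops_mpi.DistributedArray(global_shape=n * size)
# x1[:] = np.ones(n)
# x2[:] = np.full(n, 2.0)
# x = pylops_mpi.StackedDistributedArray([x1, x2])
#
# y = StackOp.matvec(x)       # applies Op1 to x1 and Op2 to x2
# xadj = StackOp.rmatvec(y)   # applies the adjoints component-wise
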