Source code for pylops_mpi.LinearOperator

import numpy as np
from mpi4py import MPI
from typing import Callable, Optional

from scipy.sparse._sputils import isintlike
from scipy.sparse.linalg._interface import _get_dtype

from pylops import LinearOperator
from pylops.utils import DTypeLike, ShapeLike

from pylops_mpi import DistributedArray


[docs] class MPILinearOperator: """MPI-enabled PyLops Linear Operator Common interface for performing matrix-vector products in distributed fashion. In practice, this class provides methods to perform matrix-vector and adjoint matrix-vector products between any :obj:`pylops.LinearOperator` (which must be the same across ranks) and a :class:`pylops_mpi.DistributedArray` with ``Partition.BROADCAST`` and ``Partition.UNSAFE_BROADCAST`` partition. It internally handles the extraction of the local array from the distributed array and the creation of the output :class:`pylops_mpi.DistributedArray`. Note that whilst this operator could also be used with different :obj:`pylops.LinearOperator` across ranks, and with a :class:`pylops_mpi.DistributedArray` with ``Partition.SCATTER``, it is however reccomended to use the :class:`pylops_mpi.basicoperators.MPIBlockDiag` operator instead as this can also handle distributed arrays with subcommunicators. Parameters ---------- Op : :obj:`pylops.LinearOperator`, optional PyLops Linear Operator to wrap. Defaults to ``None``. shape : :obj:`tuple(int, int)`, optional Shape of the MPI Linear Operator. Defaults to ``None``. dtype : :obj:`str`, optional Type of elements in input array. Defaults to ``None``. base_comm : :obj:`mpi4py.MPI.Comm`, optional MPI Base Communicator. Defaults to ``mpi4py.MPI.COMM_WORLD``. """ def __init__(self, Op: Optional[LinearOperator] = None, shape: Optional[ShapeLike] = None, dtype: Optional[DTypeLike] = None, base_comm: MPI.Comm = MPI.COMM_WORLD): if Op: self.Op = Op dtype = self.Op.dtype if dtype is None else dtype shape = self.Op.shape if shape is None else shape if shape: self.shape = shape if dtype: self.dtype = dtype # For MPI self.base_comm = base_comm self.size = self.base_comm.Get_size() self.rank = self.base_comm.Get_rank() def matvec(self, x: DistributedArray) -> DistributedArray: """Matrix-vector multiplication. Modified version of pylops matvec This method makes use of :class:`pylops_mpi.DistributedArray` to calculate matrix vector multiplication in a distributed fashion. Parameters ---------- x : :obj:`pylops_mpi.DistributedArray` A DistributedArray of global shape (N, ). Returns ------- y : :obj:`pylops_mpi.DistributedArray` DistributedArray of global shape (M, ) """ M, N = self.shape if x.global_shape != (N,): raise ValueError("dimension mismatch") return self._matvec(x) def _matvec(self, x: DistributedArray) -> DistributedArray: if self.Op: y = DistributedArray(global_shape=self.shape[0], base_comm=self.base_comm, partition=x.partition, axis=x.axis, engine=x.engine, dtype=self.dtype) y[:] = self.Op._matvec(x.local_array) return y def rmatvec(self, x: DistributedArray) -> DistributedArray: """Adjoint Matrix-vector multiplication. Modified version of pylops rmatvec This method makes use of :class:`pylops_mpi.DistributedArray` to calculate adjoint matrix vector multiplication in a distributed fashion. Parameters ---------- x : :obj:`pylops_mpi.DistributedArray` A DistributedArray of global shape (M, ). Returns ------- y : :obj:`pylops_mpi.DistributedArray` DistributedArray of global shape (N, ) """ M, N = self.shape if x.global_shape != (M,): raise ValueError("dimension mismatch") return self._rmatvec(x) def _rmatvec(self, x: DistributedArray) -> DistributedArray: if self.Op: y = DistributedArray(global_shape=self.shape[1], base_comm=self.base_comm, partition=x.partition, axis=x.axis, engine=x.engine, dtype=self.dtype) y[:] = self.Op._rmatvec(x.local_array) return y def dot(self, x): """Matrix Vector Multiplication Parameters ---------- x : :obj:`pylops_mpi.DistributedArray` or :obj:`pylops_mpi.MPILinearOperator DistributedArray or a MPILinearOperator. Returns ------- y : :obj:`pylops_mpi.DistributedArray` or :obj:`pylops_mpi.MPILinearOperator` DistributedArray or a MPILinearOperator. """ if isinstance(x, MPILinearOperator): return _ProductLinearOperator(self, x) elif np.isscalar(x): return _ScaledLinearOperator(self, x) else: if x is None or x.ndim == 1: return self.matvec(x) else: raise ValueError('expected 1-d DistributedArray, got %r' % x.global_shape) def adjoint(self): """Adjoint MPI LinearOperator Returns ------- op : :obj:`pylops_mpi.MPILinearOperator` Adjoint of Operator """ return self._adjoint() H = property(adjoint) def transpose(self): """Transposition of MPI LinearOperator Returns ------- op : :obj:`pylops_mpi.MPILinearOperator` Transpose Linear Operator """ return self._transpose() T = property(transpose) def __mul__(self, x): return self.dot(x) def __rmul__(self, x): if np.isscalar(x): return _ScaledLinearOperator(self, x) else: return NotImplemented def __matmul__(self, x): if np.isscalar(x): raise ValueError("Scalar not allowed, use * instead") return self.__mul__(x) def __rmatmul__(self, x): if np.isscalar(x): raise ValueError("Scalar not allowed, use * instead") return self.__rmul__(x) def __pow__(self, p): return _PowerLinearOperator(self, p) def __add__(self, x): return _SumLinearOperator(self, x) def __neg__(self): return _ScaledLinearOperator(self, -1) def __sub__(self, x): return self.__add__(-x) def _adjoint(self): return _AdjointLinearOperator(self) def _transpose(self): return _TransposedLinearOperator(self) def conj(self): """Complex conjugate operator Returns ------- conjop : :obj:`pylops_mpi.MPILinearOperator` Complex conjugate operator """ return _ConjLinearOperator(self) def __repr__(self): M, N = self.shape if self.dtype is None: dt = "unspecified dtype" else: dt = f"dtype={self.dtype}" return f"<{M}x{N} {self.__class__.__name__} with {dt}>"
class _AdjointLinearOperator(MPILinearOperator): """Adjoint of MPI Linear Operator""" def __init__(self, A: MPILinearOperator): self.A = A self.args = (A,) super().__init__(shape=(A.shape[1], A.shape[0]), dtype=A.dtype, base_comm=MPI.COMM_WORLD) def _matvec(self, x: DistributedArray) -> DistributedArray: return self.A.rmatvec(x) def _rmatvec(self, x: DistributedArray) -> DistributedArray: return self.A.matvec(x) class _TransposedLinearOperator(MPILinearOperator): """Transposition of MPI Linear Operator""" def __init__(self, A: MPILinearOperator): self.A = A self.args = (A,) super().__init__(shape=(A.shape[1], A.shape[0]), dtype=A.dtype, base_comm=MPI.COMM_WORLD) def _matvec(self, x: DistributedArray) -> DistributedArray: x = x.conj() y = self.A.rmatvec(x) y = y.conj() return y def _rmatvec(self, x: DistributedArray) -> DistributedArray: x = x.conj() y = self.A.matvec(x) y = y.conj() return y class _ProductLinearOperator(MPILinearOperator): """Product of MPI LinearOperators """ def __init__(self, A: MPILinearOperator, B: MPILinearOperator): if not isinstance(A, MPILinearOperator) or not isinstance(B, MPILinearOperator): raise ValueError('both operands have to be a LinearOperator') if A.shape[1] != B.shape[0]: raise ValueError('cannot multiply %r and %r: shape mismatch' % (A, B)) self.args = (A, B) super().__init__(shape=(A.shape[0], B.shape[1]), dtype=_get_dtype([A, B]), base_comm=MPI.COMM_WORLD) def _matvec(self, x: DistributedArray) -> DistributedArray: return self.args[0].matvec(self.args[1].matvec(x)) def _rmatvec(self, x: DistributedArray) -> DistributedArray: return self.args[1].rmatvec(self.args[0].rmatvec(x)) def _adjoint(self) -> MPILinearOperator: A, B = self.args return B.H * A.H class _ScaledLinearOperator(MPILinearOperator): """Scaled MPI Linear Operator """ def __init__(self, A: MPILinearOperator, alpha): if not isinstance(A, MPILinearOperator): raise ValueError('MPILinearOperator expected as A') if not np.isscalar(alpha): raise ValueError('scalar expected as alpha') self.args = (A, alpha) super().__init__(shape=A.shape, dtype=_get_dtype([A], [type(alpha)]), base_comm=MPI.COMM_WORLD) def _matvec(self, x: DistributedArray) -> DistributedArray: y = self.args[0].matvec(x) if y is not None: y[:] *= self.args[1] return y def _rmatvec(self, x: DistributedArray) -> DistributedArray: y = self.args[0].rmatvec(x) if y is not None: y[:] *= np.conj(self.args[1]) return y def _adjoint(self) -> MPILinearOperator: A, alpha = self.args return A.H * np.conj(alpha) class _SumLinearOperator(MPILinearOperator): """Sum of MPI LinearOperators """ def __init__(self, A: MPILinearOperator, B: MPILinearOperator): if not isinstance(A, MPILinearOperator) or not isinstance(B, MPILinearOperator): raise ValueError('both operands have to be a MPILinearOperator') # Make sure it works with different kinds if A.shape != B.shape: raise ValueError("cannot add %r and %r: shape mismatch" % (A, B)) self.args = (A, B) super().__init__(shape=A.shape, dtype=A.dtype, base_comm=MPI.COMM_WORLD) def _matvec(self, x: DistributedArray) -> DistributedArray: arr1 = self.args[0].matvec(x) arr2 = self.args[1].matvec(x) return arr1 + arr2 def _rmatvec(self, x: DistributedArray) -> DistributedArray: arr1 = self.args[0].rmatvec(x) arr2 = self.args[1].rmatvec(x) return arr1 + arr2 def _adjoint(self) -> MPILinearOperator: A, B = self.args return A.H + B.H class _PowerLinearOperator(MPILinearOperator): """Power of MPI Linear Operator """ def __init__(self, A: MPILinearOperator, p: int) -> None: if not isinstance(A, MPILinearOperator): raise ValueError("LinearOperator expected as A") if A.shape[0] != A.shape[1]: raise ValueError("square LinearOperator expected, got %r" % A) if not isintlike(p) or p < 0: raise ValueError("non-negative integer expected as p") super(_PowerLinearOperator, self).__init__(shape=A.shape, dtype=A.dtype, base_comm=A.base_comm) self.args = (A, p) def _power(self, fun: Callable, x: DistributedArray) -> DistributedArray: res = x.copy() for _ in range(self.args[1]): res[:] = fun(res).local_array return res def _matvec(self, x: DistributedArray) -> DistributedArray: return self._power(self.args[0].matvec, x) def _rmatvec(self, x: DistributedArray) -> DistributedArray: return self._power(self.args[0].rmatvec, x) class _ConjLinearOperator(MPILinearOperator): """Complex conjugate MPI Linear Operator """ def __init__(self, A: MPILinearOperator): if not isinstance(A, MPILinearOperator): raise TypeError('A must be a MPILinearOperator') self.A = A super().__init__(shape=A.shape, dtype=A.dtype, base_comm=MPI.COMM_WORLD) def _matvec(self, x: DistributedArray) -> DistributedArray: x = x.conj() y = self.A.matvec(x) if y is not None: y = y.conj() return y def _rmatvec(self, x: DistributedArray) -> DistributedArray: x = x.conj() y = self.A.rmatvec(x) if y is not None: y = y.conj() return y def _adjoint(self) -> MPILinearOperator: return _ConjLinearOperator(self.A.H)
[docs] def asmpilinearoperator(Op): """Return Op as a MPI LinearOperator. Converts a :class:`pylops.LinearOperator` to a :class:`pylops_mpi.MPILinearOperator`. Parameters ---------- Op : :obj:`pylops.LinearOperator` PyLops LinearOperator Returns ------- Op : :obj:`pylops_mpi.MPILinearOperator` Operator of type :obj:`pylops_mpi.MPILinearOperator` """ if isinstance(Op, MPILinearOperator): return Op else: return MPILinearOperator(Op=Op, base_comm=MPI.COMM_WORLD)