from typing import Callable, Union
import numpy as np
from mpi4py import MPI
from pylops.utils.backend import get_module
from pylops.utils.typing import DTypeLike, InputDimsLike
from pylops.utils._internal import _value_or_sized_to_tuple
from pylops_mpi import DistributedArray, MPILinearOperator, Partition
from pylops_mpi.utils.decorators import reshaped
[docs]
class MPISecondDerivative(MPILinearOperator):
r"""MPI Second Derivative
Apply a second derivative using a three-point stencil finite-difference
approximation with :class:`pylops_mpi.DistributedArray`. The Second Derivative
is calculated along ``axis=0``.
Parameters
----------
dims : :obj:`int` or :obj:`tuple`
Number of samples for each dimension.
sampling : :obj:`float`, optional
Sampling step :math:`\Delta x`.
kind : :obj:`str`, optional
Derivative kind (``forward``, ``centered``, or ``backward``).
edge : :obj:`bool`, optional
Use reduced order derivative at edges (``True``) or
ignore them (``False``). This is currently only available
for centered derivative.
base_comm : :obj:`mpi4py.MPI.Comm`, optional
MPI Base Communicator. Defaults to ``mpi4py.MPI.COMM_WORLD``.
dtype : :obj:`str`, optional
Type of elements in the input array.
Attributes
----------
shape : :obj:`tuple`
Operator shape
Notes
-----
The MPISecondDerivative operator applies a second derivative to a :class:`pylops_mpi.DistributedArray`
using either a second-order forward, backward or a centered stencil.
Similar to the concept of :py:class:`pylops_mpi.basicoperators.MPIFirstDerivative`,
**ghosting** is applied at each rank to include duplicated copies of border cells
(ghost cells) from neighboring processes. These ghost cells enable each process
to perform local computations independently, avoiding the necessity for direct
communication with other processes. As a result, it becomes possible to calculate
the Second Derivative of the entire distributed array efficiently.
Now, for a one-dimensional DistributedArray, the second-order forward stencil is:
.. math::
y[i] = (x[i+2] - 2 * x[i+1] + x[i]) / \mathbf{\Delta x}^2
while the second-order backward stencil is:
.. math::
y[i] = (x[i] - 2 * x[i-1] + x[i-2]) / \mathbf{\Delta x}^2
and the second-order centered stencil is:
.. math::
y[i] = (x[i+1] - 2 * x[i] + x[i-1]) / \mathbf{\Delta x}^2
where :math:`y` is a DistributedArray and :math:`x` is a Ghosted array created
by adding border cells of neighboring processes to the local array at each rank.
"""
def __init__(
self,
dims: Union[int, InputDimsLike],
sampling: float = 1.0,
kind: str = "centered",
edge: bool = False,
base_comm: MPI.Comm = MPI.COMM_WORLD,
dtype: DTypeLike = np.float64,
) -> None:
self.dims = _value_or_sized_to_tuple(dims)
shape = (int(np.prod(dims)),) * 2
super().__init__(shape=shape, dtype=np.dtype(dtype), base_comm=base_comm)
self.sampling = sampling
self.kind = kind
self.edge = edge
self._register_multiplications(self.kind)
def _register_multiplications(
self,
kind: str,
) -> None:
# choose _matvec and _rmatvec kind
self._hmatvec: Callable
self._hrmatvec: Callable
if kind == "forward":
self._hmatvec = self._matvec_forward
self._hrmatvec = self._rmatvec_forward
elif kind == "centered":
self._hmatvec = self._matvec_centered
self._hrmatvec = self._rmatvec_centered
elif kind == "backward":
self._hmatvec = self._matvec_backward
self._hrmatvec = self._rmatvec_backward
else:
raise NotImplementedError(
"'kind' must be 'forward', 'centered' or 'backward'"
)
def _matvec(self, x: DistributedArray) -> DistributedArray:
# If Partition.BROADCAST, then convert to Partition.SCATTER
if x.partition is Partition.BROADCAST:
x = DistributedArray.to_dist(x=x.local_array)
return self._hmatvec(x)
def _rmatvec(self, x: DistributedArray) -> DistributedArray:
# If Partition.BROADCAST, then convert to Partition.SCATTER
if x.partition is Partition.BROADCAST:
x = DistributedArray.to_dist(x=x.local_array)
return self._hrmatvec(x)
@reshaped
def _matvec_forward(self, x: DistributedArray) -> DistributedArray:
ncp = get_module(x.engine)
y = DistributedArray(global_shape=x.global_shape, local_shapes=x.local_shapes,
axis=x.axis, engine=x.engine, dtype=self.dtype)
ghosted_x = x.add_ghost_cells(cells_back=2)
y_forward = ghosted_x[2:] - 2 * ghosted_x[1:-1] + ghosted_x[:-2]
if self.rank == self.size - 1:
y_forward = ncp.append(y_forward, ncp.zeros((min(y.global_shape[0], 2),) + self.dims[1:]), axis=0)
y[:] = y_forward / self.sampling ** 2
return y
@reshaped
def _rmatvec_forward(self, x: DistributedArray) -> DistributedArray:
ncp = get_module(x.engine)
y = DistributedArray(global_shape=x.global_shape, local_shapes=x.local_shapes, axis=x.axis, dtype=self.dtype)
y[:] = 0
if self.rank == self.size - 1:
y[:-2] += x[:-2]
else:
y[:] += x[:]
ghosted_x = x.add_ghost_cells(cells_front=1, cells_back=1)
y_forward = ghosted_x[:-2]
if self.rank == 0:
y_forward = ncp.append(ncp.zeros((1,) + self.dims[1:]), y_forward, axis=0)
if self.rank == self.size - 1:
y_forward = ncp.append(y_forward, ncp.zeros((min(1, y.global_shape[0] - 1),) + self.dims[1:]), axis=0)
y[:] -= 2 * y_forward
ghosted_x = x.add_ghost_cells(cells_front=2)
y_forward = ghosted_x[:-2]
if self.rank == 0:
y_forward = ncp.append(ncp.zeros((min(y.global_shape[0], 2),) + self.dims[1:]), y_forward, axis=0)
y[:] += y_forward
y[:] /= self.sampling ** 2
return y
@reshaped
def _matvec_backward(self, x: DistributedArray) -> DistributedArray:
ncp = get_module(x.engine)
y = DistributedArray(global_shape=x.global_shape, local_shapes=x.local_shapes,
axis=x.axis, engine=x.engine, dtype=self.dtype)
ghosted_x = x.add_ghost_cells(cells_front=2)
y_backward = ghosted_x[2:] - 2 * ghosted_x[1:-1] + ghosted_x[:-2]
if self.rank == 0:
y_backward = ncp.append(ncp.zeros((min(y.global_shape[0], 2),) + self.dims[1:]), y_backward, axis=0)
y[:] = y_backward / self.sampling ** 2
return y
@reshaped
def _rmatvec_backward(self, x: DistributedArray) -> DistributedArray:
ncp = get_module(x.engine)
y = DistributedArray(global_shape=x.global_shape, local_shapes=x.local_shapes,
axis=x.axis, engine=x.engine, dtype=self.dtype)
y[:] = 0
ghosted_x = x.add_ghost_cells(cells_back=2)
y_backward = ghosted_x[2:]
if self.rank == self.size - 1:
y_backward = ncp.append(y_backward, ncp.zeros((min(2, y.global_shape[0]),) + self.dims[1:]), axis=0)
y[:] += y_backward
ghosted_x = x.add_ghost_cells(cells_front=1, cells_back=1)
y_backward = 2 * ghosted_x[2:]
if self.rank == 0:
y_backward = ncp.append(ncp.zeros((1,) + self.dims[1:]), y_backward, axis=0)
if self.rank == self.size - 1:
y_backward = ncp.append(y_backward, ncp.zeros((min(1, y.global_shape[0] - 1),) + self.dims[1:]), axis=0)
y[:] -= y_backward
if self.rank == 0:
y[2:] += x[2:]
else:
y[:] += x[:]
y[:] /= self.sampling ** 2
return y
@reshaped
def _matvec_centered(self, x: DistributedArray) -> DistributedArray:
ncp = get_module(x.engine)
y = DistributedArray(global_shape=x.global_shape, local_shapes=x.local_shapes,
axis=x.axis, engine=x.engine, dtype=self.dtype)
ghosted_x = x.add_ghost_cells(cells_front=1, cells_back=1)
y_centered = ghosted_x[2:] - 2 * ghosted_x[1:-1] + ghosted_x[:-2]
if self.rank == 0:
y_centered = ncp.append(ncp.zeros((1,) + self.dims[1:]), y_centered, axis=0)
if self.rank == self.size - 1:
y_centered = ncp.append(y_centered, ncp.zeros((min(1, y.global_shape[0] - 1),) + self.dims[1:]), axis=0)
y[:] = y_centered
if self.edge:
if self.rank == 0:
y[0] = x[0] - 2 * x[1] + x[2]
if self.rank == self.size - 1:
y[-1] = x[-3] - 2 * x[-2] + x[-1]
y[:] /= self.sampling ** 2
return y
@reshaped
def _rmatvec_centered(self, x: DistributedArray) -> DistributedArray:
ncp = get_module(x.engine)
y = DistributedArray(global_shape=x.global_shape, local_shapes=x.local_shapes,
axis=x.axis, engine=x.engine, dtype=self.dtype)
y[:] = 0
ghosted_x = x.add_ghost_cells(cells_back=2)
y_centered = ghosted_x[1:-1]
if self.rank == self.size - 1:
y_centered = ncp.append(y_centered, ncp.zeros((min(2, y.global_shape[0]),) + self.dims[1:]), axis=0)
y[:] += y_centered
ghosted_x = x.add_ghost_cells(cells_front=1, cells_back=1)
y_centered = 2 * ghosted_x[1:-1]
if self.rank == 0:
y_centered = ncp.append(ncp.zeros((1,) + self.dims[1:]), y_centered, axis=0)
if self.rank == self.size - 1:
y_centered = ncp.append(y_centered, ncp.zeros((min(1, y.global_shape[0] - 1),) + self.dims[1:]), axis=0)
y[:] -= y_centered
ghosted_x = x.add_ghost_cells(cells_front=2)
y_centered = ghosted_x[1:-1]
if self.rank == 0:
y_centered = ncp.append(ncp.zeros((min(2, y.global_shape[0]),) + self.dims[1:]), y_centered, axis=0)
y[:] += y_centered
if self.edge:
if self.rank == 0:
y[0] += x[0]
y[1] -= 2 * x[0]
y[2] += x[0]
if self.rank == self.size - 1:
y[-3] += x[-1]
y[-2] -= 2 * x[-1]
y[-1] += x[-1]
y[:] /= self.sampling ** 2
return y