from typing import Union
import numpy as np
from mpi4py import MPI
from pylops.signalprocessing import NonStationaryConvolve1D
from pylops.utils._internal import _value_or_sized_to_tuple
from pylops.utils.backend import to_numpy
from pylops.utils.typing import DTypeLike, InputDimsLike, NDArray
from pylops_mpi import MPILinearOperator
from pylops_mpi.basicoperators.BlockDiag import MPIBlockDiag
from pylops_mpi.basicoperators.Halo import MPIHalo, halo_block_split
[docs]
def MPINonStationaryConvolve1D(
dims: Union[int, InputDimsLike],
hs: NDArray,
ih: InputDimsLike,
axis: int = -1,
base_comm: MPI.Comm = MPI.COMM_WORLD,
dtype: DTypeLike = "float64",
) -> MPILinearOperator:
r"""1D non-stationary convolution operator.
Apply distributed non-stationary one-dimensional convolution.
A varying compact filter is provided on a coarser grid and on-the-fly
interpolation is applied in forward and adjoint modes.
Alongside distributing the input array across different ranks, the
filters are also distributed and filters operating at the edges of the
local arrays are replicated on both ranks either side of the edge.
.. note::
Currently the
:obj:`pylops_mpi.signalprocessing.MPINonStationaryConvolve1D`
requires that shape of the local arrays of the input
:obj:`pylops_mpi.DistributedArray` are be the same for all ranks.
Parameters
----------
dims : :obj:`list` or :obj:`int`
Number of samples for each dimension of the input
:obj:`pylops_mpi.DistributedArray`.
hs : :obj:`numpy.ndarray`
Bank of 1d compact filters of size :math:`n_\text{filts} \times n_h`.
Filters must have odd number of samples and are assumed to be
centered in the middle of the filter support.
ih : :obj:`tuple`
Indices of the locations of the filters ``hs`` in the model (and data). Note
that the filters must be regularly sampled, i.e. :math:`dh=\text{diff}(ih)=\text{const.}`
axis : :obj:`int`, optional
Axis along which convolution is applied
base_comm : :obj:`mpi4py.MPI.Comm`, optional
MPI Base Communicator. Defaults to ``mpi4py.MPI.COMM_WORLD``.
dtype : :obj:`str`, optional
Type of elements in input array.
Attributes
----------
shape : :obj:`tuple`
Operator shape
Raises
------
ValueError
If filters ``hs`` have even size
ValueError
If ``ih`` is not regularly sampled
ValueError
If ``ih`` is outside the bounds defined by ``dims[axis]``
Notes
-----
The MPINonStationaryConvolve1D operator applies non-stationary
one-dimensional convolution between the input signal :math:`d(t)`
and a bank of compact filter kernels :math:`h(t; t_i)`. Assume
the input signal is composed of :math:`N=16` samples, and distributed
across :math:`N=2` ranks (with each local signal composes of
:math:`N=8` samples); similarly, consider :math:`N=4` filters
at locations :math:`t_2` and :math:`t_6` in the first rank and
:math:`t_10` and :math:`t_14` in the second rank. Each rank applies
an halo of :math:`N=4` samples to include the following/preceding
filter, and applies locally a
:class:pylops.signalprocessingNonStationaryConvolve1D` operator;
finally the halo is removed from each local convolved signal.
"""
# TODO: need to adapt operator to handle NDarrays
rank = base_comm.Get_rank()
size = base_comm.Get_size()
dims = _value_or_sized_to_tuple(dims)
ih = to_numpy(ih)
# Checks for local operator
if hs.shape[1] % 2 == 0:
raise ValueError("filters hs must have odd length")
if len(np.unique(np.diff(ih))) > 1:
raise ValueError(
"the indices of filters 'ih' are must be regularly sampled"
)
if min(ih) < 0 or max(ih) >= dims[axis]:
raise ValueError(
"the indices of filters 'ih' must be larger than 0 and "
"smaller than `dims`"
)
# Additional check for distributed operarator
if dims[axis] % size:
raise ValueError(
f"number of input samples {dims[0]} is not divisible by "
f"the number of ranks ({size})"
)
# Identify halo as the maximum distance between any partition of
# the distributed array into local arrays and the closest filter
# outside of the partition plus half of the size of the filter
dims_local = dims[axis] // size
starts_local = np.arange(0, dims[axis], dims_local)
start_local = starts_local[rank]
end_local = start_local + dims_local - 1
ihidx_local = np.where((ih >= start_local) & (ih <= end_local))[0]
if len(ihidx_local) == 0:
raise ValueError(f"rank {rank} has zerof filters!")
ihdiff = np.diff(ih)[0]
ih_local = ih[ihidx_local]
dist_start_local = 0 if rank == 0 else ihdiff - (ih_local[0] - start_local)
dist_end_local = 0 if rank == (size - 1) else ihdiff - (end_local - ih_local[-1])
dist_start = np.max(np.array(base_comm.allgather(dist_start_local)))
dist_end = np.max(np.array(base_comm.allgather(dist_end_local)))
halo = max([dist_start, dist_end]) + (hs.shape[1] // 2 + 1)
if size == 1:
# to allow operator to work also with size=1
halo = 0
# Create halo operator
proc_grid_shape = [1, ] * len(dims)
proc_grid_shape[axis] = size
HOp = MPIHalo(
dims=dims,
halo=halo,
proc_grid_shape=proc_grid_shape,
comm=base_comm,
dtype=dtype
)
if size == 1:
# to allow operator to work also with size=1
dims_ns = list(dims)
dims_ns[axis] = dims_local + halo
COp = NonStationaryConvolve1D(
dims=dims_ns, hs=hs, ih=ih, axis=axis, dtype=dtype)
else:
x_slice = halo_block_split(dims if len(dims) == 1 else (dims[axis], ),
base_comm, (size, ))
if rank == 0:
dims_ns = list(dims)
dims_ns[axis] = dims_local + halo
COp = NonStationaryConvolve1D(
dims=dims_ns, hs=hs[:ihidx_local[-1] + 2],
ih=ih[:ihidx_local[-1] + 2],
axis=axis,
dtype=dtype
)
elif rank == size - 1:
dims_ns = list(dims)
dims_ns[axis] = dims_local + halo
COp = NonStationaryConvolve1D(
dims=dims_ns, hs=hs[ihidx_local[0] - 1:],
ih=ih[ihidx_local[0] - 1:] - x_slice[0].start + halo,
axis=axis,
dtype=dtype
)
else:
dims_ns = list(dims)
dims_ns[axis] = dims_local + 2 * halo
COp = NonStationaryConvolve1D(
dims=dims_ns, hs=hs[ihidx_local[0] - 1: ihidx_local[-1] + 2],
ih=ih[ihidx_local[0] - 1: ihidx_local[-1] + 2] - x_slice[0].start + halo,
axis=axis,
dtype=dtype
)
COp_full = MPIBlockDiag([COp, ])
Op = HOp.H @ COp_full @ HOp
return Op