Source code for pylops_mpi.DistributedArray

import numpy as np
from typing import Optional, Union, Tuple, List
from numbers import Integral
from mpi4py import MPI
from enum import Enum

from pylops.utils import DTypeLike, NDArray
from pylops.utils._internal import _value_or_sized_to_tuple
from pylops.utils.backend import get_module, get_array_module, get_module_name


class Partition(Enum):
    r"""Enum class

    Distributing data among different processes.

    - ``BROADCAST``: Distributes data to all processes.
    - ``SCATTER``: Distributes unique portions to each process.
    """
    BROADCAST = "Broadcast"
    SCATTER = "Scatter"

def local_split(global_shape: Tuple, base_comm: MPI.Comm,
                partition: Partition, axis: int):
    """To get the local shape from the global shape

    Parameters
    ----------
    global_shape : :obj:`tuple`
        Shape of the global array.
    base_comm : :obj:`MPI.Comm`
        Base MPI Communicator.
    partition : :obj:`Partition`
        Type of partition.
    axis : :obj:`int`
        Axis of distribution.

    Returns
    -------
    local_shape : :obj:`tuple`
        Shape of the local array.
    """
    if partition == Partition.BROADCAST:
        local_shape = global_shape
    # Split the array
    else:
        local_shape = list(global_shape)
        if base_comm.Get_rank() < (global_shape[axis] % base_comm.Get_size()):
            local_shape[axis] = global_shape[axis] // base_comm.Get_size() + 1
        else:
            local_shape[axis] = global_shape[axis] // base_comm.Get_size()
    return tuple(local_shape)

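# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the pylops_mpi source): how `local_split`
# divides a global shape across ranks. With 4 ranks and SCATTER along axis=0,
# a (10, 5) array is split into (3, 5), (3, 5), (2, 5), (2, 5); with
# BROADCAST every rank keeps the full (10, 5) shape. Run it under an MPI
# launcher, e.g. `mpirun -n 4 python example.py` (launcher command is an
# assumption about your MPI installation).
def _local_split_demo():
    comm = MPI.COMM_WORLD
    scatter_shape = local_split((10, 5), comm, Partition.SCATTER, axis=0)
    broadcast_shape = local_split((10, 5), comm, Partition.BROADCAST, axis=0)
    print(f"rank {comm.Get_rank()}: scatter={scatter_shape}, "
          f"broadcast={broadcast_shape}")
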
class DistributedArray:
    r"""Distributed NumPy Arrays

    Multidimensional NumPy-like distributed arrays.
    It brings NumPy arrays to high-performance computing.

    .. warning:: When setting the partition of the DistributedArray to
        :obj:`pylops_mpi.Partition.BROADCAST`, it is crucial to be aware
        that any attempts to make arrays different from rank to rank will be
        overwritten by the actions of rank 0. This means that if you modify
        the DistributedArray on a specific rank, and are using broadcast to
        synchronize the arrays across all ranks, the modifications made by
        other ranks will be discarded and overwritten with the value at rank 0.

    Parameters
    ----------
    global_shape : :obj:`tuple` or :obj:`int`
        Shape of the global array.
    base_comm : :obj:`mpi4py.MPI.Comm`, optional
        MPI Communicator over which array is distributed.
        Defaults to ``mpi4py.MPI.COMM_WORLD``.
    partition : :obj:`Partition`, optional
        Broadcast or Scatter the array. Defaults to ``Partition.SCATTER``.
    axis : :obj:`int`, optional
        Axis along which distribution occurs. Defaults to ``0``.
    local_shapes : :obj:`list`, optional
        List of tuples or integers representing local shapes at each rank.
    engine : :obj:`str`, optional
        Engine used to store array (``numpy`` or ``cupy``)
    dtype : :obj:`str`, optional
        Type of elements in input array. Defaults to ``numpy.float64``.
    """

    def __init__(self, global_shape: Union[Tuple, Integral],
                 base_comm: Optional[MPI.Comm] = MPI.COMM_WORLD,
                 partition: Partition = Partition.SCATTER, axis: int = 0,
                 local_shapes: Optional[List[Union[Tuple, Integral]]] = None,
                 engine: Optional[str] = "numpy",
                 dtype: Optional[DTypeLike] = np.float64):
        if isinstance(global_shape, Integral):
            global_shape = (global_shape,)
        if len(global_shape) <= axis:
            raise IndexError(f"Axis {axis} out of range for DistributedArray "
                             f"of shape {global_shape}")
        if partition not in Partition:
            raise ValueError(f"Should be either {Partition.BROADCAST} "
                             f"or {Partition.SCATTER}")
        self.dtype = dtype
        self._global_shape = _value_or_sized_to_tuple(global_shape)
        self._base_comm = base_comm
        self._partition = partition
        self._axis = axis
        local_shapes = local_shapes if local_shapes is None else [_value_or_sized_to_tuple(local_shape)
                                                                  for local_shape in local_shapes]
        self._check_local_shapes(local_shapes)
        self._local_shape = local_shapes[base_comm.rank] if local_shapes else local_split(global_shape, base_comm,
                                                                                          partition, axis)
        self._engine = engine
        self._local_array = get_module(engine).empty(shape=self.local_shape, dtype=self.dtype)

    def __getitem__(self, index):
        return self.local_array[index]

    def __setitem__(self, index, value):
        """Setter Method

        `Partition.SCATTER` - Local Arrays are assigned their
        unique values.

        `Partition.BROADCAST` - The value at rank-0 is broadcasted
        and is assigned to all the ranks.

        Parameters
        ----------
        index : :obj:`int` or :obj:`slice`
            Represents the index positions where a value needs to be assigned.
        value : :obj:`int` or :obj:`numpy.ndarray`
            Represents the value that will be assigned to the local array at
            the specified index positions.
        """
        if self.partition is Partition.BROADCAST:
            self.local_array[index] = self.base_comm.bcast(value)
        else:
            self.local_array[index] = value

    @property
    def global_shape(self):
        """Global Shape of the array

        Returns
        -------
        global_shape : :obj:`tuple`
        """
        return self._global_shape

    @property
    def base_comm(self):
        """Base MPI Communicator

        Returns
        -------
        base_comm : :obj:`MPI.Comm`
        """
        return self._base_comm

    @property
    def local_shape(self):
        """Local Shape of the Distributed array

        Returns
        -------
        local_shape : :obj:`tuple`
        """
        return self._local_shape

    @property
    def engine(self):
        """Engine of the Distributed array

        Returns
        -------
        engine : :obj:`str`
        """
        return self._engine

    @property
    def local_array(self):
        """View of the Local Array

        Returns
        -------
        local_array : :obj:`numpy.ndarray`
        """
        return self._local_array

    @property
    def rank(self):
        """Rank of the current process

        Returns
        -------
        rank : :obj:`int`
        """
        return self.base_comm.Get_rank()

    @property
    def size(self):
        """Total number of processes
        Size of parallel environment

        Returns
        -------
        size : :obj:`int`
        """
        return self.base_comm.Get_size()

    @property
    def axis(self):
        """Axis along which distribution occurs

        Returns
        -------
        axis : :obj:`int`
        """
        return self._axis

    @property
    def ndim(self):
        """Number of dimensions of the global array

        Returns
        -------
        ndim : :obj:`int`
        """
        return len(self.global_shape)

    @property
    def partition(self):
        """Type of Distribution

        Returns
        -------
        partition_type : :obj:`str`
        """
        return self._partition

    @property
    def local_shapes(self):
        """Gather Local shapes from all ranks

        Returns
        -------
        local_shapes : :obj:`list`
        """
        return self.base_comm.allgather(self.local_shape)

    def asarray(self):
        """Global view of the array

        Gather all the local arrays

        Returns
        -------
        final_array : :obj:`numpy.ndarray`
            Global Array gathered at all ranks
        """
        # Since the global array was replicated at all ranks
        if self.partition == Partition.BROADCAST:
            # Get only self.local_array.
            return self.local_array
        # Gather all the local arrays and apply concatenation.
        final_array = self.base_comm.allgather(self.local_array)
        return np.concatenate(final_array, axis=self.axis)

    @classmethod
    def to_dist(cls, x: NDArray,
                base_comm: MPI.Comm = MPI.COMM_WORLD,
                partition: Partition = Partition.SCATTER,
                axis: int = 0,
                local_shapes: Optional[List[Tuple]] = None):
        """Convert A Global Array to a Distributed Array

        Parameters
        ----------
        x : :obj:`numpy.ndarray`
            Global array.
        base_comm : :obj:`MPI.Comm`, optional
            MPI Communicator over which array is distributed.
            Defaults to ``MPI.COMM_WORLD``.
        partition : :obj:`Partition`, optional
            Distributes the array. Defaults to ``Partition.SCATTER``.
        axis : :obj:`int`, optional
            Axis of Distribution.
        local_shapes : :obj:`list`, optional
            Local Shapes at each rank.

        Returns
        -------
        dist_array : :obj:`DistributedArray`
            Distributed Array of the Global Array
        """
        dist_array = DistributedArray(global_shape=x.shape,
                                      base_comm=base_comm,
                                      partition=partition,
                                      axis=axis,
                                      local_shapes=local_shapes,
                                      engine=get_module_name(get_array_module(x)),
                                      dtype=x.dtype)
        if partition == Partition.BROADCAST:
            dist_array[:] = x
        else:
            slices = [slice(None)] * x.ndim
            local_shapes = np.append([0], base_comm.allgather(
                dist_array.local_shape[axis]))
            sum_shapes = np.cumsum(local_shapes)
            slices[axis] = slice(sum_shapes[dist_array.rank],
                                 sum_shapes[dist_array.rank + 1], None)
            dist_array[:] = x[tuple(slices)]
        return dist_array

    def _check_local_shapes(self, local_shapes):
        """Check if the local shapes align with the global shape"""
        if local_shapes:
            if len(local_shapes) != self.base_comm.size:
                raise ValueError(f"Length of local shapes is not equal to number of processes; "
                                 f"{len(local_shapes)} != {self.size}")
            # Check if local shape == global shape
            if self.partition is Partition.BROADCAST and local_shapes[self.rank] != self.global_shape:
                raise ValueError(f"Local shape is not equal to global shape at rank = {self.rank};"
                                 f"{local_shapes[self.rank]} != {self.global_shape}")
            elif self.partition is Partition.SCATTER:
                local_shape = local_shapes[self.rank]
                # Check if local shapes sum up to global shape and other dimensions align with global shape
                if self._allreduce(local_shape[self.axis]) != self.global_shape[self.axis] or \
                        not np.array_equal(np.delete(local_shape, self.axis), np.delete(self.global_shape, self.axis)):
                    raise ValueError(f"Local shapes don't align with the global shape;"
                                     f"{local_shapes} != {self.global_shape}")

    def _check_partition_shape(self, dist_array):
        """Check Partition and Local Shape of the Array
        """
        if self.partition != dist_array.partition:
            raise ValueError("Partition of both the arrays must be same")
        if self.local_shape != dist_array.local_shape:
            raise ValueError(f"Local Array Shape Mismatch - "
                             f"{self.local_shape} != {dist_array.local_shape}")

    def _allreduce(self, send_buf, recv_buf=None, op: MPI.Op = MPI.SUM):
        """MPI Allreduce operation
        """
        if recv_buf is None:
            return self.base_comm.allreduce(send_buf, op)
        # For MIN and MAX which require recv_buf
        self.base_comm.Allreduce(send_buf, recv_buf, op)
        return recv_buf

    def __neg__(self):
        arr = DistributedArray(global_shape=self.global_shape,
                               base_comm=self.base_comm,
                               partition=self.partition,
                               axis=self.axis,
                               local_shapes=self.local_shapes,
                               engine=self.engine,
                               dtype=self.dtype)
        arr[:] = -self.local_array
        return arr

    def __add__(self, x):
        return self.add(x)

    def __iadd__(self, x):
        return self.iadd(x)

    def __sub__(self, x):
        return self.__add__(-x)

    def __isub__(self, x):
        return self.__iadd__(-x)

    def __mul__(self, x):
        return self.multiply(x)

    def __rmul__(self, x):
        return self.multiply(x)

    def add(self, dist_array):
        """Distributed Addition of arrays
        """
        self._check_partition_shape(dist_array)
        SumArray = DistributedArray(global_shape=self.global_shape,
                                    base_comm=self.base_comm,
                                    dtype=self.dtype,
                                    partition=self.partition,
                                    local_shapes=self.local_shapes,
                                    engine=self.engine,
                                    axis=self.axis)
        SumArray[:] = self.local_array + dist_array.local_array
        return SumArray

    def iadd(self, dist_array):
        """Distributed In-place Addition of arrays
        """
        self._check_partition_shape(dist_array)
        self[:] = self.local_array + dist_array.local_array
        return self

    def multiply(self, dist_array):
        """Distributed Element-wise multiplication
        """
        if isinstance(dist_array, DistributedArray):
            self._check_partition_shape(dist_array)

        ProductArray = DistributedArray(global_shape=self.global_shape,
                                        base_comm=self.base_comm,
                                        dtype=self.dtype,
                                        partition=self.partition,
                                        local_shapes=self.local_shapes,
                                        engine=self.engine,
                                        axis=self.axis)
        if isinstance(dist_array, DistributedArray):
            # multiply two DistributedArray
            ProductArray[:] = self.local_array * dist_array.local_array
        else:
            # multiply with scalar
            ProductArray[:] = self.local_array * dist_array
        return ProductArray

    def dot(self, dist_array):
        """Distributed Dot Product
        """
        self._check_partition_shape(dist_array)
        # Convert to Partition.SCATTER if Partition.BROADCAST
        x = DistributedArray.to_dist(x=self.local_array) \
            if self.partition is Partition.BROADCAST else self
        y = DistributedArray.to_dist(x=dist_array.local_array) \
            if self.partition is Partition.BROADCAST else dist_array
        # Flatten the local arrays and calculate dot product
        return self._allreduce(np.dot(x.local_array.flatten(), y.local_array.flatten()))

    def _compute_vector_norm(self, local_array: NDArray,
                             axis: int, ord: Optional[int] = None):
        """Compute Vector norm using MPI

        Parameters
        ----------
        local_array : :obj:`numpy.ndarray`
            Local Array at each rank
        axis : :obj:`int`
            Axis along which norm is computed
        ord : :obj:`int`, optional
            Order of the norm
        """
        # Compute along any axis
        ord = 2 if ord is None else ord
        if local_array.ndim == 1:
            recv_buf = np.empty(shape=1, dtype=np.float64)
        else:
            global_shape = list(self.global_shape)
            global_shape[axis] = 1
            recv_buf = np.empty(shape=global_shape, dtype=np.float64)
        if ord in ['fro', 'nuc']:
            raise ValueError(f"norm-{ord} not possible for vectors")
        elif ord == 0:
            # Count non-zero then sum reduction
            recv_buf = self._allreduce(np.count_nonzero(local_array, axis=axis).astype(np.float64))
        elif ord == np.inf:
            # Calculate max followed by max reduction
            recv_buf = self._allreduce(np.max(np.abs(local_array), axis=axis).astype(np.float64),
                                       recv_buf, op=MPI.MAX)
            recv_buf = np.squeeze(recv_buf, axis=axis)
        elif ord == -np.inf:
            # Calculate min followed by min reduction
            recv_buf = self._allreduce(np.min(np.abs(local_array), axis=axis).astype(np.float64),
                                       recv_buf, op=MPI.MIN)
            recv_buf = np.squeeze(recv_buf, axis=axis)
        else:
            recv_buf = self._allreduce(np.sum(np.abs(np.float_power(local_array, ord)), axis=axis))
            recv_buf = np.power(recv_buf, 1. / ord)
        return recv_buf

    def norm(self, ord: Optional[int] = None,
             axis: int = -1):
        """Distributed numpy.linalg.norm method

        Parameters
        ----------
        ord : :obj:`int`, optional
            Order of the norm.
        axis : :obj:`int`, optional
            Axis along which vector norm needs to be computed. Defaults to ``-1``.
        """
        # Convert to Partition.SCATTER if Partition.BROADCAST
        x = DistributedArray.to_dist(x=self.local_array) \
            if self.partition is Partition.BROADCAST else self
        if axis == -1:
            # Flatten the local arrays and calculate norm
            return x._compute_vector_norm(x.local_array.flatten(), axis=0, ord=ord)
        if axis != x.axis:
            raise NotImplementedError("Choose axis along the partition.")
        # Calculate vector norm along the axis
        return x._compute_vector_norm(x.local_array, axis=x.axis, ord=ord)

    def conj(self):
        """Distributed conj() method
        """
        conj = DistributedArray(global_shape=self.global_shape,
                                base_comm=self.base_comm,
                                partition=self.partition,
                                axis=self.axis,
                                local_shapes=self.local_shapes,
                                engine=self.engine,
                                dtype=self.dtype)
        conj[:] = self.local_array.conj()
        return conj

    def copy(self):
        """Creates a copy of the DistributedArray
        """
        arr = DistributedArray(global_shape=self.global_shape,
                               base_comm=self.base_comm,
                               partition=self.partition,
                               axis=self.axis,
                               local_shapes=self.local_shapes,
                               engine=self.engine,
                               dtype=self.dtype)
        arr[:] = self.local_array
        return arr

    def ravel(self, order: Optional[str] = "C"):
        """Return a flattened DistributedArray

        Parameters
        ----------
        order : :obj:`str`, optional
            Order in which array needs to be flattened.
            {'C', 'F', 'A', 'K'}

        Returns
        -------
        arr : :obj:`pylops_mpi.DistributedArray`
            Flattened 1-D DistributedArray
        """
        local_shapes = [(np.prod(local_shape, axis=-1), ) for local_shape in self.local_shapes]
        arr = DistributedArray(global_shape=np.prod(self.global_shape),
                               local_shapes=local_shapes,
                               partition=self.partition,
                               engine=self.engine,
                               dtype=self.dtype)
        local_array = np.ravel(self.local_array, order=order)
        x = local_array.copy()
        arr[:] = x
        return arr

    def add_ghost_cells(self, cells_front: Optional[int] = None,
                        cells_back: Optional[int] = None):
        """Add ghost cells to the DistributedArray along the axis
        of partition at each rank.

        Parameters
        ----------
        cells_front : :obj:`int`, optional
            Number of cells to be added from the previous process
            to the start of the array at each rank. Defaults to ``None``.
        cells_back : :obj:`int`, optional
            Number of cells to be added from the next process
            to the back of the array at each rank. Defaults to ``None``.

        Returns
        -------
        ghosted_array : :obj:`numpy.ndarray`
            Ghosted Array
        """
        ghosted_array = self.local_array.copy()
        if cells_front is not None:
            total_cells_front = self.base_comm.allgather(cells_front) + [0]
            # Read cells_front which needs to be sent to rank + 1 (cells_front for rank + 1)
            cells_front = total_cells_front[self.rank + 1]
            if self.rank != 0:
                ghosted_array = np.concatenate([self.base_comm.recv(source=self.rank - 1, tag=1), ghosted_array],
                                               axis=self.axis)
            if self.rank != self.size - 1:
                if cells_front > self.local_shape[self.axis]:
                    raise ValueError(f"Local Shape at rank={self.rank} along axis={self.axis} "
                                     f"should be > {cells_front}: dim({self.axis}) "
                                     f"{self.local_shape[self.axis]} < {cells_front}; "
                                     f"to achieve this use NUM_PROCESSES <= "
                                     f"{max(1, self.global_shape[self.axis] // cells_front)}")
                self.base_comm.send(np.take(self.local_array, np.arange(-cells_front, 0), axis=self.axis),
                                    dest=self.rank + 1, tag=1)
        if cells_back is not None:
            total_cells_back = self.base_comm.allgather(cells_back) + [0]
            # Read cells_back which needs to be sent to rank - 1 (cells_back for rank - 1)
            cells_back = total_cells_back[self.rank - 1]
            if self.rank != 0:
                if cells_back > self.local_shape[self.axis]:
                    raise ValueError(f"Local Shape at rank={self.rank} along axis={self.axis} "
                                     f"should be > {cells_back}: dim({self.axis}) "
                                     f"{self.local_shape[self.axis]} < {cells_back}; "
                                     f"to achieve this use NUM_PROCESSES <= "
                                     f"{max(1, self.global_shape[self.axis] // cells_back)}")
                self.base_comm.send(np.take(self.local_array, np.arange(cells_back), axis=self.axis),
                                    dest=self.rank - 1, tag=0)
            if self.rank != self.size - 1:
                ghosted_array = np.append(ghosted_array, self.base_comm.recv(source=self.rank + 1, tag=0),
                                          axis=self.axis)
        return ghosted_array

    def __repr__(self):
        return f"<DistributedArray with global shape={self.global_shape}, " \
               f"local shape={self.local_shape}" \
               f", dtype={self.dtype}, " \
               f"processes={[i for i in range(self.size)]})> "

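# ---------------------------------------------------------------------------
# Illustrative usage sketch (not part of the pylops_mpi source): create a
# DistributedArray from a global NumPy array with `to_dist`, combine arrays
# with the overloaded operators, and compute a global norm and dot product.
# Every rank ends up with the same scalar results since the reductions rely
# on allreduce. Run under an MPI launcher, e.g. `mpirun -n 2 python
# example.py` (launcher command is an assumption).
def _distributed_array_demo():
    x_global = np.arange(20, dtype=np.float64).reshape(10, 2)
    # Scatter the global array along axis 0; each rank holds a contiguous chunk
    x = DistributedArray.to_dist(x=x_global, partition=Partition.SCATTER, axis=0)
    y = x + x  # element-wise addition acts only on the local chunks
    print(f"rank {x.rank}: local shape {x.local_shape}, "
          f"||y||_2 = {y.norm()}, x.dot(x) = {x.dot(x)}")
    # Gather the distributed result back into a global array on all ranks
    y_global = y.asarray()
    assert y_global.shape == x_global.shape
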
class StackedDistributedArray:
    r"""Stacked DistributedArrays

    Stack DistributedArray objects and power them with basic mathematical operations.
    This class allows one to work with a series of distributed arrays to avoid having to create
    a single distributed array with some special internal sorting.

    Parameters
    ----------
    distarrays : :obj:`list`
        List of :class:`pylops_mpi.DistributedArray` objects.
    base_comm : :obj:`mpi4py.MPI.Comm`, optional
        Base MPI Communicator.
        Defaults to ``mpi4py.MPI.COMM_WORLD``.
    """

    def __init__(self, distarrays: List, base_comm: MPI.Comm = MPI.COMM_WORLD):
        self.distarrays = distarrays
        self.narrays = len(distarrays)
        self.base_comm = base_comm
        self.rank = base_comm.Get_rank()
        self.size = base_comm.Get_size()

    def __getitem__(self, index):
        return self.distarrays[index]

    def __setitem__(self, index, value):
        self.distarrays[index][:] = value

    def asarray(self):
        """Global view of the array

        Gather all the distributed arrays

        Returns
        -------
        final_array : :obj:`numpy.ndarray`
            Global Array gathered at all ranks
        """
        return np.hstack([distarr.asarray().ravel() for distarr in self.distarrays])

    def _check_stacked_size(self, stacked_array):
        """Check that arrays have consistent size
        """
        if self.narrays != stacked_array.narrays:
            raise ValueError("Stacked arrays must be composed of the same number of distributed arrays")
        for iarr in range(self.narrays):
            if self.distarrays[iarr].global_shape != stacked_array[iarr].global_shape:
                raise ValueError(f"Stacked arrays {iarr} have different global shape:"
                                 f"{self.distarrays[iarr].global_shape} / "
                                 f"{stacked_array[iarr].global_shape}")

    def __neg__(self):
        arr = self.copy()
        for iarr in range(self.narrays):
            arr[iarr][:] = -arr[iarr][:]
        return arr

    def __add__(self, x):
        return self.add(x)

    def __iadd__(self, x):
        return self.iadd(x)

    def __sub__(self, x):
        return self.__add__(-x)

    def __isub__(self, x):
        return self.__iadd__(-x)

    def __mul__(self, x):
        return self.multiply(x)

    def __rmul__(self, x):
        return self.multiply(x)

    def add(self, stacked_array):
        """Stacked Distributed Addition of arrays
        """
        self._check_stacked_size(stacked_array)
        SumArray = self.copy()
        for iarr in range(self.narrays):
            SumArray[iarr][:] = (self[iarr] + stacked_array[iarr])[:]
        return SumArray

    def iadd(self, stacked_array):
        """Stacked Distributed In-Place Addition of arrays
        """
        self._check_stacked_size(stacked_array)
        for iarr in range(self.narrays):
            self[iarr][:] = (self[iarr] + stacked_array[iarr])[:]
        return self

    def multiply(self, stacked_array):
        """Stacked Distributed Multiplication of arrays
        """
        if isinstance(stacked_array, StackedDistributedArray):
            self._check_stacked_size(stacked_array)
        ProductArray = self.copy()

        if isinstance(stacked_array, StackedDistributedArray):
            # multiply two StackedDistributedArray
            for iarr in range(self.narrays):
                ProductArray[iarr][:] = (self[iarr] * stacked_array[iarr])[:]
        else:
            # multiply with scalar
            for iarr in range(self.narrays):
                ProductArray[iarr][:] = (self[iarr] * stacked_array)[:]
        return ProductArray

    def dot(self, stacked_array):
        """Dot Product of Stacked Distributed Arrays
        """
        self._check_stacked_size(stacked_array)
        dotprod = 0.
        for iarr in range(self.narrays):
            dotprod += self[iarr].dot(stacked_array[iarr])
        return dotprod

    def norm(self, ord: Optional[int] = None):
        """numpy.linalg.norm method on stacked Distributed arrays

        Parameters
        ----------
        ord : :obj:`int`, optional
            Order of the norm.
        """
        norms = np.array([distarray.norm(ord) for distarray in self.distarrays])
        ord = 2 if ord is None else ord
        if ord in ['fro', 'nuc']:
            raise ValueError(f"norm-{ord} not possible for vectors")
        elif ord == 0:
            # Count non-zero then sum reduction
            norm = np.sum(norms)
        elif ord == np.inf:
            # Calculate max followed by max reduction
            norm = np.max(norms)
        elif ord == -np.inf:
            # Calculate min followed by min reduction
            norm = np.min(norms)
        else:
            norm = np.power(np.sum(np.power(norms, ord)), 1. / ord)
        return norm

    def conj(self):
        """Distributed conj() method
        """
        ConjArray = StackedDistributedArray([distarray.conj() for distarray in self.distarrays])
        return ConjArray

    def copy(self):
        """Creates a copy of the DistributedArray
        """
        arr = StackedDistributedArray([distarray.copy() for distarray in self.distarrays])
        return arr

    def __repr__(self):
        repr_dist = "\n".join([distarray.__repr__() for distarray in self.distarrays])
        return f"<StackedDistributedArray with {self.narrays} distributed arrays: \n" + repr_dist
