Parallel Processing
This recipe shows a few strategies to parallelize “embarrassingly parallel” spool processing workflows.
Processes and Threads
dascore.Spool.map is the easiest way to process patches in a spool in parallel. Here is an example using the Python standard library module concurrent.futures:
from concurrent.futures import ProcessPoolExecutor

import dascore as dc


def my_patch_processing_function(patch):
    """A custom function for processing patches."""
    ...


spool = dc.get_example_spool("random_das")
executor = ProcessPoolExecutor()
spool.map(my_patch_processing_function, client=executor)
The ThreadPoolExecutor from the same module will also work, but because of Python’s global interpreter lock (GIL) it may not provide much of a speed-up.
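Both executor classes implement the same concurrent.futures.Executor interface, so swapping one for the other is a one-line change. The following is a minimal sketch of that interchangeability using only the standard library. The names scale_values, process_all, and fake_patches are hypothetical stand-ins introduced for illustration; they are not DASCore objects, and the lists of numbers only play the role of patches.

```python
from concurrent.futures import Executor, ThreadPoolExecutor


def scale_values(values):
    """Hypothetical stand-in for a patch-processing function."""
    return [2 * v for v in values]


def process_all(items, func, executor: Executor):
    """Apply func to every item using any concurrent.futures executor."""
    # Executor.map yields results in the same order as the inputs.
    return list(executor.map(func, items))


# Hypothetical stand-in for the patches in a spool.
fake_patches = [[1, 2], [3, 4], [5, 6]]

# Replacing ThreadPoolExecutor with ProcessPoolExecutor is the only change
# needed to switch from threads to processes.
with ThreadPoolExecutor(max_workers=2) as executor:
    results = process_all(fake_patches, scale_values, executor)

print(results)
```

Threads share memory with the main program, so nothing has to be pickled and sent to workers; processes avoid the GIL but pay that serialization cost for every input and result.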
There are two downsides to this approach. First, if the patches aren’t chunked adequately it may exhaust the available memory. Second, it will only work on a single machine. The next section presents a more scalable option.
MPI4Py
This section shows how to use the mpi4py library to parallelize DASCore code.
Installation
First, make sure DASCore is installed on your machine (see DASCore Installation). Second, install the mpi4py library: after installing and loading the Open MPI module on your machine (e.g., on Linux: module load to/mpi/openmpi/gcc/compiler/path), install mpi4py. It may be easier to install both packages from conda-forge, as below:
conda install -c conda-forge mpi4py openmpi
Please note that this procedure was tested with Python 3.11 and Open MPI GCC 3.1.3.
Parallel script
Here is an example that distributes the patches in a spool across processors:
mpi_spool.py
import sys

import dascore as dc
from mpi4py import MPI

# Load the spool
spool = dc.get_example_spool("random_das")

# Initiate MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# Check the spool on the first processor
if len(spool) < 1:
    if rank == 0:
        raise ValueError('No Patch of data found within the spool.')
    else:
        sys.exit(1)

# Each rank takes every size-th patch, starting from its own rank number
for i in range(rank, len(spool), size):
    patch = spool[i]
    print(f"rank: {rank}, patch number: {i}, patch: {patch}")
    ...

# Wait for all ranks to finish before exiting
comm.barrier()
sys.exit(0)
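The loop over range(rank, len(spool), size) gives each rank a strided (round-robin) share of the patches. The sketch below reproduces that index arithmetic in plain Python, without MPI, so the split can be inspected on a single machine. The helper indices_for_rank and the values n_patches = 10 and size = 4 are assumptions chosen for illustration only.

```python
def indices_for_rank(rank, size, n_items):
    """Return the item indices one rank handles under a strided split."""
    return list(range(rank, n_items, size))


n_patches = 10  # hypothetical number of patches in the spool
size = 4  # hypothetical number of MPI processes

# Map each rank to the patch indices it would process.
assignment = {
    rank: indices_for_rank(rank, size, n_patches) for rank in range(size)
}

for rank, indices in assignment.items():
    print(f"rank {rank}: {indices}")
```

Every index is assigned to exactly one rank, and the per-rank counts differ by at most one. When there are fewer patches than ranks, the extra ranks simply receive an empty list and skip the loop.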
Run the script
To run the mpi_spool.py script using n = 4 processors (each of which runs the script separately), you can use:
mpiexec -n 4 python mpi_spool.py
or
mpirun -n 4 python mpi_spool.py