python doc2 —— MPI多线程并行计算工具mpi4py

  • Post author:
  • Post category:python




1. MPI

MPI的全称是Message Passing Interface,即消息传递接口。

  • 它并不是一门语言,而是一个库,我们可以用Fortran、C、C++结合MPI提供的接口来将串行的程序进行并行化处理,也可以认为Fortran+MPI或者C+MPI是一种再原来串行语言的基础上扩展出来的并行语言。
  • 它是一种标准而不是特定的实现,具体的可以有很多不同的实现,例如MPICH、OpenMPI等。
  • 它是一种消息传递编程模型,顾名思义,它就是专门服务于进程间通信的。

MPI的工作方式很好理解,我们可以同时启动一组进程,在同一个通信域中不同的进程都有不同的编号,程序员可以利用MPI提供的接口来给不同编号的进程分配不同的任务和帮助进程相互交流最终完成同一个任务。就好比包工头给工人们编上了工号然后指定一个方案来给不同编号的工人分配任务并让工人相互沟通完成任务。



2. 基本MPI函数

mpi4py是一个构建在MPI之上的Python库,主要使用Cython编写。mpi4py使得Python的数据结构可以方便的在多进程中传递。

mpi4py是一个很强大的库,它实现了很多MPI标准中的接口,包括点对点通信,组内集合通信、非阻塞通信、重复非阻塞通信、组间通信等,基本上我能想到用到的MPI接口mpi4py中都有相应的实现。不仅是Python对象,mpi4py对numpy也有很好的支持并且传递效率很高。同时它还提供了SWIG和F2PY的接口能够让我们将自己的Fortran或者C/C++程序在封装成Python后仍然能够使用mpi4py的对象和接口来进行并行处理。可见mpi4py的作者的功力的确是非常了得。



2.1 工具



a. 通信子(通信空间)


MPI_COMM_WORLD

在MPI中的作用:

  • 一个通信空间是一个进程组和一个上下文的组合.上下文可看作为组的超级标签,用于区分不同的通信子.
  • 在执行函数MPI_Init之后,一个MPI程序的所有进程形成一个缺省的组,这个组的通信子即被写作MPI_COMM_WORLD.
  • 该参数是MPI通信操作函数中必不可少的参数,用于限定参加通信的进程的范围.
from mpi4py import MPI
                                                
MPI.COMM_SELF
# <mpi4py.MPI.Intracomm at 0x7f2fa2fd59d0>
                                                
MPI.COMM_WORLD
# <mpi4py.MPI.Intracomm at 0x7f2fa2fd59f0>



b. 获取进程

转到python中的mpi4py

from mpi4py import MPI
# 用Get_size 获得进程个数 p
size = MPI.COMM_WORLD.Get_size()
# Get_rank 获得进程的一个叫rank的值,
# 该rank值为0到p-1间的整数,相当于进程的ID号
rank = MPI.COMM_WORLD.Get_rank()

c++中的定义类似

int MPI_Comm_size(MPI_Comm comm, int *size)
int MPI_Comm_rank(MPI_Comm comm, int *rank)



3. 通信

因为mpi4py中点对点的 通信send语句,在数据量较小的时候是把发送数据拷贝到缓存区,是非堵塞的操作, 然而在数据量较大时候是堵塞操作。

阻塞操作见

python的MPI多线程并行通信方式



3.1 点对点通信

所谓点对点通信,即单个线程与单个线程之间通信。

(a) 两个线程之间点对点通信。

# test.py
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    data = {'a': 7, 'b': 3.14}
    comm.send(data, dest=1, tag=11)
    data['a'] = 8; data['b'] = data['b'] * 2

elif rank == 1:
    data = comm.recv(source=0, tag=11)
print('{}_{}'.format(rank, data))

结果为:

mpiexec -n 2 python test.py     # -n/-np  means number of process
# result
0_{'a': 8, 'b': 6.28}
1_{'a': 7, 'b': 3.14}

(b) 多个线程之间点对点通信

如下面例子的5个线程之间点对点通信,一个接着一个通信之间通信,形成一个圈(rank_3 => rank_4 => rank_0 => rank_1 => rank_2 => rank_3)。

# test.py
import mpi4py.MPI as MPI
 
comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()      # id number
comm_size = comm.Get_size()      # number of mpi

# point to point communication
data_send = [comm_rank+1]*5

comm.send(data_send,dest=(comm_rank+1)%comm_size)
data_recv =comm.recv(source=(comm_rank-1)%comm_size)

print("my rank is %d, and I received:" % comm_rank)
print(data_recv)

运行该文件:

mpiexec -n 5 python test.py
# result
my rank is 3, and I received:
[3, 3, 3, 3, 3]
my rank is 0, and I received:
[5, 5, 5, 5, 5]
my rank is 4, and I received:
[4, 4, 4, 4, 4]
my rank is 1, and I received:
[1, 1, 1, 1, 1]
my rank is 2, and I received:
[2, 2, 2, 2, 2]


解释

:(rank_3 => rank_4 => rank_0 => rank_1 => rank_2 => rank_3)

例如,当前进程id=3,其data_send = [comm_rank+1]*5为[4, 4, 4, 4, 4], 而该进程发送时,所设定的dest=(comm_rank+1)%comm_size)=4,接收设定为source=3,因此rank=3原本的数据[4, 4, 4, 4, 4],发送到目标dest=4,所以rank=4所接收到的数据为[4, 4, 4, 4, 4],而rank=3自己接收的来源source=3,故dest计算等于3的为rank=2,所以id=3获取其数据[3, 3, 3, 3, 3],并打印出来。

如果修改到如下情况,则多线程之间可以按照顺序点对点的通信,即(rank_1 => rank_2 => rank_3 => rank_4 => rank_0 => rank_1)

import mpi4py.MPI as MPI
     
comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()
data_send = [comm_rank+1]*5

if comm_rank == 0:
   comm.send(data_send, dest=(comm_rank+1)%comm_size)

if comm_rank > 0:
   data_recv = comm.recv(source=(comm_rank-1)%comm_size)
   comm.send(data_send, dest=(comm_rank+1)%comm_size)

if comm_rank == 0:
   data_recv = comm.recv(source=(comm_rank-1)%comm_size)

print("my rank is %d, and Ireceived:" % comm_rank)
print(data_recv)



3.2 群体通信



a. 广播 bcast

广播操作是典型的一对多通信,将跟进程的数据复制到同组内其他所有进程中。

在这里插入图片描述

from mpi4py import MPI                                                     
                                                                           
comm = MPI.COMM_WORLD                                                      
rank = comm.Get_rank()                                                     
size = comm.Get_size()                                                     
                                                                           
if rank == 0:                                                              
    data = list(range(10))                                                       
    print("process {} bcast data {} to other processes".format(rank, data))
else:                                                                      
    data = None                                                            
data = comm.bcast(data, root=0)                                            
print("process {} recv data {}...".format(rank, data))
mpiexec -n 5 python test.py
# result
process 0 bcast data [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] to other processes
process 0 recv data [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]...
process 1 recv data [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]...
process 4 recv data [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]...
process 2 recv data [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]...
process 3 recv data [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]...



b. 发散 scatter

与广播不同,发散可以向不同的进程发送不同的数据,而不是完全复制。

在这里插入图片描述

from mpi4py import MPI                                                            
import numpy as np                                                                
                                                                                  
comm = MPI.COMM_WORLD                                                             
rank = comm.Get_rank()                                                            
size = comm.Get_size()                                                            
                                                                                  
recv_data = None                                                                  
                                                                                  
if rank == 0:                                                                     
    send_data = list(range(5))                                                         
    print("process {} scatter data {} to other processes".format(rank, send_data))
else:                                                                             
    send_data = None                                                              
recv_data = comm.scatter(send_data, root=0)                                       
print("process {} recv data {}...".format(rank, recv_data))
mpiexec -n 5 python test.py



c. 收集 gather

收集过程是发散过程的逆过程,每个进程将发送缓冲区的消息发送给根进程,根进程根据发送进程的进程号将各自的消息存放到自己的消息缓冲区中。

在这里插入图片描述

from mpi4py import MPI                                              
import numpy as np                                                  
                                                                    
comm = MPI.COMM_WORLD                                               
rank = comm.Get_rank()                                              
size = comm.Get_size()                                              
                                                                    
send_data = rank                                                    
print "process {} send data {} to root...".format(rank, send_data)  
recv_data = comm.gather(send_data, root=0)                          
if rank == 0:                                                       
    print "process {} gather all data {}...".format(rank, recv_data)
mpiexec -n 5 python test.py



d. 规约 reduce


参考资料:

[1] https://mpi4py.readthedocs.io/en/latest/tutorial.html

[2] https://mpitutorial.com/tutorials/mpi-introduction/

[3] https://zhuanlan.zhihu.com/p/25332041

[4] https://www.cnblogs.com/devilmaycry812839668/p/9484644.html

[5] https://blog.csdn.net/ZuoShifan/article/details/80024380

[6] https://www.zybuluo.com/Purpose/note/700033

[7] https://rabernat.github.io/research_computing/parallel-programming-with-mpi-for-python.html



版权声明:本文为zhangkzz原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。