Multiprocessing Pool.starmap_async() and Pool.starmap() in Python: Running Parallel, Multi-Input Tasks

  • Post category:python




I. Problem Description

  • Tens of thousands of records need to be processed.
  • Each processing task takes multiple inputs.



II. Solution

Use Pool.starmap_async() or Pool.starmap(), depending on your needs.



1. What Pool.starmap_async() and Pool.starmap() do

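  • Both methods hand tasks to a process pool (created with Pool()) that holds several worker processes. The workers execute tasks in parallel, which shortens the total processing time. Each element of the input iterable is a tuple that is unpacked into the target function's arguments, so an input of [(1, 2), (3, 4)] results in the calls func(1, 2) and func(3, 4).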
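The tuple-unpacking rule can be checked against the standard library's itertools.starmap, which applies the same rule serially in a single process. A minimal sketch; the function multiply and the inputs are illustrative, not taken from the original post:

```python
from itertools import starmap
from multiprocessing.pool import Pool

def multiply(x, y):
    # defined at module level so it can be pickled and sent to worker processes
    return x * y

# one tuple per task; each tuple is unpacked into multiply(x, y)
items = [(1, 2), (3, 4), (5, 6)]

# serial reference: itertools.starmap uses the same unpacking rule
serial = list(starmap(multiply, items))

if __name__ == '__main__':
    with Pool(processes=2) as pool:
        # same calls, distributed across two worker processes
        parallel = pool.starmap(multiply, items)
    print(serial)    # [2, 12, 30]
    print(parallel)  # [2, 12, 30]
```

Both lists come out in input order; the pool only changes where the calls run.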



2. Differences between Pool.starmap() and Pool.starmap_async()

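  • Similarities

    • Both starmap_async() and starmap() can be used to issue tasks that call a function in the process pool with more than one argument.
  • Differences

    • starmap_async() does not block, whereas starmap() blocks until every task has finished.
    • starmap_async() returns an AsyncResult object, whereas starmap() returns a list of the target function's return values, in the same order as the inputs.
    • starmap_async() can run callback functions on the results and on errors (its callback and error_callback parameters), whereas starmap() does not support callbacks.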
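The callback difference is easiest to see with one batch that succeeds and one that fails. A minimal sketch, assuming a hypothetical divide() task and two hypothetical callback functions that record what they receive; callback runs once in the parent process with the full result list, and error_callback runs once with the first exception raised in the batch:

```python
from multiprocessing.pool import Pool

seen_results = []  # filled by on_result in the parent process
seen_errors = []   # filled by on_error in the parent process

def divide(x, y):
    # raises ZeroDivisionError when y == 0
    return x / y

def on_result(values):
    # called once, with the list of all return values of the batch
    seen_results.append(values)

def on_error(exc):
    # called once if any task in the batch raised an exception
    seen_errors.append(exc)

if __name__ == '__main__':
    with Pool(processes=2) as pool:
        ok = pool.starmap_async(divide, [(1, 2), (9, 3)],
                                callback=on_result, error_callback=on_error)
        bad = pool.starmap_async(divide, [(1, 0)],
                                 callback=on_result, error_callback=on_error)
        # wait() returns only after the callbacks have run
        ok.wait()
        bad.wait()
    print(seen_results)                        # [[0.5, 3.0]]
    print([repr(e) for e in seen_errors])
    print(ok.successful(), bad.successful())   # True False
```

Calling bad.get() instead would re-raise the ZeroDivisionError in the parent. Callbacks should return quickly, since they run in the pool's result-handling thread.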



III. Example: Using Pool.starmap() and Pool.starmap_async() with a Process Pool



1. Define the task function

from random import random
from time import sleep
from multiprocessing.pool import Pool
import multiprocessing
import time, datetime
    
# task executed in a worker process
def task(x, y):
    t1 = datetime.datetime.now().strftime('%H:%M:%S.%f')  # start time, HH:MM:SS.ffffff
    z = x * y
    sleep(1)   # block for a moment to simulate a slow task
    t2 = datetime.datetime.now().strftime('%H:%M:%S.%f')  # end time
    # report a message
    identifier = multiprocessing.current_process().name
    print(f'Task {identifier}, start time: {t1}, end time: {t2}', flush=True)
    # return the generated value
    return z
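When the inputs arrive as separate sequences instead of ready-made tuples, zip() builds the argument list that starmap() expects, and functools.partial can fix an argument that is the same for every task. A minimal sketch; weighted_product, xs, ys and the weight of 10 are hypothetical names and values for illustration:

```python
from functools import partial

def weighted_product(x, y, weight):
    # hypothetical three-input task; weight is identical for every call
    return x * y * weight

xs = [0, 1, 2, 3]            # first input of each task
ys = [0.5, 0.25, 2.0, 1.0]   # second input of each task

# zip pairs the sequences element-wise into (x, y) tuples
items = list(zip(xs, ys))
print(items)  # [(0, 0.5), (1, 0.25), (2, 2.0), (3, 1.0)]

# partial fixes weight=10, so each tuple only has to supply x and y;
# `bound` can then be passed to pool.starmap(bound, items) like task above
bound = partial(weighted_product, weight=10)
print([bound(*args) for args in items])  # [0.0, 2.5, 40.0, 30.0]
```

A partial of a module-level function is picklable, so it can be sent to worker processes.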



2. Create and configure a process pool and run the tasks in parallel with Pool.starmap_async()

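# protect the entry point (required with the 'spawn' start method, e.g. on Windows/macOS)
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # prepare arguments: one (x, y) tuple per task
        items = [(i, random()) for i in range(10)]
        # issue tasks to the process pool; an AsyncResult is returned immediately
        result = pool.starmap_async(task, items)
        # get() blocks until all results are ready
        for value in result.get():
            print(f'Got result: {value}', flush=True)
    # leaving the with-block terminates the pool (safe: all results were collected)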
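Because starmap_async() returns at once, the main process can keep doing other work while the pool runs. A minimal sketch of polling an AsyncResult with ready() and wait(timeout); slow_square and the 0.5 s delay are hypothetical stand-ins for a real task:

```python
import time
from multiprocessing.pool import Pool

def slow_square(x, delay):
    time.sleep(delay)  # simulate a slow task
    return x * x

if __name__ == '__main__':
    with Pool(processes=2) as pool:
        async_result = pool.starmap_async(slow_square, [(i, 0.5) for i in range(4)])
        # starmap_async() has already returned, so the main process is free here
        while not async_result.ready():
            print('still working...')
            async_result.wait(timeout=0.2)  # wait at most 0.2 s per poll
        print(async_result.get())  # [0, 1, 4, 9]
```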



3. Create and configure a process pool and run the tasks in parallel with Pool.starmap()

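# protect the entry point (required with the 'spawn' start method, e.g. on Windows/macOS)
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # prepare arguments: one (x, y) tuple per task
        items = [(i, random()) for i in range(10)]
        # issue tasks and block until all finish; returns a list of results
        result = pool.starmap(task, items)
        print(result)
    # leaving the with-block terminates the pool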
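For the tens of thousands of records in the problem statement, starmap() also takes an optional chunksize argument that sends items to workers in batches, which reduces inter-process overhead when each task is tiny. If it is omitted, CPython picks a chunk size from the input length and the number of workers. A minimal sketch; add() and the value 500 are arbitrary choices for illustration, not tuned settings:

```python
from multiprocessing.pool import Pool

def add(x, y):
    return x + y

if __name__ == '__main__':
    # 20,000 two-input records, similar in scale to the problem statement
    items = [(i, 2 * i) for i in range(20_000)]
    with Pool(processes=4) as pool:
        # each message to a worker carries a batch of 500 argument tuples
        results = pool.starmap(add, items, chunksize=500)
    print(len(results), results[:3])  # 20000 [0, 3, 6]
```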



4. Testing and analyzing the results

  • Serial run (baseline)

    if __name__ == '__main__':
        t1 = time.time()
        items = [(i, random()) for i in range(10)]
        for item in items:
            z = task(item[0], item[1])
        t2 = time.time()
        print(f'Done. ({(1E3 * (t2 - t1)):.1f}ms) Process.')
    

    Run time: 10014.4 ms. Output:

    Task MainProcess, start time: 13:25:41.896662, end time: 13:25:42.897968
    Task MainProcess, start time: 13:25:42.898297, end time: 13:25:43.899823
    Task MainProcess, start time: 13:25:43.900057, end time: 13:25:44.900915
    Task MainProcess, start time: 13:25:44.901269, end time: 13:25:45.902550
    Task MainProcess, start time: 13:25:45.902764, end time: 13:25:46.904421
    Task MainProcess, start time: 13:25:46.904628, end time: 13:25:47.905779
    Task MainProcess, start time: 13:25:47.905974, end time: 13:25:48.906658
    Task MainProcess, start time: 13:25:48.906912, end time: 13:25:49.908484
    Task MainProcess, start time: 13:25:49.908724, end time: 13:25:50.910294
    Task MainProcess, start time: 13:25:50.910528, end time: 13:25:51.910837
    Done. (10014.4ms) Process.
    
  • Parallel runs

    • Pool.starmap_async() with 4 worker processes

      # protect the entry point
      if __name__ == '__main__':
          t1 = time.time()
          # create and configure the process pool
          with Pool(processes=4) as pool:
              # prepare arguments
              items = [(i, random()) for i in range(10)]
              # issues tasks to process pool
              result = pool.starmap_async(task, items)
              # iterate results
              for value in result.get():
                  print(f'Got result: {value}', flush=True)
          # leaving the with-block terminates the pool (all results already collected)
          t2 = time.time()
          print(f'Done. ({(1E3 * (t2 - t1)):.1f}ms) Process.')
      


      Run time: 3027.1 ms. Output:

      Task ForkPoolWorker-4, start time: 13:33:24.838418, end time: 13:33:25.839684
      Task ForkPoolWorker-3, start time: 13:33:24.838547, end time: 13:33:25.839684
      Task ForkPoolWorker-2, start time: 13:33:24.838292, end time: 13:33:25.839684
      Task ForkPoolWorker-1, start time: 13:33:24.838231, end time: 13:33:25.839684
      Task ForkPoolWorker-3, start time: 13:33:25.840190, end time: 13:33:26.841160
      Task ForkPoolWorker-2, start time: 13:33:25.840530, end time: 13:33:26.841349
      Task ForkPoolWorker-1, start time: 13:33:25.840703, end time: 13:33:26.841380
      Task ForkPoolWorker-4, start time: 13:33:25.840296, end time: 13:33:26.841381
      Task ForkPoolWorker-3, start time: 13:33:26.841865, end time: 13:33:27.842449
      Task ForkPoolWorker-2, start time: 13:33:26.841977, end time: 13:33:27.843139
      Got result: 0.0
      Got result: 0.07020127371886742
      Got result: 1.1256784597006964
      Got result: 1.5461355349791022
      Got result: 0.8560855815072403
      Got result: 3.882912015620671
      Got result: 1.7547963496144667
      Got result: 1.0103926687967728
      Got result: 4.760893202587357
      Got result: 4.2627716915139695
      Done. (3027.1ms) Process.
      
    • Pool.starmap() with 4 worker processes

        # protect the entry point
        if __name__ == '__main__':
            t1 = time.time()
            # create and configure the process pool
            with Pool(processes=4) as pool:
                # prepare arguments
                items = [(i, random()) for i in range(10)]
                # issues tasks to process pool
                result = pool.starmap(task, items)
                print(result)  
            # leaving the with-block terminates the pool
            t2 = time.time()
            print(f'Done. ({(1E3 * (t2 - t1)):.1f}ms) Process.')
      


      Run time: 3026.7 ms. Output:

      Task ForkPoolWorker-1, start time: 13:41:59.417044, end time: 13:42:00.418067
      Task ForkPoolWorker-4, start time: 13:41:59.417248, end time: 13:42:00.418067
      Task ForkPoolWorker-2, start time: 13:41:59.417206, end time: 13:42:00.418067
      Task ForkPoolWorker-3, start time: 13:41:59.417213, end time: 13:42:00.418067
      Task ForkPoolWorker-4, start time: 13:42:00.418833, end time: 13:42:01.419987
      Task ForkPoolWorker-2, start time: 13:42:00.419155, end time: 13:42:01.420250
      Task ForkPoolWorker-1, start time: 13:42:00.419356, end time: 13:42:01.420633
      Task ForkPoolWorker-3, start time: 13:42:00.419487, end time: 13:42:01.420784
      Task ForkPoolWorker-4, start time: 13:42:01.420545, end time: 13:42:02.421670
      Task ForkPoolWorker-2, start time: 13:42:01.421409, end time: 13:42:02.422508
      [0.0, 0.22916987317496185, 1.48514094188553, 2.6743708768075924, 1.5402186086037761, 4.265952360897589, 0.8526580381048074, 3.806662622060956, 0.8377934521648509, 6.092057694481896]
      Done. (3026.7ms) Process.
      
  • For a more detailed description, see

    Multiprocessing Pool.starmap_async() in Python
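The measured run times fit a simple model: with P workers, tasks run in rounds of at most P, so N equal tasks take about ceil(N / P) rounds. The logs above show exactly this for the 4-process runs (4 tasks start, then 4, then 2). A minimal sketch of the estimate, assuming equal task durations and negligible pool overhead; expected_wall_time is a hypothetical helper name:

```python
import math

def expected_wall_time(n_tasks, n_procs, task_seconds):
    # tasks run in rounds of n_procs at a time; the last round may be partial
    rounds = math.ceil(n_tasks / n_procs)
    return rounds * task_seconds

print(expected_wall_time(10, 1, 1.0))  # 10.0  (serial run measured 10014.4 ms)
print(expected_wall_time(10, 4, 1.0))  # 3.0   (4-process runs measured ~3027 ms)
```

The small excess over the estimate (roughly 14 to 27 ms here) is the cost of starting workers and passing arguments and results between processes.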



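Copyright notice: This is an original article by qq_40541102, licensed under the CC 4.0 BY-SA license. When reposting, please include a link to the original source and this notice.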