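Running Parallel, Multi-Argument Tasks in Python
I. Problem Description
- Tens of thousands of records need to be processed
- The processing function takes more than one input argument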
II. Solution
Use Pool.starmap_async() or Pool.starmap(), whichever fits your needs.
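For inputs in the tens of thousands, both methods also accept an optional chunksize argument that hands tasks to the workers in batches rather than one at a time. The sketch below is only illustrative: scale(), process_records(), the record count, and the chunk size of 500 are hypothetical choices, not values from the original article, and a suitable chunk size depends on the workload.

```python
from multiprocessing import Pool


def scale(value, factor):
    """Hypothetical per-record task that takes two inputs."""
    return value * factor


def process_records(records, processes=4, chunksize=500):
    """Run scale(value, factor) over all records in a process pool."""
    with Pool(processes=processes) as pool:
        # chunksize groups the argument tuples into batches per worker,
        # which reduces inter-process communication for many small tasks
        return pool.starmap(scale, records, chunksize=chunksize)


if __name__ == "__main__":
    # 20,000 (value, factor) pairs as a stand-in for real data
    records = [(i, 2) for i in range(20_000)]
    results = process_records(records)
    print(len(results), results[:3])
```

Results come back as a list in the same order as the input, regardless of chunk size.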
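1. What Pool.starmap_async() and Pool.starmap() do
- Both methods submit tasks to a process pool (multiprocessing.pool.Pool). The pool holds several worker processes that execute tasks in parallel, which shortens the total processing time.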
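The "star" in the name refers to argument unpacking: each item of the input is a tuple that is unpacked into the function's positional arguments. A minimal sketch, assuming a toy function multiply() and a helper run_parallel() (both hypothetical names), compares the pool version with the serial itertools.starmap equivalent:

```python
from itertools import starmap as serial_starmap
from multiprocessing import Pool


def multiply(x, y):
    """Toy two-argument task (a stand-in for real work)."""
    return x * y


def run_parallel(pairs, processes=2):
    """Call multiply(x, y) for every (x, y) tuple using a process pool."""
    with Pool(processes=processes) as pool:
        # Each tuple is unpacked: (3, 4) becomes multiply(3, 4)
        return pool.starmap(multiply, pairs)


if __name__ == "__main__":
    pairs = [(1, 2), (3, 4), (5, 6)]
    # Serial equivalent of the same calls
    expected = list(serial_starmap(multiply, pairs))
    print(run_parallel(pairs) == expected)
```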
2. Differences between Pool.starmap() and Pool.starmap_async()
- Similarities
  - Both starmap_async() and starmap() can issue tasks that call a function in the process pool with more than one argument.
- Differences
  - starmap_async() does not block the caller; starmap() blocks until every task has finished.
  - starmap_async() returns an AsyncResult; starmap() returns a list of the target function's return values.
  - starmap_async() can invoke callback functions on the return values and on errors (the callback and error_callback arguments); starmap() does not support callbacks.
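The callback support listed above can be sketched as follows. The function names divide(), on_done(), and on_error() are hypothetical; callback and error_callback are the keyword arguments of Pool.starmap_async(). Both callbacks run in the parent process, so they should return quickly.

```python
from multiprocessing import Pool


def divide(x, y):
    """Toy task that fails when y is zero."""
    return x / y


def on_done(results):
    # Called in the parent once all tasks have finished successfully
    print(f"all results: {results}")


def on_error(exc):
    # Called in the parent if a task raised an exception
    print(f"task failed: {exc!r}")


def main():
    with Pool(processes=2) as pool:
        ok = pool.starmap_async(
            divide, [(6, 3), (8, 2)], callback=on_done, error_callback=on_error
        )
        # starmap_async() has already returned; the parent is not blocked
        print("tasks issued")
        ok.wait()

        failing = pool.starmap_async(divide, [(1, 0)], error_callback=on_error)
        failing.wait()
        print(f"successful: {failing.successful()}")


if __name__ == "__main__":
    main()
```

Calling wait() before leaving the with block matters here, because exiting the block terminates the pool even if tasks are still pending.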
III. Example: Using Pool.starmap() and Pool.starmap_async() with a Process Pool
1. Define the task function
from random import random
from time import sleep
from multiprocessing.pool import Pool
import multiprocessing
import time
import datetime

# task executed in a worker process
def task(x, y):
    # record the start time as HH:MM:SS.ffffff
    t1 = str(datetime.datetime.now())[11:]
    z = x * y
    sleep(1)  # block for a moment
    t2 = str(datetime.datetime.now())[11:]
    # report a message
    identifier = multiprocessing.current_process().name
    print(f'Task {identifier}, start time: {t1}, end time: {t2}', flush=True)
    # return the generated value
    return z
2. Pool.starmap_async(): create and configure the process pool and run the tasks in parallel
# create and configure the process pool
with Pool() as pool:
    # prepare arguments
    items = [(i, random()) for i in range(10)]
    # issue tasks to the process pool (returns immediately)
    result = pool.starmap_async(task, items)
    # get() blocks until all tasks are done, then iterate the results
    for value in result.get():
        print(f'Got result: {value}', flush=True)
# process pool is closed automatically
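Because starmap_async() returns immediately, the parent process can keep working and check on the tasks later. The following is a minimal sketch, assuming a hypothetical slow_add() task and arbitrary sleep intervals; it uses AsyncResult.ready() to poll and AsyncResult.get(timeout=...) to fetch the results.

```python
from multiprocessing import Pool
from time import sleep


def slow_add(x, y):
    """Toy task that takes a noticeable amount of time."""
    sleep(0.2)
    return x + y


def main():
    with Pool(processes=2) as pool:
        async_result = pool.starmap_async(slow_add, [(1, 2), (3, 4)])
        polls = 0
        # ready() returns False until every task has completed
        while not async_result.ready():
            polls += 1  # stand-in for useful work in the parent
            sleep(0.05)
        print(f"polled {polls} times while waiting")
        # get() raises multiprocessing.TimeoutError if the results
        # are not available within the timeout
        print(async_result.get(timeout=1))


if __name__ == "__main__":
    main()
```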
3. Pool.starmap(): create and configure the process pool and run the tasks in parallel
# create and configure the process pool
with Pool() as pool:
    # prepare arguments
    items = [(i, random()) for i in range(10)]
    # issue tasks to the process pool; blocks until all tasks are done
    result = pool.starmap(task, items)
    # result is a list of return values, in input order
    print(result)
# process pool is closed automatically
4. Test and analyze the results
- Serial run
if __name__ == '__main__':
    t1 = time.time()
    items = [(i, random()) for i in range(10)]
    for item in items:
        z = task(item[0], item[1])
    t2 = time.time()
    print(f'Done. ({(1E3 * (t2 - t1)):.1f}ms) Process.')
Run time: 10014.4 ms. Output:
Task MainProcess, start time: 13:25:41.896662, end time: 13:25:42.897968
Task MainProcess, start time: 13:25:42.898297, end time: 13:25:43.899823
Task MainProcess, start time: 13:25:43.900057, end time: 13:25:44.900915
Task MainProcess, start time: 13:25:44.901269, end time: 13:25:45.902550
Task MainProcess, start time: 13:25:45.902764, end time: 13:25:46.904421
Task MainProcess, start time: 13:25:46.904628, end time: 13:25:47.905779
Task MainProcess, start time: 13:25:47.905974, end time: 13:25:48.906658
Task MainProcess, start time: 13:25:48.906912, end time: 13:25:49.908484
Task MainProcess, start time: 13:25:49.908724, end time: 13:25:50.910294
Task MainProcess, start time: 13:25:50.910528, end time: 13:25:51.910837
Done. (10014.4ms) Process.
- Parallel run
  - Pool.starmap_async() with 4 worker processes
# protect the entry point
if __name__ == '__main__':
    t1 = time.time()
    # create and configure the process pool
    with Pool(processes=4) as pool:
        # prepare arguments
        items = [(i, random()) for i in range(10)]
        # issue tasks to the process pool
        result = pool.starmap_async(task, items)
        # iterate results
        for value in result.get():
            print(f'Got result: {value}', flush=True)
    # process pool is closed automatically
    t2 = time.time()
    print(f'Done. ({(1E3 * (t2 - t1)):.1f}ms) Process.')
Run time: 3027.1 ms. Output:
Task ForkPoolWorker-4, start time: 13:33:24.838418, end time: 13:33:25.839684
Task ForkPoolWorker-3, start time: 13:33:24.838547, end time: 13:33:25.839684
Task ForkPoolWorker-2, start time: 13:33:24.838292, end time: 13:33:25.839684
Task ForkPoolWorker-1, start time: 13:33:24.838231, end time: 13:33:25.839684
Task ForkPoolWorker-3, start time: 13:33:25.840190, end time: 13:33:26.841160
Task ForkPoolWorker-2, start time: 13:33:25.840530, end time: 13:33:26.841349
Task ForkPoolWorker-1, start time: 13:33:25.840703, end time: 13:33:26.841380
Task ForkPoolWorker-4, start time: 13:33:25.840296, end time: 13:33:26.841381
Task ForkPoolWorker-3, start time: 13:33:26.841865, end time: 13:33:27.842449
Task ForkPoolWorker-2, start time: 13:33:26.841977, end time: 13:33:27.843139
Got result: 0.0
Got result: 0.07020127371886742
Got result: 1.1256784597006964
Got result: 1.5461355349791022
Got result: 0.8560855815072403
Got result: 3.882912015620671
Got result: 1.7547963496144667
Got result: 1.0103926687967728
Got result: 4.760893202587357
Got result: 4.2627716915139695
Done. (3027.1ms) Process.
  - Pool.starmap() with 4 worker processes
# protect the entry point
if __name__ == '__main__':
    t1 = time.time()
    # create and configure the process pool
    with Pool(processes=4) as pool:
        # prepare arguments
        items = [(i, random()) for i in range(10)]
        # issue tasks to the process pool
        result = pool.starmap(task, items)
        print(result)
    # process pool is closed automatically
    t2 = time.time()
    print(f'Done. ({(1E3 * (t2 - t1)):.1f}ms) Process.')
Run time: 3026.7 ms. Output:
Task ForkPoolWorker-1, start time: 13:41:59.417044, end time: 13:42:00.418067
Task ForkPoolWorker-4, start time: 13:41:59.417248, end time: 13:42:00.418067
Task ForkPoolWorker-2, start time: 13:41:59.417206, end time: 13:42:00.418067
Task ForkPoolWorker-3, start time: 13:41:59.417213, end time: 13:42:00.418067
Task ForkPoolWorker-4, start time: 13:42:00.418833, end time: 13:42:01.419987
Task ForkPoolWorker-2, start time: 13:42:00.419155, end time: 13:42:01.420250
Task ForkPoolWorker-1, start time: 13:42:00.419356, end time: 13:42:01.420633
Task ForkPoolWorker-3, start time: 13:42:00.419487, end time: 13:42:01.420784
Task ForkPoolWorker-4, start time: 13:42:01.420545, end time: 13:42:02.421670
Task ForkPoolWorker-2, start time: 13:42:01.421409, end time: 13:42:02.422508
[0.0, 0.22916987317496185, 1.48514094188553, 2.6743708768075924, 1.5402186086037761, 4.265952360897589, 0.8526580381048074, 3.806662622060956, 0.8377934521648509, 6.092057694481896]
Done. (3026.7ms) Process.
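The measured times (about 10 s serially, about 3 s with 4 workers) match a simple back-of-the-envelope model: 10 one-second tasks on 4 workers need 3 rounds, because the last round runs only 2 tasks. The sketch below is a rough estimate, assuming equal-duration tasks and ignoring pool startup and communication overhead; expected_wall_time() is a hypothetical helper, not part of multiprocessing.

```python
import math


def expected_wall_time(n_tasks, n_workers, seconds_per_task):
    """Estimate wall time when equal-length tasks run in rounds."""
    # Each round runs up to n_workers tasks at once
    rounds = math.ceil(n_tasks / n_workers)
    return rounds * seconds_per_task


print(expected_wall_time(10, 1, 1.0))  # serial: 10.0
print(expected_wall_time(10, 4, 1.0))  # 4 workers: 3.0
```

The speedup is therefore about 3.3x rather than 4x, since 10 is not a multiple of 4. The logs show the same pattern: two workers ran three tasks each while the other two ran two.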
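Copyright notice: This is an original article by qq_40541102, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.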