python多进程与多线程的实现,进程同步,多进程共享变量与内存。
并发和并行的区别:
它们虽然都说是”多个进程同时运行”,但是它们的”同时”不是一个概念。并行的”同时”是同一时刻可以多个进程在运行(处于running),并发的”同时”是经过上下文快速切换,使得看上去多个进程同时都在运行的现象,是一种OS欺骗用户的现象
。
实际上,
当程序中写下多进程或多线程代码时,这意味着的是并发而不是并行
。并发是因为多进程/多线程都是需要去完成的任务,不并行是因为并行与否由操作系统的调度器决定,可能会让多个进程/线程被调度到同一个CPU核心上。只不过调度算法会尽量让不同进程/线程使用不同的CPU核心,所以在实际使用中几乎总是会并行,但却不能以100%的角度去保证会并行。也就是说,
并行与否程序员无法控制,只能让操作系统决定
。
python中由于GIL使得一个进程中的线程只能并发无法并行,因此,如果多线程的进程是
CPU密集型
的,那多线程并不能有多少效率上的提升,相反还可能会因为线程的频繁切换,导致效率下降,推荐使用多进程;如果是
IO密集型
,多线程进程可以利用IO阻塞等待时的空闲时间执行其他线程,提升效率。
多进程
from multiprocessing import Process
import multiprocessing
import time
def task1(msg):
print('task1: hello, %s' % msg)
time.sleep(1)
def task2(msg):
print('task2: hello, %s' % msg)
time.sleep(1)
def task3(msg):
print('task3: hello, %s' % msg)
time.sleep(1)
if __name__ == '__main__':
p1 = Process(target=task1, args=('one',))
p2 = Process(target=task2, args=('two',))
p3 = Process(target=task3, args=('three',))
start = time.time()
p1.start()
p2.start()
p3.start()
print("The number of CPU is:" + str(multiprocessing.cpu_count()))
for p in multiprocessing.active_children():
print("child p.name: " + p.name + "\tp.id: " + str(p.pid))
p1.join()
p2.join()
p3.join()
end = time.time()
print('3 processes take %s seconds' % (end - start))
使用进程池
from multiprocessing import Pool
import multiprocessing
import time
def task(n):
print('task %d: hello, %d' % (n,n))
time.sleep(1)
if __name__ == '__main__':
pool = Pool(processes=4)
start = time.time()
for i in range(7):
pool.apply_async(task, args=(i,))
pool.close()
print("The number of CPU is:" + str(multiprocessing.cpu_count()))
for p in multiprocessing.active_children():
print("child p.name: " + p.name + "\tp.id: " + str(p.pid))
pool.join()
end = time.time()
print('7 processes take %s seconds' % (end - start))
pool中的map方法
from multiprocessing import Pool
import multiprocessing
import time
def task(n):
print('task %d: hello, %d' % (n,n))
time.sleep(1)
if __name__ == '__main__':
pool = Pool(processes=4)
start = time.time()
pool.map(task,range(7))
pool.close()
print("The number of CPU is:" + str(multiprocessing.cpu_count()))
for p in multiprocessing.active_children():
print("child p.name: " + p.name + "\tp.id: " + str(p.pid))
pool.join()
end = time.time()
print('7 processes take %s seconds' % (end - start))
在类中使用多进程
多进程不能使用类成员函数作为参数,因此需要定义一个辅助的外部函数
from multiprocessing import Pool
def func(client, x):
return client.f(x)
class SomeClass(object):
def f(self, x):
return x * x
def go(self, ):
result = list()
pool = Pool(processes=4)
for i in range(10):
result.append(pool.apply_async(func, [self, i]))
pool.close()
pool.join()
for res in result:
print(res.get(timeout=1))
a = SomeClass()
a.go()
多进程同步
使用临界区,临界区变量在主进程中分配内存,并和所有子进程间共享内存空间
Python中进程间共享数据,处理基本的queue,pipe和value,array外,还提供了更高层次的封装。使用multiprocessing.Manager可以简单地使用这些高级接口。
Manager()返回的manager对象控制了一个server进程,此进程包含的python对象可以被其他的进程通过proxies来访问。从而达到多进程间数据通信且安全。
Manager支持的类型有list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array。
建议都使用multiprocessing.Manager来统一管理
1. multiprocessing中的Lock,value,array的用法
from multiprocessing import Process, Lock, Value
def func(lock, x):
with lock:
print(x.value)
x.value += 1
if __name__ == "__main__":
lock = Lock()
x = Value('d',0)
a = []
for i in range(20):
p = Process(target=func, args=(lock, x,))
a.append(p)
p.start()
for i in a:
i.join()
print('end')
print(x.value)
无法直接在进程池中使用
,进程池中的临界区变量要用使用multiprocessing.Manager()中定义的,当然也有办法解决:在Pool中初始化Queue(将Queue绑定到一个函数中):
from multiprocessing import Pool, Queue
def f(msg):
f.q.put(msg)
def q_init(q):
f.q = q
if __name__ == "__main__":
q = Queue()
p = Pool(None, q_init, (q,))
for i in range(0, 8):
p.apply_async(f, (i,))
p.close()
p.join()
while not q.empty():
print(q.get())
2. 使用multiprocessing.Manager管理临界区
multiprocessing.Manager能直接在进程池中使用
每次创建Manager对象都会开启一个SyncManager进程。可创建一次Manager对象来统一管理所有的临界区变量
from multiprocessing import Pool, Manager
def func(lock, x):
with lock:
print(x.value)
x.value += 1
if __name__ == "__main__":
mgr = Manager()
lock = mgr.Lock()
x = mgr.Value('d',0)
a = []
pool = Pool(4)
for i in range(20):
p = pool.apply_async(func, args=(lock, x,))
pool.close()
pool.join()
print(x.value)
multiprocessing.Manager中 list,dict,array的使用
'''dict的用法'''
from multiprocessing import Manager,Pool
def task(d, key, value):
d[key] = value
if __name__ == '__main__':
d = Manager().dict()
pool = Pool(4)
for i in range(10):
pool.apply_async(task, (d, i, i * 2,))
pool.close()
pool.join()
print('Results:')
for key, value in d.items():
print("%s=%s" % (key, value))
'''Value'''
import ctypes
Manager().Value(int,11)
Manager().Value(string,'haha')
Manager().Value(ctypes.c_int,11)
Manager().Value(ctypes.c_double,11.2)
'''Array'''
arr = Manager().Array(ctypes.c_float, N * M)
arr[:, :] = np.zeros((N, M))
np.frombuffer(arr.get_obj(), np.float32)
使用multiprocessing.Manager创建自定义类的临界区管理进程(创建在临界区的对象,在其他进程中无法访问其属性)
from multiprocessing import Process
from multiprocessing.managers import BaseManager
class A:
def __init__(self, x):
self.x = x
def inc(self):
self.x += 1
print(self.x)
class MyManager(BaseManager):
pass
MyManager.register('A', A)
mgr = MyManager()
mgr.start()
def f(a):
a.inc()
if __name__ == '__main__':
a = mgr.A(5)
p = Process(target=f, args=(a,))
p.start()
p2 = Process(target=f, args=(a,))
p2.start()
a.inc()
p.join()
p2.join()
多线程
只能并发,不能并行
#两种创建方法
#1. 实例化一个threading.Thread的对象,并传入一个初始化函数对象(initial function )作为线程执行的入口;
import threading
import time
def tstart(arg):
time.sleep(0.5)
print("%s running...." % arg)
if __name__ == '__main__':
t1 = threading.Thread(target=tstart, args=('This is thread 1',))
t2 = threading.Thread(target=tstart, args=('This is thread 2',))
t1.start()
t2.start()
t1.join(timeout=0.3)
t2.join(timeout=0.3)
print("This is main function")
#2. 继承threading.Thread,并重写run函数;
import threading, time
class MyThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
global n, lock
time.sleep(1)
lock.acquire():
print(n, self.name)
n += 1
lock.release()
if "__main__" == __name__:
n = 1
ThreadList = []
lock = threading.Lock()
for i in range(1, 200):
t = MyThread()
ThreadList.append(t)
for t in ThreadList:
t.start()
for t in ThreadList:
t.join()
#创建一个自己的类,实现同threading.Thread一样的功能
class MyThread(threading.Thread):
def __init__(self,target,args=()):
super(MyThread,self).__init__()
self.func = target
self.args = args
self.result = None
def run(self):
self.result = self.func(*self.args)
def func(key,values):
pass
t = MyThread(target=func,args=(key,values))
t.start()
t.join(timeout=0.3)