python多进程与多线程,进程同步,共享内存

  • Post author:
  • Post category:python


python多进程与多线程的实现,进程同步,多进程共享变量与内存。



并发和并行的区别:


它们虽然都说是”多个进程同时运行”,但是它们的”同时”不是一个概念。并行的”同时”是同一时刻可以多个进程在运行(处于running),并发的”同时”是经过上下文快速切换,使得看上去多个进程同时都在运行的现象,是一种OS欺骗用户的现象





实际上,

当程序中写下多进程或多线程代码时,这意味着的是并发而不是并行

。并发是因为多进程/多线程都是需要去完成的任务,不并行是因为并行与否由操作系统的调度器决定,可能会让多个进程/线程被调度到同一个CPU核心上。只不过调度算法会尽量让不同进程/线程使用不同的CPU核心,所以在实际使用中几乎总是会并行,但却不能以100%的角度去保证会并行。也就是说,

并行与否程序员无法控制,只能让操作系统决定




python中由于GIL使得一个进程中的线程只能并发无法并行,因此,如果多线程的进程是

CPU密集型

的,那多线程并不能有多少效率上的提升,相反还可能会因为线程的频繁切换,导致效率下降,推荐使用多进程;如果是

IO密集型

,多线程进程可以利用IO阻塞等待时的空闲时间执行其他线程,提升效率。





多进程




from multiprocessing import Process
import multiprocessing
import time

def task1(msg):
    print('task1: hello, %s' % msg)
    time.sleep(1)

def task2(msg):
    print('task2: hello, %s' % msg)
    time.sleep(1)

def task3(msg):
    print('task3: hello, %s' % msg)
    time.sleep(1)

if __name__ == '__main__':
    p1 = Process(target=task1, args=('one',))
    p2 = Process(target=task2, args=('two',))
    p3 = Process(target=task3, args=('three',))

    start = time.time()

    p1.start()
    p2.start()
    p3.start()

    print("The number of CPU is:" + str(multiprocessing.cpu_count()))
    for p in multiprocessing.active_children():
        print("child p.name: " + p.name + "\tp.id: " + str(p.pid))

    p1.join()
    p2.join()
    p3.join()

    end = time.time()
    print('3 processes take %s seconds' % (end - start))

使用进程池

from multiprocessing import Pool
import multiprocessing
import time

def task(n):
    print('task %d: hello, %d' % (n,n))
    time.sleep(1)

if __name__ == '__main__':
    pool = Pool(processes=4)
    start = time.time()

    for i in range(7):
        pool.apply_async(task, args=(i,))
    pool.close()

    print("The number of CPU is:" + str(multiprocessing.cpu_count()))
    for p in multiprocessing.active_children():
        print("child p.name: " + p.name + "\tp.id: " + str(p.pid))

    pool.join()
    end = time.time()
    print('7 processes take %s seconds' % (end - start))

pool中的map方法

from multiprocessing import Pool
import multiprocessing
import time

def task(n):
    print('task %d: hello, %d' % (n,n))
    time.sleep(1)

if __name__ == '__main__':
    pool = Pool(processes=4)
    start = time.time()

    pool.map(task,range(7))

    pool.close()

    print("The number of CPU is:" + str(multiprocessing.cpu_count()))
    for p in multiprocessing.active_children():
        print("child p.name: " + p.name + "\tp.id: " + str(p.pid))

    pool.join()
    end = time.time()
    print('7 processes take %s seconds' % (end - start))




在类中使用多进程




多进程不能使用类成员函数作为参数,因此需要定义一个辅助的外部函数

from multiprocessing import Pool

def func(client, x):
    return client.f(x)

class SomeClass(object):
    def f(self, x):
        return x * x

    def go(self, ):
        result = list()
        pool = Pool(processes=4)
        for i in range(10):
            result.append(pool.apply_async(func, [self, i]))
        pool.close()
        pool.join()
        for res in result:
            print(res.get(timeout=1))

a = SomeClass()
a.go()




多进程同步




使用临界区,临界区变量在主进程中分配内存,并和所有子进程间共享内存空间

Python中进程间共享数据,处理基本的queue,pipe和value,array外,还提供了更高层次的封装。使用multiprocessing.Manager可以简单地使用这些高级接口。

Manager()返回的manager对象控制了一个server进程,此进程包含的python对象可以被其他的进程通过proxies来访问。从而达到多进程间数据通信且安全。

Manager支持的类型有list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array。


建议都使用multiprocessing.Manager来统一管理

1. multiprocessing中的Lock,value,array的用法

from multiprocessing import Process, Lock, Value

def func(lock, x):
    with lock:
        print(x.value)
        x.value += 1

if __name__ == "__main__":
    lock = Lock()
    x = Value('d',0)

    a = []
    for i in range(20):
        p = Process(target=func, args=(lock, x,))
        a.append(p)
        p.start()

    for i in a:
        i.join()

    print('end')
    print(x.value)

multiprocessing中的Lock,value,array,Queue

无法直接在进程池中使用

,进程池中的临界区变量要用使用multiprocessing.Manager()中定义的,当然也有办法解决:在Pool中初始化Queue(将Queue绑定到一个函数中):
from multiprocessing import Pool, Queue

def f(msg):
    f.q.put(msg)

def q_init(q):
    f.q = q

if __name__ == "__main__":
    q = Queue()
    p = Pool(None, q_init, (q,))
    for i in range(0, 8):
        p.apply_async(f, (i,))
    p.close()
    p.join()
    while not q.empty():
        print(q.get())

2. 使用multiprocessing.Manager管理临界区

multiprocessing.Manager能直接在进程池中使用


每次创建Manager对象都会开启一个SyncManager进程。可创建一次Manager对象来统一管理所有的临界区变量

from multiprocessing import Pool, Manager

def func(lock, x):
    with lock:
        print(x.value)
        x.value += 1

if __name__ == "__main__":
    mgr = Manager()
    lock = mgr.Lock()
    x = mgr.Value('d',0)

    a = []
    pool = Pool(4)
    for i in range(20):
        p = pool.apply_async(func, args=(lock, x,))

    pool.close()
    pool.join()

    print(x.value)

multiprocessing.Manager中 list,dict,array的使用

'''dict的用法'''
from multiprocessing import Manager,Pool

def task(d, key, value):
    d[key] = value

if __name__ == '__main__':
    d = Manager().dict()
    pool = Pool(4)
    for i in range(10):
        pool.apply_async(task, (d, i, i * 2,))
    pool.close()
    pool.join()
    print('Results:')
    for key, value in d.items():
        print("%s=%s" % (key, value))
        
'''Value'''
import ctypes
Manager().Value(int,11)
Manager().Value(string,'haha')
Manager().Value(ctypes.c_int,11)
Manager().Value(ctypes.c_double,11.2)
        
'''Array'''
arr = Manager().Array(ctypes.c_float, N * M)
arr[:, :] = np.zeros((N, M))
np.frombuffer(arr.get_obj(), np.float32)

使用multiprocessing.Manager创建自定义类的临界区管理进程(创建在临界区的对象,在其他进程中无法访问其属性)

from multiprocessing import Process
from multiprocessing.managers import BaseManager

class A:
    def __init__(self, x):
        self.x = x

    def inc(self):
        self.x += 1
        print(self.x)


class MyManager(BaseManager):
    pass
MyManager.register('A', A)
mgr = MyManager()
mgr.start()

def f(a):
    a.inc()


if __name__ == '__main__':
    a = mgr.A(5)

    p = Process(target=f, args=(a,))
    p.start()
    p2 = Process(target=f, args=(a,))
    p2.start()
    a.inc()

    p.join()
    p2.join()

多线程

只能并发,不能并行

#两种创建方法
#1. 实例化一个threading.Thread的对象,并传入一个初始化函数对象(initial function )作为线程执行的入口;
import threading
import time

def tstart(arg):
    time.sleep(0.5)
    print("%s running...." % arg)

if __name__ == '__main__':
    t1 = threading.Thread(target=tstart, args=('This is thread 1',))
    t2 = threading.Thread(target=tstart, args=('This is thread 2',))
    t1.start()
    t2.start()
    t1.join(timeout=0.3)
    t2.join(timeout=0.3)
    print("This is main function")



#2. 继承threading.Thread,并重写run函数;
import threading, time

class MyThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        global n, lock
        time.sleep(1)
        lock.acquire():
        print(n, self.name)
        n += 1
        lock.release()

if "__main__" == __name__:
    n = 1
    ThreadList = []
    lock = threading.Lock()
    for i in range(1, 200):
        t = MyThread()
        ThreadList.append(t)
    for t in ThreadList:
        t.start()
    for t in ThreadList:
        t.join()
        
        

#创建一个自己的类,实现同threading.Thread一样的功能
class MyThread(threading.Thread):
    def __init__(self,target,args=()):
        super(MyThread,self).__init__()
        self.func = target
        self.args = args
        self.result = None

    def run(self):
        self.result = self.func(*self.args)

            
 def func(key,values):
 	pass
            
t = MyThread(target=func,args=(key,values))
t.start()
t.join(timeout=0.3)



版权声明:本文为qq_33360393原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。