Python多线程并发编程

  • Post author:
  • Post category:python




一、Python中的GIL

"""
GIL的全称global interpreter lock 意为全局解释器锁。
Python中的一个线程对应与c语言中的一个线程。
GIL使得同一时刻一个CPU只能有一个线程执行字节码, 无法将多个线程映射到多个CPU上执行。
GIL会根据执行的字节码行数以及时间释放GIL,GIL在遇到IO的操作时候会主动释放。
"""

# GIL会释放,释放的位置不定,最后的结果不定
import threading
total = 0


def add():
    global total
    for i in range(1, 1000000):
        total += 1


def sub():
    global total
    for i in range(1, 1000000):
        total -= 1


t1 = threading.Thread(target=add)
t2 = threading.Thread(target=sub)
t1.start()
t2.start()
t1.join()
t2.join()
print(total)

运行结果:

-296591



二、Python多线程编程



2.1、通过threading实现多线程

import threading
import time


def thread1():
    print('thread1:start_time:%s' % time.time())
    time.sleep(2)
    print('thread1:end_time:%s' % time.time())


def thread2():
    print('thread2:start_time:%s' % time.time())
    time.sleep(2)
    print('thread2:end_time:%s' % time.time())


if __name__ == '__main__':
    t1 = threading.Thread(target=thread1)
    t2 = threading.Thread(target=thread2)
    start_time = time.time()
    t1.start()
    t2.start()
    # 时间非常小,是运行代码的时间差,而不是2秒
    # 这样运行一共有三个线程,主线程和其它两个子线程(thread1、thread2),而且是并行的,子线程启动后,主线程仍然往下运行,因此时间不是2秒
    # 守护线程(主线程退出,子线程就会kill掉)
    print('end_time:{}'.format(time.time()-start_time))

运行结果:

thread1:start_time:1657777068.6530874
thread2:start_time:1657777068.6530874
end_time:0.0010001659393310547
thread1:end_time:1657777070.6686676
thread2:end_time:1657777070.6686676



2.2、守护线程

import time
import threading


def thread1(s):
    print('thread1:start_time:%s' % time.time())
    time.sleep(s)
    print('thread1:stop_time:%s' % time.time())


def thread2(s):
    print('thread2:start_time:%s' % time.time())
    time.sleep(s)
    print('thread2:stop_time:%s' % time.time())


if __name__ == '__main__':
    t1 = threading.Thread(target=thread1, args=(2,))
    t2 = threading.Thread(target=thread2, args=(1,))
    # 将线程t1设置成守护线程
    """当程序中非守护线程全部运行完,只有守护线程时,守护线程没有了守护对象,守护线程就会被kill掉。"""
    t1.setDaemon(True)
    start_time = time.time()
    t1.start()
    t2.start()
    print('end_time:{}'.format(time.time() - start_time))

运行结果:

thread1:start_time:1657777183.1204586
thread2:start_time:1657777183.121459
end_time:0.0010004043579101562
thread2:stop_time:1657777184.1246376



2.3、等某个子线程执行完再继续执行主线程(join)

import time
import threading
from threading import current_thread


def thread1(s):
    print('%s:start_time:%s' % (current_thread().getName(), time.time()))
    time.sleep(s)
    print('%s:end_time:%s' % (current_thread().getName(), time.time()))


def thread2(s):
    print('%s:start_time:%s' % (current_thread().getName(), time.time()))
    time.sleep(s)
    print('%s:end_time:%s' % (current_thread().getName(), time.time()))


if __name__ == '__main__':
    t1 = threading.Thread(target=thread1, args=(2,))
    t2 = threading.Thread(target=thread2, args=(1,))
    t1.setName('thread1')
    t2.setName('thread2')
    start_time = time.time()
    t1.start()
    # 等待线程t1执行完
    t1.join()
    t2.start()
    # 等待线程t2执行完
    t2.join()
    print('end_time:{}'.format(time.time() - start_time))

运行结果:

thread1:start_time:1657777605.7998574
thread1:end_time:1657777607.8096633
thread2:start_time:1657777607.8096633
thread2:end_time:1657777608.823355
end_time:3.024498701095581



2.4、继承Thread实现多线程

import time
import threading
from threading import current_thread


class Thread1(threading.Thread):
    def run(self):
        # 获取当前线程名称
        name1 = current_thread().getName()
        name2 = self.getName()
        name3 = self.name
        print('线程名1:', name1, name2, name3)
        print('%s:start_time:%s' % (self.name, time.time()))
        time.sleep(2)
        print('%s:end_time:%s' % (self.name, time.time()))


class Thread2(threading.Thread):
    def run(self):
        name1 = current_thread().getName()
        name2 = self.getName()
        name3 = self.name
        print('线程名2:', name1, name2, name3)
        print('%s:start_time:%s' % (self.name, time.time()))
        time.sleep(1)
        print('%s:end_time:%s' % (self.name, time.time()))


if __name__ == '__main__':
    thread1 = Thread1()
    thread2 = Thread2()
    # 设置线程名称
    thread1.setName('thread1')
    thread2.setName('thread2')
    start_time = time.time()
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()
    print('end_time:{}'.format(time.time() - start_time))

运行结果:

线程名1: thread1 thread1 thread1
thread1:start_time:1657777734.236593
线程名2: thread2 thread2 thread2
thread2:start_time:1657777734.2375908
thread2:end_time:1657777735.243757
thread1:end_time:1657777736.2522626
end_time:2.016672372817993



三、线程同步



3.1、线程锁机制(Lock)

import threading
from threading import Lock
total = 0
lock = Lock()


def add():
    global total
    for i in range(1, 1000000):
        # 获取锁
        lock.acquire()
        total += 1
        # 释放锁
        lock.release()


def sub():
    global total
    for i in range(1, 1000000):
        lock.acquire()
        total -= 1
        lock.release()


if __name__ == '__main__':
    t1 = threading.Thread(target=add)
    t2 = threading.Thread(target=sub)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(total)

运行结果:

0



3.2、通过Queue的方式进行线程同步


Queue中的方法:

  • qsize()查看队列大小。
  • empty()判断队列是否为空,若队列为空,get方法就会阻塞,等待有数据后get方法再从队列中取数据。
  • full()判断队列是否已满,若队列已满,put方法就会阻塞,等待有空位后put方法再往队列放数据。
  • put()将数据放入队列,默认是block阻塞状态。
  • get()从队列中取数据。
  • task_done()函数向任务已完成的队列发送一个信号,使join()函数能判断出队列还剩多少数据,是否清空了。
  • join()等待队列为空,再执行别的操作。
import time
from queue import Queue
import random
from threading import *


class Producer(Thread):
    def run(self):
        name = self.name
        nums = range(100)
        while True:
            if queue.full():
                print('队列已满:%s' % queue.qsize())
                break
            num = random.choice(nums)
            # 将数据放入队列,默认是block阻塞状态
            queue.put(num)
            print('生产者:%s生产了%s' % (name, num))
            t = random.randint(1, 2)
            # time.sleep(t)
            print('生产者:%s睡眠了%s秒' % (name, t))


class Consumer(Thread):
    def run(self):
        name = self.name
        while True:
            if queue.empty():
                print('队列为空:%s' % queue.qsize())
                break
            # 从队列中取数据
            num = queue.get()
            # queue.task_done()函数向任务已完成的队列发送一个信号,使queue.join()函数能判断出队列还剩多少数据,是否清空了
            queue.task_done()
            print('消费者:%s消费了%s' % (name, num))
            t = random.randint(1, 4)
            # time.sleep(t)
            print('消费者:%s睡眠了%s秒' % (name, t))


if __name__ == '__main__':
    # 设置队列最大值,过大对内存会有很大影响
    queue = Queue(maxsize=10)
    # 查看队列大小
    print(queue.qsize())
    # 判断队列是否为空,若队列为空,get方法就会阻塞,等待有数据后get方法再从队列中取数据
    print(queue.empty())
    # 判断队列是否已满,若队列已满,put方法就会阻塞,等待有空位后put方法再往队列放数据
    print(queue.full())
    # 开启10个“生产者”线程
    for i in range(10):
        producer = Producer(name='p%s' % i)
        producer.start()
    # 开启6个“消费者”线程
    for i in range(6):
        consumer = Consumer(name='c%s' % i)
        consumer.start()
    # 等待队列为空,再执行别的操作
    queue.join()
    print('end')

运行结果:

0
True
False
生产者:p0生产了45
生产者:p0睡眠了2秒
生产者:p0生产了91
生产者:p0睡眠了2秒
生产者:p0生产了22
生产者:p0睡眠了1秒
生产者:p0生产了74
生产者:p0睡眠了1秒
生产者:p0生产了3
生产者:p0睡眠了1秒
生产者:p0生产了67
生产者:p0睡眠了1秒
生产者:p0生产了47
生产者:p0睡眠了2秒
生产者:p0生产了39
生产者:p0睡眠了2秒
生产者:p0生产了75
生产者:p0睡眠了1秒
生产者:p0生产了28
生产者:p0睡眠了1秒
队列已满:10
队列已满:10
队列已满:10
队列已满:10
队列已满:10
队列已满:10
队列已满:10
队列已满:10
队列已满:10
队列已满:10
消费者:c0消费了45
消费者:c0睡眠了1秒
消费者:c0消费了91
消费者:c0睡眠了1秒
消费者:c0消费了22
消费者:c0睡眠了3秒
消费者:c0消费了74
消费者:c0睡眠了2秒
消费者:c0消费了3
消费者:c0睡眠了3秒
消费者:c0消费了67
消费者:c0睡眠了1秒
消费者:c0消费了47
消费者:c0睡眠了3秒
消费者:c0消费了39
消费者:c0睡眠了4秒
消费者:c0消费了75
消费者:c0睡眠了3秒
消费者:c1消费了28
消费者:c1睡眠了4秒
队列为空:0
队列为空:0
队列为空:0
队列为空:0
队列为空:0
队列为空:0
end



版权声明:本文为weixin_56175092原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。