Python Threading线程

  • Post author:
  • Post category:python



目录


线程对象 threading.Thread


定时器 threading.Timer


线程锁 threading.Lock


线程锁使用with


信号量 threading.BoundedSemaphore


线程池 concurrent.futures.ThreadPoolExecutor



Python 的多线程对 IO 密集型(文件处理、爬虫等)代码比较友好,对 CPU 密集型(数据处理、循环计算等)代码则不太友好。

由于 CPython 解释器存在全局解释器锁(GIL),同一时刻只有一个线程能够执行 Python 字节码,因此 Python 的多线程相当于单核多线程:只能利用单核资源运行多个线程;要想利用多核资源,可以使用多进程来实现。

而Java的多线程能利用多核资源。

线程对象 threading.Thread

threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)


group

应该为

None

;为了日后扩展

ThreadGroup

类实现而保留。


target

是用于

run()

方法调用的可调用对象。默认是

None

,表示不需要调用任何方法。


name

是线程名称。默认情况下,由 “Thread-

N

” 格式构成一个唯一的名称,其中

N

是小的十进制数。


args

是用于调用目标函数的参数元组。默认是

()


kwargs

是用于调用目标函数的关键字参数字典。默认是

{}

如果你自己编写继承 threading.Thread 的子类并重写了构造函数,一定要确保在做任何事之前,先调用基类构造器(

Thread.__init__()

)。


线程对象方法

import threading
from time import sleep


def demo_func_one():
    """Wait one second, then print ``demo one``."""
    delay_seconds = 1
    sleep(delay_seconds)
    print("demo one")


def demo_func_two():
    """Five times in a row: wait a second, print a marker and the current thread."""
    for _ in range(5):
        sleep(1)
        print("demo two")
        # Looked up after the marker is printed, exactly as before.
        current = threading.current_thread()
        print("当前线程: ", current)  # e.g. <Thread(Thread-2, started 11952)>


def demo_func_three():
    """Wait one second, then print a marker and the number of live threads."""
    sleep(1)
    print("demo three")
    # In this demo the count is 4: three worker threads plus the main thread.
    alive_count = threading.active_count()
    print("当前活动线程", alive_count)


func_list = [demo_func_one, demo_func_two, demo_func_three]
thread_list = []

print("主线程", threading.main_thread())  # e.g. <_MainThread(MainThread, started 3948)>

for func in func_list:
    # daemon=True is passed to the constructor because Thread.setDaemon() has
    # been deprecated since Python 3.10. A daemon thread is abandoned as soon
    # as the main thread exits, whether or not it has finished its work.
    t = threading.Thread(target=func, daemon=True)
    thread_list.append(t)
    t.start()  # begin running the target in the new thread
    print("线程是否存活:", t.is_alive())  # True: the target is still sleeping

for t in thread_list:
    # join() blocks the calling (main) thread until thread ``t`` terminates.
    t.join()

for t in thread_list:
    print(t.is_alive())  # False: every thread has already been joined


# 主线程默认不是守护线程,主线程创建的子线程默认也不是守护线程

定时器 threading.Timer

import threading
import time


def demo_one():
    """Print the message announcing that the timer callback ran."""
    message = "定时器 执行"
    print(message)


def demo_two():
    """Print a marker six times, pausing one second after each print."""
    remaining = 6
    while remaining > 0:
        print("非定时器")
        time.sleep(1)
        remaining -= 1

# Timer thread: calls demo_one once, ``interval`` seconds after start().
t = threading.Timer(interval=5, function=demo_one)
t.start()

# Ordinary thread: starts running demo_two immediately.
tt = threading.Thread(target=demo_two)
tt.start()

# Call join() only after every thread has been started. Joining one thread
# before starting the next would make the threads run one after another
# instead of concurrently.
t.join()
tt.join()

"""
非定时器
非定时器
非定时器
非定时器
非定时器
定时器 执行
非定时器
"""

线程锁 threading.Lock

一旦一个线程获得一个锁,会阻塞随后尝试获得锁的线程,直到它被释放;任何线程都可以释放它。

import threading
import time
from threading import Lock

# Guard every update of shared data with a lock so that concurrent threads
# cannot interleave their modifications.

# Mutex shared by all worker threads.
lock = Lock()
# Shared counter; each worker decrements it once.
num = 0


def demo_three():
    """Decrement the shared counter ``num`` while holding ``lock``."""
    global num
    # ``with`` acquires the lock and always releases it, even if the body
    # raises, so a failing worker can never leave the others blocked forever.
    with lock:
        print(lock.locked())  # True: this thread currently holds the lock
        time.sleep(1)
        num -= 1
        time.sleep(1)
        print(num)
    # This thread has released the lock. The value printed is False only if no
    # other worker has acquired it in the meantime; under contention it may
    # already be True again.
    print(lock.locked())


t_list = []
for i in range(100):
    t = threading.Thread(target=demo_three)
    t.start()
    t_list.append(t)

# Wait for every worker to finish.
for t in t_list:
    t.join()

"""
执行结果:
-1
-2
-3
...
"""


-----------------------------------------------------------------------
# 不加锁的情况,多线程处理同一数据会有问题


import threading
import time
from threading import Lock

# Shared counter; deliberately left unprotected by any lock in this variant.
num = 0


def demo_three():
    """Decrement the shared counter ``num`` without any locking."""
    global num
    time.sleep(1)
    num -= 1
    # While this thread sleeps, the other workers perform their decrements,
    # so the value printed below is the total, not this thread's own result.
    time.sleep(1)
    print(num)


t_list = []
for i in range(100):
    t = threading.Thread(target=demo_three)
    t.start()
    t_list.append(t)

# Wait for every worker to finish.
for t in t_list:
    t.join()

"""
执行结果:
-100
-100
-100
...
"""

线程锁使用with

import threading
from threading import Lock
import time

"""
使用以下加锁方式

with 锁对象:
    do something
    
和以下方式一样的效果

lock.acquire()
try:
    do something
finally:
    lock.release()
"""

# Mutex used by demo_one to serialize its update of ``num``.
lock = Lock()
# Shared counter that demo_one decrements by one on each call.
num = 0


# 此方法加锁,正常执行
def demo_one():
    """Decrement the shared counter under the lock and print its new value.

    The function sleeps for one second before taking the lock and for one
    second after releasing it; only the update and the print are serialized.
    """
    global num
    time.sleep(1)
    with lock:
        num -= 1
        label = "locked" + str(num)
        print(label)
    time.sleep(1)

# 此方法不加锁,100个线程会全部输出 -100
# def demo_one():
#     global num
#     time.sleep(1)
#     num -= 1
#     time.sleep(1)
#     print("locked" + str(num))


# Launch 100 threads that all run demo_one.
for i in range(0, 100):
    t = threading.Thread(target=demo_one)
    t.start()

信号量 threading.BoundedSemaphore

信号量通常用于保护数量有限的资源,例如数据库服务器。

在资源数量固定的任何情况下,都应该使用有界信号量。在生成任何工作线程前,应该在主线程中初始化信号量

互斥锁同时只允许一个线程操作数据,而 信号量 同时允许指定数量的线程操作数据

import threading
import time


# Maximum number of threads allowed to hold the semaphore at the same time.
# Named MAX_CONCURRENT rather than ``max`` so the built-in max() stays usable.
MAX_CONCURRENT = 2
# Bounded semaphore: releasing it more often than it was acquired raises
# ValueError instead of silently growing the limit.
sem = threading.BoundedSemaphore(value=MAX_CONCURRENT)


def demo_semaphore(num, semaphore, hold_seconds=1):
    """Print a timestamped message while holding one slot of ``semaphore``.

    Args:
        num: Identifier of the calling worker, shown in the message.
        semaphore: Semaphore limiting how many callers may run the body at
            the same time.
        hold_seconds: How long to keep the slot before releasing it.
            Defaults to one second.
    """
    # ``with`` acquires a slot and guarantees it is released even if the
    # body raises, so a failing worker cannot permanently reduce capacity.
    with semaphore:
        print("now the threading is {}, {}".format(num, str(time.time())))
        time.sleep(hold_seconds)


# Start 100 workers; each receives its index and the shared semaphore.
for i in range(0, 100):
    worker_args = (i, sem)
    t = threading.Thread(target=demo_semaphore, args=worker_args)
    t.start()

线程池 concurrent.futures.ThreadPoolExecutor

from concurrent.futures import ThreadPoolExecutor
from concurrent import futures
import time
import random


"""线程死锁 """
def wait_on_future():
    """Submit a task to the shared ``executor`` and block on its result."""
    f = executor.submit(pow, 5, 2)
    # This will never complete because there is only one worker thread and
    # it is executing this function.
    print(f.result())


# The pool must have exactly ONE worker for the deadlock to occur: that worker
# is busy running wait_on_future, so the inner pow task can never be picked up.
# With more workers the inner task simply runs and 25 is printed.
# NOTE: this snippet hangs on purpose.
executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
"""线程池官方文档示例"""
import concurrent.futures
import urllib.request
from concurrent.futures import ThreadPoolExecutor
from concurrent import futures

# Pages to download. The last host does not exist, which exercises the
# exception branch below.
URLS = [
    'https://www.python.org/',
    'https://docs.python.org/3/',
    'http://nonexistent-subdomain.python.org/',
]


# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    """Download ``url`` and return the response body as bytes.

    Args:
        url: Address of the page to fetch.
        timeout: Timeout in seconds passed to ``urllib.request.urlopen``.
    """
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()


# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            # result() re-raises any exception raised inside load_url.
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

demo

from concurrent.futures import ThreadPoolExecutor
from concurrent import futures
import time
import random


def add(num, timeout=10):
    """Sleep for a random multiple of ``num`` seconds and return ``num``.

    The delay is ``num`` times a random integer from 1 to 10. A line is
    printed before and after the sleep. ``timeout`` is accepted but not used.
    """
    delay = random.randint(1, 10) * num
    print("start %d %d" % (num, delay))
    time.sleep(delay)
    print("end %d %d" % (num, delay))
    return num


# Create the thread pool; leaving the ``with`` block shuts it down.
with ThreadPoolExecutor(max_workers=10) as pool:
    # Submitting a task schedules it for execution right away.
    future_list = [pool.submit(add, n, n) for n in range(5)]
    # To inspect the task states, print each future in future_list.
    # as_completed yields futures in the order they finish, not in the
    # order they were submitted.
    for finished in futures.as_completed(future_list):
        print(finished.result(), finished)  # the task's return value and its future



版权声明:本文为weixin_44809381原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。