一、多线程、多进程实现并发的基本应用
1.1、线程与进程的关系
线程是计算机的最小单元,线程存在于进程中;没开一个进程都会包含线程。I/O密集型的程序用多线程实现较好,计算密集型的程序用多进程实现比较好。在Python中进程会有一个GIL锁,同一时刻只能允许一个进程中的线程去到CPU,进行请求,而计算型的程序需要高速的请求CPU进行计算,开设多个进程对CPU进行请求完成高效率的计算;I/O程序就不用通过CPU,而是只要告知进来请求的线程渠道对应的I/O接口就可以,因此不用受到 Python GIL锁的限制。
# 创建线程池
# 线程运用 一
# 引入模块
from concurrent.futures import ThreadPoolExecutor
import requests
# 定义请求函数
def task(url):
response = requests.get(url)
print(url,response)
# 创建线程池
pool = ThreadPoolExecutor(5)
# 定义请求的URL列表
url_list = ['http://www.baidu.com','http://www.sogou.com','http://www.so.com']
for url in url_list:
pool.submit(task,url)
pool.shutdown(wait=True)
# 线程运用 二
# 引入模块
from concurrent.futures import ThreadPoolExecutor
import requests
# 定义请求函数
def task(url):
response = requests.get(url)
print(url,response)
return response # response会被传入pool.submit() 的回调函数中
# task() 运行完成后会运行回调函数 done()
def done(future,*args,**kwargs):
print(future.result(),args,kwargs) # 第一个参数是得到requests请求的返回值 response 对象
# 创建线程池
pool = ThreadPoolExecutor(5)
# 定义请求的URL列表
url_list = ['http://www.baidu.com','http://www.sogou.com','http://www.so.com']
for url in url_list:
response = pool.submit(task,url)
response.add_done_callback(done) # 接收回调函数 可以多次调用
pool.shutdown(wait=True)
# 进程运用 一
from concurrent.futures import ProcessPoolExecutor
import requests
# 定义请求函数
def task(url):
response = requests.get(url)
print(url,response)
# 创建进程池
pool = ProcessPoolExecutor(5)
# 定义请求的URL列表
url_list = []
for url in url_list:
pool.submit(task,url)
pool.shutdown(wait=True)
# 进程运用 二
# 引入模块
from concurrent.futures import ProcessPoolExecutor
import requests
# 定义请求函数
def task(url):
response = requests.get(url)
print(url,response)
return response # response会被传入pool.submit() 的返回值回调方法中
# task() 运行完成后会运行回调函数 done()
def done(future,*args,**kwargs):
print(future.result(),args,kwargs) # 第一个参数是得到requests请求的返回值
# 创建进程池
pool = ProcessPoolExecutor(5)
# 定义请求的URL列表
url_list = ['http://www.baidu.com','http://www.sogou.com','http://www.so.com']
for url in url_list:
response = pool.submit(task,url)
response.add_done_callback(done) # 接收回调函数 可以多次调用
pool.shutdown(wait=True)
二、异步IO
协程:让一个线程去做很多事情,单独的协程不能实现并发功能,只能实现让线程执行循序的切换,所以协程也称为微线程。与异步I/O联合使用才能完成并发功能。
# asyncio 实现协程与异步I/O请求 原理
# 导入asyncio
import asyncio
# 装饰器
@asyncio.coroutine
def func1():
print('before....func1....')
yield from asyncio.sleep(5) # 延迟 5 秒 不支持http请求,支持TCP请求
print('end...func1...')
tasks = [func1(),func1()]
loop = asyncion.get_event_loop()
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
2.1、Http与TCP请求扩展
Http是基于TCP socket 请求开发,只是重新定义了自己的传输协议,可以通过TCP自行构建Http。
2.2、Python asyncio 和 aiohttp 完成并发功能
# 1、自行构建HTTP请求内容
import asyncio
@asyncio.coroutine
def task(host,url='/'):
reader,writer = yield from asyncio.open_connection(host,80)
# 构建符合HTTP请求的请求内容
request_header_content = """ GET %s HTTP/1.0\r\nHost:%s\r\r\r\r"""%(url,host,)
request_header_content = bytes(request_header_content,encoding='utf-8')
writer.write(request_header_content)
yield from writer.draln()
text = yield from reader.read()
print(host,url,text)
writer.close()
tasks = [
task('www.cnblogs.com','/wupeiqi/'),
task('dig.chouti.com','/pic/show?nid=123456&id=456789')
]
# 将任务加入到线程中
loop = asyncio.get_event_loop()
results = loop.run_until_complete(asybcio.gather(*tasks))
loop.close()
# 2、利用aiohttp模块
import asyncio
import aiohttp
@asyncio.coroutine
def task(url):
print(url)
response = yield from aiohttp.request('GET',url)
print(url,response)
response.close()
tasks = [
task('www.cnblogs.com','/wupeiqi/'),
task('dig.chouti.com','/pic/show?nid=123456&id=456789')
]
# 将任务加入到线程中
loop = asyncio.get_event_loop()
results = loop.run_until_complete(asybcio.gather(*tasks))
loop.close()
2.2、基于 requests 模块实现并发
import asyncio
import requests
@asyncio.coroutine
def task(func,*args):
loop = asyncio.get_event_loop()
future = loop.run_in_executor(None,func,*args) # requests.get('https://www.baidu.com')
response = yield from future
print(response.url,response.content)
tasks = [
task(requests.get,'https://www.baidu.com'),# requests.get或者request.post不执行
task(requests.get,'https://www.so.com')
]
loop = asyncio.get_event_loop()
results = loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
2.3、gevent + requests 模块实现并发
from gevent import monkey # 安装 先greenlet 后gevent
import requests
# gevent 依赖于greenlet来处理协程的事务,与内部的异步I/O完成并发操作
monkey.patch_all() # 必须加,找到内部 socket 阻塞的请求,封装为符合gevent异步请求的socket
def task(nethod,url,req_kwargs):
print (methodurl,req_kwags)
response = requests.request(method=method,url=url,**req_kwargs)
print(response.url,response.content)
gevent.joinall([
gevent.spqwn(task,method='get',url='https://www.baidu.com',req_kwargs={}),
gevent.spqwn(task,method='get',url='https://www.so.com',req_kwargs={}),
])
# 控制协程的数量
import gevent.pool import Pool
pool = Pool(2) # 最多只允许向远程发送2个请求 (协程池)
gevent.joinall([
gevent.spqwn(task,method='get',url='https://www.baidu.com',req_kwargs={}),
gevent.spqwn(task,method='get',url='https://www.so.com',req_kwargs={}),
])
2.4、grequests 模块 结合 gevent+requests两个模块
起因:要用http请求探测服务的有效性,多进程,多线程,感觉似乎没有必要,看看有没有协程的方案
1. 简单用法
grequests 利用 requests和gevent库,做了一个简单封装,使用起来非常方便
import grequests
import time
import requests
urls = [
'https://docs.python.org/2.7/library/index.html',
'https://docs.python.org/2.7/library/dl.html',
'http://www.iciba.com/partial',
'http://2489843.blog.51cto.com/2479843/1407808',
'http://blog.csdn.net/woshiaotian/article/details/61027814',
'https://docs.python.org/2.7/library/unix.html',
'http://2489843.blog.51cto.com/2479843/1386820',
'http://www.bazhuayu.com/tutorial/extract_loop_url.aspx?t=0',
]
def method1():
t1 = time.time()
for url in urls:
res = requests.get(url)
#print res.status_code
t2 = time.time()
print 'method1', t2 - t1
def method2():
tasks = [grequests.get(u) for u in urls]
t1 = time.time()
res = grequests.map(tasks, size=3)
# print res
t2 = time.time()
print 'method2', t2 - t1
def method3():
tasks = [grequests.get(u) for u in urls]
t1 = time.time()
res = grequests.map(tasks, size=6)
# print res
t2 = time.time()
if __name__ == '__main__':
method1()
method2()
method3()
运行结果如下:
method1 8.51106119156
method2 5.77834510803
method3 2.55373907089
可以看到使用协程以后,整个程序的完成时间有了大大缩短,并且每个协程的并发粒度也会影响整体时间
2. 重要参数
这里需要补充的是几个
grequests
def grequests.map(requests, stream=False, size=None, exception_handler=None, gtimeout=None)
参数 | 说明 | 备注 |
---|---|---|
size | 协程的并发度 | 当一个协程进行IO等待的时候,就会将CPU交给其他协程序,一般设置为50 ~ 100足矣 |
exception_handler | 协程的并发度 | 捕获单个请求的异常 |
gtimeout | 整体请求的超时设置 |
另外,由于grequests底层使用的是requests,因此它支持
GET,OPTIONS, HEAD, POST, PUT, DELETE 等各种http method
所以以下的任务请求都是支持的
grequests.post(url, json={"name":"zhangsan"})
grequests.delete(url)
3. 事件钩子
grequests的底层库,是requests,因此它也支持事件钩子
def print_url(r, *args, **kwargs):
print(r.url)
url = "http://www.baidu.com"
# 1.
res = requests.get(url, hooks={"response":print_url})
# 2.
tasks = []
req = grequests.get(url, callback=print_url)
tasks.append(req)
res = grequests.map(tasks)
2.5、Twisted 模块完成并发
from twisted.internet import protocol, reactor, endpoints
class Echo(protocol.Protocol):
def dataReceived(self, data):
self.transport.write(data)
class EchoFactory(protocol.Factory):
def buildProtocol(self, addr):
return Echo()
endpoints.serverFromString(reactor, "tcp:1234").listen(EchoFactory())
reactor.run()
3、自定义多并发
3.1、原理
利用socket与select模块完成自定义多并发。
1、socket客户端、服务端
客户端在发请求进行socket请求时,连接是会被阻塞的;
socket中 setblocking(False) 函数可以设置不阻塞,如果服务器端没有马上返回数据(连接无响应,数据未返回),就会报错,可以通过 try....except ERROR .... 语句来处理报错后继续执行其他的程序。
2、I/O多路复用 select模块
客户端:
try:
socket对象r1.connet() # 会阻塞
socket对象r2.connet()
except EX:
pass
第一个列表:其中的对象有变化就会赋值给r;第二个列表:其中的对象连接成功后返回给w;第三个列表:返回的错误信息赋值给e,0.5表示超时时间。
while True: # 监听socket是否发生了变化
r,w,e = select.select([socket对象r1,socket对象r2],[socket对象w1,socket对象w2],[],0.5) # socket对象r1 与 socket对象w1是一模一样的对象
r = [socket对象r1,] 表示socket对象r1有了变化
r[0].recv() 服务端响应回来的数据被 r 接收 会阻塞
w = [socket对象w1,] 表示socket对象w1与服务器连接成功,可以互相通讯了,w先获取到数据
w[0].send('发送http的请求数据,headers等')
3、r,w,e = select.select([socket对象r1,socket对象r2],[socket对象w1,socket对象w2],[],0.5)中是否只能传入socket对象?可以传入其他的对象,但是对传入的对象有要求,传入的对象必须有fileno方法,并返回一个文件描述符。
在socket外面套用一层其他对象的壳,并返回socket.fileno(),就可以将其传入select.select() 中了,select.select()不关心传入的对象是否为socket对象,但是我们无法自定义文件描述符,只能封装socket对象,从而获得文件描述符 .fileno()
class Foo():
def __init__(self):
pass
def fileno(self):
obj = socket.socket()
return obj.fileno()
3.2、利用 socket select 模块实现异步I/O
impoer socket
import select
sk = socket.socket()
sk.setblocking(False) # 设置为非阻塞,但是会出现异常
# 1、建立连接,处理费阻塞出现的异常
try:
sk.connect(('www.baidu.com',80))
print('连接成功...')
except BlockKingIOEroor as e:
print(e)
data = 'GET / HTTP/1.0\r\nHost:baidu.com\r\n\r\n'
# data = 'POST / HTTP/1.0\r\nHost:baidu.com\r\n\r\nk1=v1&ke=v2' k1=v1&ke=v2 post请求体
# 2、发送内容
sk.send(data)
# 3、接收响应信息
res_data = sk.recv(1024) # IO阻塞
sk.close()
3.3、自定义异步IO(客户端简版)
'''
socket是IO多路复用的一个模块,可以同时监听多个socket.socket()对象;
IO多路复用:利用 while True: 来监听多个socket对象;利用 r,w,e = select.select() 进行操作;
异步IO:非阻塞的socket+IO多路复用
- 非阻塞socket 通过设置setbocking(False)
- select.select('可以封装为自定义对象,只要对象中有fileno方法,并返回socket文件描述符即可')
'''
import socket
import select
class HttpRequest:
def __init__(self, sk, host, callback):
self.socket = sk
self.host = host
self.callback = callback
def fileno(self):
return self.socket.fileno()
class HttpResponse:
def __init__(self, recv_data):
self.recv_data = recv_data
self.headers = {}
self.body = None
self.initialize()
def initialize(self):
headers, body = self.recv_data.split(b'\r\n\r\n', 1) # 分割响应头和响应体
self.body = body
# 处理响应头信息
header_list = headers.split(b'\r\n')
for h in header_list:
h_str = str(h, encoding="utf-8")
v = h_str.split(':', 1)
if len(v) == 2:
self.headers[v[0]] = v[1]
class AsyncRequest:
def __init__(self):
self.conn = [] # 用于获取数据检测
self.connection = [] # 用于检测是否已经连接成功到服务器
def add_request(self, host, callback):
try:
sk = socket.socket()
sk.setblocking(0) # 设置为非阻塞
sk.connect((host, 80)) # 创建连接
except BlockingIOError as e:
print(e)
request = HttpRequest(sk, host, callback)
self.conn.append(request)
self.connection.append(request)
def run(self):
while True: # 事件循环,监听是否有变化
rlist, wlist, elist = select.select(self.conn, self.connection, self.conn, 0.05)
for w in wlist:
# 只要能循环到,表示socket已经连接到服务器
tpl = 'GET / HTTP/1.0\r\nhost:%s\r\n\r\n' % (w.host)
w.socket.send(bytes(tpl, encoding="utf-8"))
self.connection.remove(w)
for r in rlist:
# 获取到服务器返回的信息
recv_data = bytes()
while True:
try:
chunck = r.socket.recv(8096)
recv_data += chunck
except Exception as e:
print(e)
break
print(r.host, recv_data)
response = HttpResponse(recv_data) # 处理请求的返回值
r.callback(response) # 通过回调函数处理获得的返回数据
r.socket.close() # 获取到数据关闭连接
self.conn.remove(r) # 获取数据完成删除相关的socket
if len(self.conn) == 0: # 接收完请求数据停止执行while循环
break
# 回调函数
def f1(res):
print('保存到文件', res.headers, res.body)
def f2(res):
print('保存到数据库', res.headers, res.body)
url_list = [
{'host': 'www.baidu.com', 'callback': f1},
{'host': 'www.qq.com', 'callback': f2},
]
req = AsyncRequest()
for item in url_list:
req.add_request(item['host'], item['callback'])
req.run()