python爬虫-异步爬虫

  • Post author:
  • Post category:python

注:本文章为学习过程中对知识点的记录,供自己复习使用,也给大家做个参考,如有错误,麻烦指出,大家共同探讨,互相进步。
借鉴出处:
该文章的路线和主要内容:崔庆才(第2版)python3网络爬虫开发实战

前言:爬虫属于IO密集型任务,例如使用request库来爬取某个站点,当发出一个请求后,程序必须等待网站返回响应,才能接着运行,而在等待响应的过程中,整个爬虫程序是一直在等待的,实际上没有做任何事情。

1、协程的基本原理

1.1 基础知识

  • 阻塞
    阻塞状态指程序未得到所需计算资源时被挂起的状态。程序在等待某个操作完成期间,自身无法继续干别的事情,则称该程序在操作上是阻塞的。
  • 非阻塞
    程序在等待某操作的过程中,自身不被阻塞,可以继续干别的事情,则称该程序在该操作上是非阻塞的。
    非阻塞并不是在任何程序级别、任何情况下都存在的。仅当程序封装的级别可以囊括独立的子程序单元时,程序才可能存在非阻塞状态。
    非阻塞因阻塞的存在而存在,正因为阻塞导致程序运行的耗时增加与效率低下,我们才要把它变成非阻塞的。
  • 同步
    不同程序单元为了共同完成某个任务,在执行过程中需要靠某种通信方式保持协调一致,此时这些程序单元是同步执行的。
    例如在购物系统中更新商品库存时,需要用“行锁”作为通信信号,强制让不同的更新请求排队并按顺序执行,这里的更新库存操作就是同步的。
  • 异步
    为了完成某个任务,有时不同程序单元之间无须通信协调也能完成任务,此时不相关的程序单元之间可以是异步的。
  • 多进程
    多进程就是利用CPU多核的优势,在同一时间并执行多个任务,可以大大提高执行效率。
  • 协程
    协程(coroutine),又称作微线程、纤程,是一种运行在用户态的轻量级线程。
    协程拥有自己的寄存器上下文和栈。协程在调度切换时,将寄存器上下文和栈保存到其他地方,等切换回来的时候,再恢复先前保存的寄存器上下文和栈。因此,协程能保留上一次调用时的状态。
    协程本质上是个单进程,相对于多进程,它没有线程上下文切换的开销,没有原子操作锁定及同步的开销,编程模型也非常简单。

1.2 协程的用法

Python中使用协程最常用的库为asyncio。

  • event_loop:事件循环,相当于一个无限虚幻,我们可以把一些函数注册到这个事情循环上,当满足发生条件时,就调用对应的处理方法。
  • coroutine:中文翻译叫协程,在Python中常指代协程对象类型,我们可以将协程对象注册到时间循环中,它会被事件训话调用。我们可以使用async关键字来定义一个方法,这个方法在调用时不会立即被执行,而是会返回一个协程对象。
  • task:任务,这是对协程对昂的进一步封装,包含协程对象的各个状态。
  • future:代表将来执行或者没有执行的任务的结果,实际上和task没有本质区别。

1.2.1 定义协程

例子1:了解协程对象、事件循环
输入:

import asyncio
# async定义的方法会变成一个无法直接执行的协程对象,必须将此对象注册到事件循环中才可以执行。
async def execute(x):
    print('Number:', x)
coroutine = execute(1)
# 此时直接调用async定义的方法,返回的只是一个协程对象
print('Coroutine:', coroutine)
print('after execute')
# 使用get_event_loop()方法创建一个事件循环loop,并调用loop对象的run_until_complete方法将协程对象注册到了事件循环中,才会触发定义的方法。
loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)
print('after loop')

输出:

Coroutine: <coroutine object execute at 0x000001E452DDAB40>
after execute
Number: 1
after loop

例子2:了解task任务

import asyncio
# async定义的方法会变成一个无法直接执行的协程对象,必须将此对象注册到事件循环中才可以执行。
async def execute(x):
    print('Number:', x)
    return x
coroutine = execute(1)
# 此时直接调用async定义的方法,返回的只是一个协程对象
print('Coroutine:', coroutine)
print('after execute')
# 使用get_event_loop()方法创建一个事件循环loop
loop = asyncio.get_event_loop()
# 将协程对象转化为task任务,此时的任务还是pending状态
task = loop.create_task(coroutine)
print('Task:', task)
# 将task任务注册到事件循环中,然后task状态变为了finished,result=1是execute()执行的结果
loop.run_until_complete(task)
print('Task:', task)
print('after loop')

输出:

Coroutine: <coroutine object execute at 0x000002776288AB40>
after execute
Task: <Task pending name='Task-1' coro=<execute() running at D:\Project\scrape\urllib\haha.py:3>>
Number: 1
Task: <Task finished name='Task-1' coro=<execute() done, defined at D:\Project\scrape\urllib\haha.py:3> result=1>
after loop

创建任务的另一种方式:

task = ascynic.ensure_future(coroutine)

1.2.2 绑定回调

绑定回调的作用就是当协程对象执行完毕之后,就去执行声明的回调函数。
输入:

import asyncio
import requests
# async定义的方法会变成一个无法直接执行的协程对象,必须将此对象注册到事件循环中才可以执行。
async def request():
    url = 'http://www.baidu.com'
    status = requests.get(url)
    return status
# 定义的回调函数
def callback(task):
    print('Status:', task.result())

coroutine = request()
task = asyncio.ensure_future(coroutine)
# 通过add_done_callback()函数实现协程对象执行完毕后再去执行声明的callback方法的关联。
task.add_done_callback(callback)
print('Task:', task)
# 使用get_event_loop()方法创建一个事件循环loop
loop = asyncio.get_event_loop()
# 将task任务注册到事件循环中
loop.run_until_complete(task)
print('Task:', task)

输出:

Task: <Task pending name='Task-1' coro=<request() running at D:\Project\scrape\urllib\haha.py:4> cb=[callback() at D:\Project\scrape\urllib\haha.py:9]>
Status: <Response [200]>
Task: <Task finished name='Task-1' coro=<request() done, defined at D:\Project\scrape\urllib\haha.py:4> result=<Response [200]>>

分析:实际上,即使不适用回调方法,在task运行完毕之后,也可以直接调用result方法获取结果。

1.2.2 多任务协程

执行多次请求,可以定义一个task列表,然后使用asyncio包中的wait方法执行。
输入:

import asyncio
import requests
# async定义的方法会变成一个无法直接执行的协程对象,必须将此对象注册到事件循环中才可以执行。
async def request():
    url = 'https://www.baidu.com'
    status = requests.get(url)
    return status

tasks = [asyncio.ensure_future(request()) for _ in range(5)]
print('Task1:', tasks)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
for task in tasks:
    print('Task Result:', task.result())

输出:

Task1: [<Task pending name='Task-1' coro=<request() running at D:\Project\scrape\urllib\haha.py:4>>, <Task pending name='Task-2' coro=<request() running at D:\Project\scrape\urllib\haha.py:4>>, <Task pending name='Task-3' coro=<requ
est() running at D:\Project\scrape\urllib\haha.py:4>>, <Task pending name='Task-4' coro=<request() running at D:\Project\scrape\urllib\haha.py:4>>, <Task pending name='Task-5' coro=<request() running at D:\Project\scrape\urllib\haha
.py:4>>]
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>

分析:
loop函数不要放在定义函数内,否则会报错。

await后面的对象必须是如下格式之一:
1、一个原生协程对象;
2、一个由types.coroutine修饰的生成器,这个生成器可以返回协程对象;
3、由一个包含__await__方法的对象返回的一个迭代器。

上面声明的方式比较复杂,aiohttp是一个支持异步请求的库,它和asyncio配合使用,可以非常方便地实现异步请求操作。

2、aiohttp的使用

安装pip install aiohttp

2.1 以一个例子开始aiohttp

输入:

import asyncio
import aiohttp
import time

start = time.time()

async def get(url):
    session = aiohttp.ClientSession()
    response = await session.get(url)
    await response.text()
    await session.close()
    return response

async def request(id):
    url = f'https://www.httpbin.org/deplay/{id}'
    print('Waiting for::::::::',url)
    # 执行get方法时,会被挂起,但get方法内第一步时非阻塞的,挂起后会被立马唤醒
    response =  await get(url)
    print('Get response from:::::::', url, 'response::::::::', response)

tasks = [asyncio.ensure_future(request(i)) for i in range(5)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print('耗时:::', end-start)

输入:

Waiting for:::::::: https://www.httpbin.org/deplay/0
Waiting for:::::::: https://www.httpbin.org/deplay/1
Waiting for:::::::: https://www.httpbin.org/deplay/2
Waiting for:::::::: https://www.httpbin.org/deplay/3
Waiting for:::::::: https://www.httpbin.org/deplay/4
Get response from::::::: https://www.httpbin.org/deplay/1 response:::::::: <ClientResponse(https://www.httpbin.org/deplay/1) [404 NOT FOUND]>
<CIMultiDictProxy('Date': 'Tue, 01 Nov 2022 15:04:54 GMT', 'Content-Type': 'text/html', 'Content-Length': '233', 'Connection': 'keep-alive', 'Server': 'gunicorn/19.9.0', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Cred
entials': 'true')>

Get response from::::::: https://www.httpbin.org/deplay/3 response:::::::: <ClientResponse(https://www.httpbin.org/deplay/3) [404 NOT FOUND]>
<CIMultiDictProxy('Date': 'Tue, 01 Nov 2022 15:04:54 GMT', 'Content-Type': 'text/html', 'Content-Length': '233', 'Connection': 'keep-alive', 'Server': 'gunicorn/19.9.0', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Cred
entials': 'true')>

Get response from::::::: https://www.httpbin.org/deplay/0 response:::::::: <ClientResponse(https://www.httpbin.org/deplay/0) [404 NOT FOUND]>
<CIMultiDictProxy('Date': 'Tue, 01 Nov 2022 15:04:54 GMT', 'Content-Type': 'text/html', 'Content-Length': '233', 'Connection': 'keep-alive', 'Server': 'gunicorn/19.9.0', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Cred
entials': 'true')>

Get response from::::::: https://www.httpbin.org/deplay/2 response:::::::: <ClientResponse(https://www.httpbin.org/deplay/2) [404 NOT FOUND]>
<CIMultiDictProxy('Date': 'Tue, 01 Nov 2022 15:04:54 GMT', 'Content-Type': 'text/html', 'Content-Length': '233', 'Connection': 'keep-alive', 'Server': 'gunicorn/19.9.0', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Cred
entials': 'true')>

Get response from::::::: https://www.httpbin.org/deplay/4 response:::::::: <ClientResponse(https://www.httpbin.org/deplay/4) [404 NOT FOUND]>
<CIMultiDictProxy('Date': 'Tue, 01 Nov 2022 15:04:54 GMT', 'Content-Type': 'text/html', 'Content-Length': '233', 'Connection': 'keep-alive', 'Server': 'gunicorn/19.9.0', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Cred
entials': 'true')>

耗时::: 1.092451810836792

分析:时间循环会运行第一个task,执行第一个get方法时,会被挂起,但get方法第一步是创建了ClientSession对象,是非阻塞的,挂起后会被立马唤醒。接着执行await session.get是会被挂起等待,此期间事件循环会寻找当前未被挂起的协程继续进行。都被挂起后,请求还没有响应,就继续等待直到获取到结果。

2.2 基本介绍

  asynic模块内部实现了对TCP、UDP、SSL协议的异步操作,但是对于HTTP请求来说,就需要用aiohttp实现了。
  aiphttp是一个基于asynico的异步HTTP网络模块,它既提供了服务端,有提供了客户端。其中,我们用服务器可以搭建一个支持异步处理的处理器,这个服务器就是用来处理请求并返回响应的,类似于Django、Flask等一些Web服务器。而客户端可以用来发送请求,类似于使用requests发起一个HTTP请求然后获得响应,但requests发起的是通的网络请求,aiohttp则是异步的
输入:
aiohttp客户端例子

import asyncio
import aiohttp

async def fetch(session,url):
    async with session.get(url) as response:
        return await response.text(), response.status

async def main():
    async with aiohttp.ClientSession() as session:
        html,status =  await fetch(session, 'http://www.baidu.com')
        print(f'html:\n{html}','\n',f'status:{status}')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

输出:

html:
............
status: 200

分析:

  aiohttp实现的异步爬取,与之前的定义有明显的区别,主要包括以下几点:

  • aiohttp是对http请求进行异步爬取的库,实现异步爬取,需要启动协程,而协程则需要借助asynico里面的事件循环才能执行。
  • 每个异步方法的前面都要统一+async来修饰。(async定义的方法会变成一个无法直接执行的协程对象,必须将此对象注册到事件循环中才可以执行)
  • with as前面加上async代表声明一个支持异步的上下文管理器。(with as能够自动分配和释放资源)
  • 对于一些返回协程对象的操作,官方API文档里,response.text返回的是client对象,所以要在前面加上await;response.status返回的是数值,因此前面不要加await。参考官方文档https://docs.aiohttp.org/en/stable/client_reference.html
  • 最后定义的协程对象要调用事件循环。

注:3.7版本以后可以直接用asynico.run(main())代替上方显示声明事件循环,run方法内部会自动启动一个事件循环。

2.3 URL参数设置

# 借助params参数传参
params = {'name':'Jack', 'age':24}
aiohttp.ClientSession().get(url, params=params)

#aiohttp支持其他请求类型,如POST、PUT、DELETE等,这些和requests的使用方法类似
 #- 对于POST表单提交,其对应的请求头中的Content-Type为application/x-www-form-urlencoded
.post(url, data=data)
 #- 对应POST JSON数据提交,其对应的请求头中的Content-Type为application/json
 .post(url, json=data)
.put(url, data=data)
.delete(url)
.head(url)
.options(url)
.patch(url, data=data)

响应,与requests响应基本一致。需要加await的要查看响应类型是否是协程对象(如async修饰的方法),具体查看apihttp的API官方文档。

设置超时

# 设置超时1s
timeout = apihttp.ClientTimout(total=1)
aiohttp.ClientTimeout(timeout=timeout)
# 如果超时,则会抛出TImeoutError异常,其类型为asynico.TimeoutError。

ClientTimeout对象还有其他connect、socket_connect等参数,详细API可以参考官方文档:https://docs.aiohttp.org/en/stable/client_quickstart.html#timeouts

并发限制
  由于aiohttp可以支持非常高的并发量,百万量都是能做到的,所以部分网站如果响应不过来,有瞬间将目标网站爬挂掉的危,这是就要限制爬取的并发量。
  一般情况下,借助asynico的Semaphore来控制并发量

import asyncio
import aiohttp

CONCURRENCY = 5
URL = 'https://www.baidu.com'
# 创建信号量对象,用来控制并发量大小
semaphore = asyncio.Semaphore(CONCURRENCY)
session = None

async def scrape_api():
    # 把semaphore直接放置在了对应的爬取方法里,使用async with语句将semaphore作为上下文对象即可。
    async with semaphore:
        print('正在爬取',URL)
        async with session.get(URL) as reponse:
            await asyncio.sleep(1)
            return await reponse.text()

async def main():
    global session
    session = aiohttp.ClientSession()
    scrape_index_tasks = [asyncio.ensure_future(scrape_api()) for _ in range(10000)]
    await asyncio.gather(*scrape_index_tasks)

if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())

分析:
asyncio.await && asyncio.gather
相同:从功能上看,asyncio.wait 和 asyncio.gather 实现的效果是相同的,都是把所有 Task 任务结果收集起来。
不同:asyncio.wait 会返回两个值:done 和 pending,done 为已完成的协程 Task,pending 为超时未完成的协程 Task,需通过 future.result 调用 Task 的 result;而asyncio.gather 返回的是所有已完成 Task 的 result,不需要再进行调用或其他操作,就可以得到全部结果。

3、aiohttp异步爬取实战

爬取目标
1、爬取地址:https://spa5.scrape.center/
2、使用aiohttp爬取全站的图书数据;
3、将数据存储到数据库或独立文件中;

import asyncio
import aiohttp
import logging
import json
from motor.motor_asyncio import AsyncIOMotorClient
logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(levelname)s:%(message)s')

BASE_URL = 'https://spa5.scrape.center/api/book/?limit=18&offset={offset}'
DETAIL_URL = 'https://spa5.scrape.center/api/book/{book_id}/'

PAGE_SIZE = 18
PAGE_NUMBER = 10
COUCURRENCY = 20

session = None

semaphore = asyncio.Semaphore(COUCURRENCY)

# 连接MongoDB
MONGO_CONNECTION_STRING = 'mongodb://localhost:27017'
MONGO_DB_NAME = 'books'
MONGO_CONNECTION_NAME = 'books'
client = AsyncIOMotorClient(MONGO_CONNECTION_STRING)
db = client[MONGO_DB_NAME]
collection = db[MONGO_CONNECTION_NAME]
# 数据存储
async def save_data(data):
    logging.info('保存数据 %s', data)
    if data:
        return await collection.update_one({
            'id': data.get('id')
        },{
            '$set': data
        }, upsert=True)

# 定义爬取url返回json数据
async def scrape_api(url):
    async with semaphore:
        try:
            logging.info("爬取路径:%s", url)
            async with session.get(url) as response:
                return await response.json()
        except aiohttp.ClientError:
            logging.error("爬取%s出现了错误", url, exc_info=True)

# 爬取列表页
async def scrape_base(page):
    url = BASE_URL.format(offset = PAGE_SIZE * (page-1))
    return await scrape_api(url)

# 爬取详情页
async def scrape_detail(id):
    url = DETAIL_URL.format(book_id = id)
    data =  await scrape_api(url)
    await save_data(data)

async def main():
    global session
    session = aiohttp.ClientSession()
    scrape_base_tasks = [asyncio.ensure_future(scrape_base(page)) for page in range(1,PAGE_NUMBER+1)]
    results = await asyncio.gather(*scrape_base_tasks)
    logging.info('results结果为 \n %s', json.dumps(results, ensure_ascii=False, indent=2))
    # 提取所有book_id
    ids = []
    for result in results:
        if not result:
            continue
        for item in result.get('results'):
            ids.append(item.get('id'))
    scrape_detail_tasks = [asyncio.ensure_future(scrape_detail(id)) for id in ids]
    await asyncio.wait(scrape_detail_tasks)
    await session.close()

if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())

版权声明:本文为dayexiaofan原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。