Python 爬虫性能提升相关知识

  • Post author:
  • Post category:python


一、多线程、多进程实现并发的基本应用

1.1、线程与进程的关系

线程是计算机的最小单元,线程存在于进程中;没开一个进程都会包含线程。I/O密集型的程序用多线程实现较好,计算密集型的程序用多进程实现比较好。在Python中进程会有一个GIL锁,同一时刻只能允许一个进程中的线程去到CPU,进行请求,而计算型的程序需要高速的请求CPU进行计算,开设多个进程对CPU进行请求完成高效率的计算;I/O程序就不用通过CPU,而是只要告知进来请求的线程渠道对应的I/O接口就可以,因此不用受到 Python GIL锁的限制。

# 创建线程池
# 线程运用 一
# 引入模块
from concurrent.futures import ThreadPoolExecutor
import requests

# 定义请求函数
def task(url):
    response = requests.get(url)
    print(url,response)

# 创建线程池
pool = ThreadPoolExecutor(5)

# 定义请求的URL列表
url_list = ['http://www.baidu.com','http://www.sogou.com','http://www.so.com']

for url in url_list:
    pool.submit(task,url)

pool.shutdown(wait=True)

# 线程运用 二

# 引入模块
from concurrent.futures import ThreadPoolExecutor
import requests

# 定义请求函数
def task(url):
    response = requests.get(url)
    print(url,response)
    return response # response会被传入pool.submit() 的回调函数中

# task() 运行完成后会运行回调函数 done()
def done(future,*args,**kwargs):
    print(future.result(),args,kwargs) # 第一个参数是得到requests请求的返回值 response 对象

# 创建线程池
pool = ThreadPoolExecutor(5)

# 定义请求的URL列表
url_list = ['http://www.baidu.com','http://www.sogou.com','http://www.so.com']

for url in url_list:
    response = pool.submit(task,url)
    response.add_done_callback(done) # 接收回调函数 可以多次调用

pool.shutdown(wait=True)



# 进程运用 一

from concurrent.futures import ProcessPoolExecutor
import requests

# 定义请求函数
def task(url):
    response = requests.get(url)
    print(url,response)

# 创建进程池
pool = ProcessPoolExecutor(5)

# 定义请求的URL列表
url_list = []

for url in url_list:
    pool.submit(task,url)

pool.shutdown(wait=True)

# 进程运用 二

# 引入模块
from concurrent.futures import ProcessPoolExecutor
import requests

# 定义请求函数
def task(url):
    response = requests.get(url)
    print(url,response)
    return response # response会被传入pool.submit() 的返回值回调方法中

# task() 运行完成后会运行回调函数 done()
def done(future,*args,**kwargs):
    print(future.result(),args,kwargs) # 第一个参数是得到requests请求的返回值

# 创建进程池
pool = ProcessPoolExecutor(5)

# 定义请求的URL列表
url_list = ['http://www.baidu.com','http://www.sogou.com','http://www.so.com']

for url in url_list:
    response = pool.submit(task,url)
    response.add_done_callback(done) # 接收回调函数 可以多次调用

pool.shutdown(wait=True)

二、异步IO

协程:让一个线程去做很多事情,单独的协程不能实现并发功能,只能实现让线程执行循序的切换,所以协程也称为微线程。与异步I/O联合使用才能完成并发功能。

# asyncio 实现协程与异步I/O请求 原理
# 导入asyncio
import asyncio

# 装饰器
@asyncio.coroutine
def func1():
    print('before....func1....')
    yield from asyncio.sleep(5) # 延迟 5 秒 不支持http请求,支持TCP请求
    print('end...func1...')

tasks = [func1(),func1()]
loop = asyncion.get_event_loop()
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()

2.1、Http与TCP请求扩展

Http是基于TCP socket 请求开发,只是重新定义了自己的传输协议,可以通过TCP自行构建Http。

2.2、Python asyncio 和 aiohttp 完成并发功能

# 1、自行构建HTTP请求内容
import asyncio

@asyncio.coroutine
def task(host,url='/'):
    reader,writer = yield from asyncio.open_connection(host,80)
    
    # 构建符合HTTP请求的请求内容
    request_header_content = """ GET %s HTTP/1.0\r\nHost:%s\r\r\r\r"""%(url,host,)
    request_header_content = bytes(request_header_content,encoding='utf-8')
    
    writer.write(request_header_content)

    yield from writer.draln()

    text = yield from reader.read()
    print(host,url,text)
    writer.close()

tasks = [
    task('www.cnblogs.com','/wupeiqi/'),
    task('dig.chouti.com','/pic/show?nid=123456&id=456789')
]


# 将任务加入到线程中
loop = asyncio.get_event_loop()
results = loop.run_until_complete(asybcio.gather(*tasks))
loop.close()

# 2、利用aiohttp模块
import asyncio
import aiohttp

@asyncio.coroutine
def task(url):
    print(url)
    response = yield from aiohttp.request('GET',url)
    print(url,response)
    response.close()

tasks = [
    task('www.cnblogs.com','/wupeiqi/'),
    task('dig.chouti.com','/pic/show?nid=123456&id=456789')
]


# 将任务加入到线程中
loop = asyncio.get_event_loop()
results = loop.run_until_complete(asybcio.gather(*tasks))
loop.close()

2.2、基于 requests 模块实现并发

import asyncio
import requests

@asyncio.coroutine
def task(func,*args):
    loop = asyncio.get_event_loop()
    future = loop.run_in_executor(None,func,*args) # requests.get('https://www.baidu.com')
    response = yield from future
    print(response.url,response.content)

tasks = [
    task(requests.get,'https://www.baidu.com'),# requests.get或者request.post不执行
    task(requests.get,'https://www.so.com')
]

loop = asyncio.get_event_loop()
results = loop.run_until_complete(asyncio.gather(*tasks))
loop.close()

2.3、gevent + requests 模块实现并发

from gevent import monkey  # 安装 先greenlet 后gevent 
import requests
# gevent 依赖于greenlet来处理协程的事务,与内部的异步I/O完成并发操作

monkey.patch_all() # 必须加,找到内部 socket 阻塞的请求,封装为符合gevent异步请求的socket


def task(nethod,url,req_kwargs):
    print (methodurl,req_kwags)
    response = requests.request(method=method,url=url,**req_kwargs)
    print(response.url,response.content)


gevent.joinall([
    gevent.spqwn(task,method='get',url='https://www.baidu.com',req_kwargs={}),
    gevent.spqwn(task,method='get',url='https://www.so.com',req_kwargs={}),
])

# 控制协程的数量
import gevent.pool import Pool
pool = Pool(2) # 最多只允许向远程发送2个请求 (协程池)
gevent.joinall([
    gevent.spqwn(task,method='get',url='https://www.baidu.com',req_kwargs={}),
    gevent.spqwn(task,method='get',url='https://www.so.com',req_kwargs={}),
])

2.4、grequests 模块 结合 gevent+requests两个模块

起因:要用http请求探测服务的有效性,多进程,多线程,感觉似乎没有必要,看看有没有协程的方案

1. 简单用法

grequests 利用 requests和gevent库,做了一个简单封装,使用起来非常方便

import grequests
import time
import requests
urls = [
    'https://docs.python.org/2.7/library/index.html',
    'https://docs.python.org/2.7/library/dl.html',
    'http://www.iciba.com/partial',
    'http://2489843.blog.51cto.com/2479843/1407808',
    'http://blog.csdn.net/woshiaotian/article/details/61027814',
    'https://docs.python.org/2.7/library/unix.html',
    'http://2489843.blog.51cto.com/2479843/1386820',
    'http://www.bazhuayu.com/tutorial/extract_loop_url.aspx?t=0',
]

def method1():
    t1 = time.time()
    for url in urls:
        res = requests.get(url)
        #print res.status_code

    t2 = time.time()
    print 'method1', t2 - t1

def method2():
    tasks = [grequests.get(u) for u in urls]
    t1 = time.time()
    res = grequests.map(tasks, size=3)
#    print res
    t2 = time.time()
    print 'method2', t2 - t1

def method3():
    tasks = [grequests.get(u) for u in urls]
    t1 = time.time()
    res = grequests.map(tasks, size=6)
#    print res
    t2 = time.time()

if __name__ == '__main__':
    method1()
    method2()
    method3()

运行结果如下:

method1 8.51106119156
method2 5.77834510803
method3 2.55373907089

可以看到使用协程以后,整个程序的完成时间有了大大缩短,并且每个协程的并发粒度也会影响整体时间

2. 重要参数

这里需要补充的是几个

grequests

def grequests.map(requests, stream=False, size=None, exception_handler=None, gtimeout=None)
参数 说明 备注
size 协程的并发度 当一个协程进行IO等待的时候,就会将CPU交给其他协程序,一般设置为50 ~ 100足矣
exception_handler 协程的并发度 捕获单个请求的异常
gtimeout 整体请求的超时设置

另外,由于grequests底层使用的是requests,因此它支持

GET,OPTIONS, HEAD, POST, PUT, DELETE 等各种http method

所以以下的任务请求都是支持的

grequests.post(url, json={"name":"zhangsan"}) 
grequests.delete(url) 

3. 事件钩子

grequests的底层库,是requests,因此它也支持事件钩子

def print_url(r, *args, **kwargs):
        print(r.url)

url = "http://www.baidu.com"
# 1.
res = requests.get(url, hooks={"response":print_url})

# 2.
tasks = []
req = grequests.get(url, callback=print_url)
tasks.append(req)
res = grequests.map(tasks)

2.5、Twisted 模块完成并发



Python Twisted 网络引擎模块

from twisted.internet import protocol, reactor, endpoints

class Echo(protocol.Protocol):
    def dataReceived(self, data):
        self.transport.write(data)

class EchoFactory(protocol.Factory):
    def buildProtocol(self, addr):
        return Echo()

endpoints.serverFromString(reactor, "tcp:1234").listen(EchoFactory())
reactor.run()

3、自定义多并发

3.1、原理

利用socket与select模块完成自定义多并发。
1、socket客户端、服务端
    客户端在发请求进行socket请求时,连接是会被阻塞的;
    socket中 setblocking(False) 函数可以设置不阻塞,如果服务器端没有马上返回数据(连接无响应,数据未返回),就会报错,可以通过 try....except ERROR .... 语句来处理报错后继续执行其他的程序。

2、I/O多路复用 select模块
    客户端:
        try:
            socket对象r1.connet() # 会阻塞
            socket对象r2.connet()
        except EX:
            pass


    第一个列表:其中的对象有变化就会赋值给r;第二个列表:其中的对象连接成功后返回给w;第三个列表:返回的错误信息赋值给e,0.5表示超时时间。

    while True: # 监听socket是否发生了变化
        r,w,e = select.select([socket对象r1,socket对象r2],[socket对象w1,socket对象w2],[],0.5) # socket对象r1 与 socket对象w1是一模一样的对象
        r = [socket对象r1,] 表示socket对象r1有了变化
        r[0].recv() 服务端响应回来的数据被 r 接收 会阻塞
        w = [socket对象w1,] 表示socket对象w1与服务器连接成功,可以互相通讯了,w先获取到数据
        w[0].send('发送http的请求数据,headers等')

3、r,w,e = select.select([socket对象r1,socket对象r2],[socket对象w1,socket对象w2],[],0.5)中是否只能传入socket对象?可以传入其他的对象,但是对传入的对象有要求,传入的对象必须有fileno方法,并返回一个文件描述符。
    在socket外面套用一层其他对象的壳,并返回socket.fileno(),就可以将其传入select.select() 中了,select.select()不关心传入的对象是否为socket对象,但是我们无法自定义文件描述符,只能封装socket对象,从而获得文件描述符 .fileno()

    class Foo():
        def __init__(self):
            pass
        def fileno(self):
          obj = socket.socket()
          return obj.fileno()
    

3.2、利用 socket select 模块实现异步I/O

impoer socket
import select

sk = socket.socket()
sk.setblocking(False) # 设置为非阻塞,但是会出现异常
# 1、建立连接,处理费阻塞出现的异常
try:
    sk.connect(('www.baidu.com',80))
    print('连接成功...')
except BlockKingIOEroor as e:
    print(e)

data = 'GET / HTTP/1.0\r\nHost:baidu.com\r\n\r\n'
# data = 'POST / HTTP/1.0\r\nHost:baidu.com\r\n\r\nk1=v1&ke=v2' k1=v1&ke=v2 post请求体
# 2、发送内容
sk.send(data)
# 3、接收响应信息
res_data = sk.recv(1024) # IO阻塞
sk.close()


3.3、自定义异步IO(客户端简版)

''' 
    socket是IO多路复用的一个模块,可以同时监听多个socket.socket()对象;
    IO多路复用:利用 while True: 来监听多个socket对象;利用 r,w,e = select.select() 进行操作;
    异步IO:非阻塞的socket+IO多路复用
            - 非阻塞socket 通过设置setbocking(False)
            - select.select('可以封装为自定义对象,只要对象中有fileno方法,并返回socket文件描述符即可')
'''
import socket 
import select 


class HttpRequest:
    def __init__(self, sk, host, callback):
        self.socket = sk
        self.host = host
        self.callback = callback

    def fileno(self):
        return self.socket.fileno()


class HttpResponse:
    def __init__(self, recv_data):
        self.recv_data = recv_data
        self.headers = {}
        self.body = None
        self.initialize()
    def initialize(self):
        headers, body = self.recv_data.split(b'\r\n\r\n', 1)  # 分割响应头和响应体
        self.body = body
        # 处理响应头信息
        header_list = headers.split(b'\r\n')
        for h in header_list:
            h_str = str(h, encoding="utf-8")
            v = h_str.split(':', 1)
            if len(v) == 2:
                self.headers[v[0]] = v[1]


class AsyncRequest:
    def __init__(self):
        self.conn = []  # 用于获取数据检测
        self.connection = []  # 用于检测是否已经连接成功到服务器

    def add_request(self, host, callback):
        try:
            sk = socket.socket()
            sk.setblocking(0)  # 设置为非阻塞
            sk.connect((host, 80))  # 创建连接
        except BlockingIOError as e:
            print(e)
        request = HttpRequest(sk, host, callback)
        self.conn.append(request)
        self.connection.append(request)

    def run(self):
        while True:  # 事件循环,监听是否有变化
            rlist, wlist, elist = select.select(self.conn, self.connection, self.conn, 0.05)
            for w in wlist:
                # 只要能循环到,表示socket已经连接到服务器
                tpl = 'GET / HTTP/1.0\r\nhost:%s\r\n\r\n' % (w.host)
                w.socket.send(bytes(tpl, encoding="utf-8"))
                self.connection.remove(w)

            for r in rlist:
                # 获取到服务器返回的信息
                recv_data = bytes()
                while True:
                    try:
                        chunck = r.socket.recv(8096)
                        recv_data += chunck
                    except Exception as e:
                        print(e)
                        break
                print(r.host, recv_data)

                response = HttpResponse(recv_data)  # 处理请求的返回值
                r.callback(response)  # 通过回调函数处理获得的返回数据

                r.socket.close()  # 获取到数据关闭连接
                self.conn.remove(r)  # 获取数据完成删除相关的socket
            if len(self.conn) == 0:  # 接收完请求数据停止执行while循环
                break


# 回调函数
def f1(res):
    print('保存到文件', res.headers, res.body)


def f2(res):
    print('保存到数据库', res.headers, res.body)


url_list = [
    {'host': 'www.baidu.com', 'callback': f1},
    {'host': 'www.qq.com', 'callback': f2},
]

req = AsyncRequest()

for item in url_list:
    req.add_request(item['host'], item['callback'])

req.run()



版权声明:本文为u011146423原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。