实现一个免费的IP代理池

  • Post author:
  • Post category:其他


我将代理池程序设计为4个模块:

存储模块



获取模块



检查模块



接口模块

。模块之间关系如下:


存储模块

:我使用的是redis数据库存储,使用的是redis中的集合,集合内元素无序并且无重复。该存储模块的主要功能就是存入可用代理,删除不可用代理,随机提取可用代理,获取所有可用代理,获取当前可用代理数量。


获取模块

:该模块主要功能是定时对网上的免费代理网站进行爬取。


检查模块

:检查模块主要有两个功能:第一个是对获取模块爬取的代理进行检查,检查通过则允许调用存储模块存入数据库,如果检查不通过,这筛出掉该条代理。第二个是定时对数据库中的代理进行复查,如果通过,则保留,如果没有通过则删除该代理。


接口模块

:建立Flask程序,对外提供接口,可以通过该接口随机获取可用代理,可用代理总数。

项目结构如下:

依次是:检查模块,配置文件,获取模块,执行文件,接口模块,存储模块。


存储模块代码

import redis

from config import REDIS_HOST, REDIS_PORT, REDIS_PASSWORD, REDIS_KEY


class RedisClient(object):
    def __init__(self):
        """初始化,建立redis连接"""
        self.db = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD, decode_responses=True)

    def add_one(self, proxy):
        """添加代理"""
        if not self.db.sismember(REDIS_KEY, proxy):
            return self.db.sadd(REDIS_KEY, proxy)
        else:
            return None

    def del_one(self, proxy):
        """删除代理"""
        self.db.srem(REDIS_KEY, proxy)

    def get_one(self):
        """随机获取一个有效代理"""
        result = self.db.srandmember(REDIS_KEY, 1)
        if len(result) >= 1:
            return result[0]
        else:
            return None

    def count(self):
        """获取数量"""
        return self.db.scard(REDIS_KEY)

    def get_all(self):
        """获取全部代理"""
        return self.db.smembers(REDIS_KEY)


获取模块代码

from time import sleep

import requests
from lxml import etree

from config import KAIDAILI_PAGE_COUNT, XICI_PAGE_COUNT

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 5.1; U; en; rv:1.8.1) Gecko/20061208 Firefox/2.0.0 Opera 9.50',
}


def get_page(url):
    """获取页面"""
    try:
        res = requests.get(url, headers=headers, timeout=5)
    except:
        return None
    else:
        if res.status_code == 200:
            return res.text
        else:
            return None


class Crawler:
    """获取免费代理类"""
    def crawl_xici(self, page_count=XICI_PAGE_COUNT):
        """获取西刺代理"""
        start_url = 'https://www.xicidaili.com/nn/{}'
        for page in range(1, page_count + 1):
            url = start_url.format(page)
            res = get_page(url)
            if res:
                html = etree.HTML(res)
                ips = html.xpath('//table[@id="ip_list"]/tr/td[2]/text()')
                ports = html.xpath('//table[@id="ip_list"]/tr/td[3]/text()')
                if len(ips) == len(ports):
                    for i in range(len(ips)):
                        proxy = ips[i] + ':' + ports[i]
                        yield proxy
            sleep(10)

    def crawl_kuaidaili(self, page_count=KAIDAILI_PAGE_COUNT):
        """获取快代理"""
        start_url = 'https://www.kuaidaili.com/free/inha/{}/'
        for page in range(1, page_count + 1):
            url = start_url.format(page)
            res = get_page(url)
            if res:
                html = etree.HTML(res)
                ips = html.xpath('//td[@data-title="IP"]/text()')
                ports = html.xpath('//td[@data-title="PORT"]/text()')
                if len(ips) == len(ports):
                    for i in range(len(ips)):
                        proxy = ips[i] + ':' + ports[i]
                        yield proxy
            sleep(10)

    def get_proxy(self):
        """迭代返回proxy"""
        for proxy in self.crawl_xici():
            yield proxy
        for proxy in self.crawl_kuaidaili():
            yield proxy



检查模块代码

import requests

from config import TEST_URL
from proxy_storage import RedisClient


class Checker:
    """检查代理类"""
    def __init__(self):
        """初始化"""
        self.redis = RedisClient()

    def check_proxy(self, proxy):
        """测试单个代理是否可用"""
        proxies = {
            "http": "http://" + proxy,
            "https": "https://" + proxy,
        }
        try:
            response = requests.get(TEST_URL, proxies=proxies, timeout=5)
        except:
            print('proxy请求失败:', proxy)
            return False
        else:
            if response.status_code == 200:
                print('proxy可用:', proxy)
                return True
            else:
                print('proxy不可用:', proxy)
                return False


    def review(self):
        """proxy复查"""
        try:
            proxies = self.redis.get_all()
            for proxy in proxies:
                res = self.check_proxy(proxy)
                if not res:
                    self.redis.del_one(proxy)
                    print('proxy复查不可用,已删除:', proxy)
        except Exception as e:
            print('测试器发生错误', e.args)

    def first_check(self, proxy):
        """proxy存入redis之前的首次检查"""
        res = self.check_proxy(proxy)
        return res


接口模块代码

from flask import Flask, g

from proxy_storage import RedisClient

__all__ = ['app']
app = Flask(__name__)


def get_conn():
    if not hasattr(g, 'redis'):
        g.redis = RedisClient()
    return g.redis


@app.route('/')
def index():
    """主页"""
    return '<h2>欢迎使用IP代理池!</h2>'


@app.route('/get_one')
def get_proxy():
    """获取随机可用代理"""
    conn = get_conn()
    proxy = conn.get_one()
    if proxy:
        return proxy
    else:
        return "暂无可用代理"


@app.route('/count')
def get_counts():
    """获取代理池总量"""
    conn = get_conn()
    return str(conn.count())


配置文件代码

# 检查proxy是否可用时使用的测试url
TEST_URL = 'http://www.baidu.com'
# TEST_URL = 'https://weixin.sogou.com/'

# 获取西刺代理的页数
XICI_PAGE_COUNT = 5
# 获取快代理的页数
KAIDAILI_PAGE_COUNT = 10

# 设置redis存储proxy上限
THRESHOLD = 100

# 每次获取proxy的间隔时长
GET_INTERVAL = 100
# 每次复查proxy的间隔时长
REVIEW_INTERVAL = 100

# redis配置
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_PASSWORD = '123456'
REDIS_KEY = 'my_proxies'

# API服务host和port
API_HOST = '127.0.0.1'
API_PORT = '8080'


执行文件代码

from time import sleep

from multiprocessing import Process

from check_proxy import Checker
from config import THRESHOLD, REVIEW_INTERVAL, GET_INTERVAL, API_HOST, API_PORT
from get_proxy import Crawler
from proxy_api import app
from proxy_storage import RedisClient


class Manager:
    """获取模块和检查模块的调用"""
    def __init__(self):
        """初始化"""
        self.redis = RedisClient()
        self.crawler = Crawler()
        self.tester = Checker()

    def is_over_threshold(self):
        """判断是否达到代理池限制"""
        if self.redis.count() >= THRESHOLD:
            return True
        else:
            return False

    def getter(self):
        """获取模块逻辑"""
        if not self.is_over_threshold():
            for proxy in self.crawler.get_proxy():
                res = self.tester.first_check(proxy)
                if res:
                    info = self.redis.add_one(proxy)
                    if not info:
                        print('该proxy已经存在:', proxy)
                    else:
                        print('成功存入proxy:', proxy)
                else:
                    print('proxy不可用,已剔除:', proxy)

    def reviewer(self):
        """检查模块逻辑"""
        self.tester.review()


manager = Manager()


class Scheduler:
    """调度器,程序运行逻辑实现"""
    def schedule_getter(self):
        """定时获取代理"""
        while True:
            print('开始获取proxy')
            manager.getter()
            sleep(GET_INTERVAL)

    def schedule_reviewer(self):
        """定时测试代理"""
        while True:
            print('开始审核proxy')
            manager.reviewer()
            sleep(REVIEW_INTERVAL)

    def schedule_api(self):
        """开启API"""
        print("api已开启")
        app.run(API_HOST, API_PORT)

    def run(self):
        """多进程运行代理池"""
        print('代理池开始运行')

        getter_process = Process(target=self.schedule_getter)
        getter_process.start()

        tester_process = Process(target=self.schedule_reviewer)
        tester_process.start()

        api_process = Process(target=self.schedule_api)
        api_process.start()


if __name__ == '__main__':
    scheduler = Scheduler()
    scheduler.run()

运行该代理池:

由于是获取的免费代理,所以很多代理都是不可用的,哈哈哈,毕竟是免费的。

redis数据库显示如下:

然后我们通过访问接口获取可用代理:

这样一个免费的IP代理池就完成了,哈哈哈。



版权声明:本文为heibuliuqiu_gk原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。