python金融爬虫,抓取某券商的股票名称、简介

  • Post author:
  • Post category:python



网页为 get 请求,获取的 json 数据,数据结构处理,如下图:

  • 下图代码解释, for vaule in page_text[“items”]:

    在这里插入图片描述


    codes_test.txt 文件列表形式,每行1个,下图:


    在这里插入图片描述


    运行状态显示,如下图:


    在这里插入图片描述


    全部代码如下:
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
# Life is short,you need Python.[Where not, where to add.]

import csv
import json
import os
import time
import winsound
import requests


def get_tiger_code_info(code="amd"):
    """根据code获取name和description"""

    # 字母表
    alphabet_list = [
        "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n",
        "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z"
    ]
    
    # 美股(A股、港股、指数,本处不写,增加相应 esif 进行处理即可)
    if (code[0].lower() in alphabet_list):
        # 根据code的首字判断类型

        url = "https://www.laohu8.com/proxy/stock/stock_info/profile/" + code.lower(
        )
        headers = {
            "User-Agent":
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.50 Safari/537.36 Edg/88.0.705.29"
        }

        # 获取json数据,解析
        page_json = requests.get(url, headers).text
        page_text = json.loads(page_json)
        # print(page_text)

        # 异常处理
        try:
            for vaule in page_text["items"]:
                # print(vaule["name"]) # 用于调试
                # print(vaule["description"]) # 用于调试

                # 股票的名称、简介
                code_extract = vaule["name"]  # 从网页源代码中提取的 code
                description_extract = vaule["description"]  # 从网页源代码中提取的 简介
                # exchange_extract = vaule["exchange"] # 用于调试

            # 写入到txt文件
            # with open("./动态刷新.txt", "a") as f1:
            # f1.write(f"{code_extract.upper()}\n{description_extract}\n\n")

            # 写入到csv文件
            with open("./动态刷新.csv", "a", newline="") as f1:
                # 参数 newline="" 避免csv文件新增空行

                csv_write = csv.writer(f1, dialect="excel")
                # f1.write(f"{code_extract.upper()}\n{description_extract}\n\n")

                # 设置标题行
                # data_row_1 = ["OrderID",Market", "Code", "Description"]
                # csv_write.writerow(data_row_1)

                # 准备写入的每行数据,首个空格留空,用于后期填序号
                data_row = [
                    "", "us",
                    code_extract.upper(), description_extract
                ]

                print(f"正在写入{code_extract.upper()}...") # 用于查看程序运行状态
                csv_write.writerow(data_row)

        except Exception as e:
            print(f"出现错误!原因:{e}")


# 主程序入口
if __name__ == "__main__":
    # start_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    start_time2 = time.time()
    print(f"------程序已启动------")

    # 读取 codes 列表
    file_codes = os.path.join(r"D:\Jackey文档", "codes_test.txt")

    with open(file_codes, "r", encoding="utf-8") as f2:
        # 如出现乱码可改用 utf-8-sig

        # 读取全部
        read_codes = f2.readlines()

        # 遍历,并调用相关函数
        for read_code in read_codes:
            get_tiger_code_info(read_code.strip())  # 除去空格
            time.sleep(0.5)  # 延时反扒

    # end_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    end_time2 = time.time()
    print(f"------程序已结束------")
    print(f"------总用时 {end_time2-start_time2:.2f} s.------")

print("请查阅!\n")

# 结束,声音提示
winsound.PlaySound("SystemHand", winsound.SND_ALIAS)



版权声明:本文为weixin_41500838原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。