ETL数据库数据采集&订单数据采集

  • Post author:
  • Post category:其他




问题解决:
  1. 增加目标数据库配置信息
# 结果写出MySQL的数据库相关配置
target_host = 'localhost'
target_port = 3306
target_user = 'root'
target_password = 'mysql'
target_data_db = "retail"
  1. csv文件目录拼接时,根路径没有末尾\

方式一: 在配置文件中,将csv文件的根目录末尾添加一个/

方式二: 在csv文件根目录和文件名拼接时使用os.path.join()连接




01-商品数据采集-业务需求说明(理解)


商品数据在mysql 数据库中, 这个数据源所在的数据库,是哪一个部门负责管理的??

前端开发: 负责用户交互

后端开发: 负责数据交互(

该数据源归后端开发管理和使用

)

大数据开发: 负责海量数据的存储和计算(负责数据的ETL,保证数据满足数据分析的要求)

注意: 大数据开发人员,在后端数据库中通常只有数据库的查询权限,无法进行其他操作

大数据开发通常获取的是二手数据(去重,脱敏,整合后的数据…)


需求介绍:

1)将采集的商品数据保存到目标数据库中

  • 在正常开发中,这个目标数据库其实是一个数仓,不会使用关系型数据库存储(RDBMS).

2)将采集的商品数据写出到 CSV 文件中

3)仅采集增量数据(

已经采集过的数据,不重复采集

)


实现思路:

① 查询元数据库表,获取上一次采集商品数据中 updateAt 的最大值

② 根据上一次采集商品数据中 updateAt 的最大值,查询数据源库商品表,获取继上一次采集之后,新增和更新的商品数据

③ 针对新增和更新的商品数据,进行数据采集(ETL->mysql->csv)

④ 将本次采集商品数据中的 updateAt 的最大值,保存到元数据库表中



02-测试数据添加 (

掌握

)


添加数据方式有两种:

  1. 使用终端添加

    • 首先打开终端工具(例如: cmd)

    • 在内部开启mysql客户端

      mysql -uroot -p123456

    • 在mysql客户端中输入

      -- 创建数据源数据库
      create database source_data charset='utf8';
      -- 使用该数据库
      use source_data;
      -- 执行脚本导入数据
      source sql脚本的存放路径(可以从文件夹中将文件拖拽到cmd中获取路径)
      
  2. 使用第三方工具

    • 打开Datagrip,并连接数据源所在的数据库

    • 在数据库列表中,右键点击要导入数据的数据库, 选择 ‘Run sql Script’

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Y0nPrKf1-1688341417117)(day04-数据库数据采集&订单数据采集.assets/1672626735205.png)]

    • 选择要执行的sql脚本点击确定即可

      [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PBHxCEKI-1688341417118)(day04-数据库数据采集&订单数据采集.assets/1672626794057.png)]



03-增量数据采集(理解)

全量采集: 指定数据源中的所有数据全部采集

增量采集: 指定数据源中的新增数据,和修改数据的采集


问题: 怎样判断增量采集??

创建或更新时间,大于上次采集的最大时间,即可证明是增量数据.

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JBawPQ33-1688341417119)(day04-数据库数据采集&订单数据采集.assets/4.png)]

  • defalut current_timestamp: 插入数据时,默认值为当前时间
  • on update current_timestamp: 更新数据时将其更改为当前时间


问题2: 怎样实现增量采集??

  1. 查询(采集)数据的时候,SELECT SQL语句按照

    updateAt

    进行排序,按照

    升序

    排序


排序原因:

  1. 进行where 中的< > <= >=操作时,排序后的数据筛选效率更高
  2. 采集后 写入csv和目标数据库的时候也是按照时间顺序进行排序的
  1. 当采集完成后,将当前批次最大的时间记录在:MySQL的

    元数据库中

  2. 下一次采集的时候,从MySQL的元数据库中,查询出来

    上一次采集的时间

  3. SQL的SELECT语句的WHERE条件设置为:

    updateAt > 上一次采集时间

    即可



04-配置文件导入


config/project_config.py

# ################## --数据库barcode商品数据采集配置项-- ###################
# 元数据库配置
# barcode业务:update_at字段的监控表的名称
metadata_barcode_table_name = 'barcode_monitor'
# barcode业务:update_at字段的监控表的建表语句的列信息
metadata_barcode_table_create_cols = "id INT PRIMARY KEY AUTO_INCREMENT COMMENT '自增ID', " \
                                     "time_record TIMESTAMP NOT NULL COMMENT '本次采集记录的最大时间', " \
                                     "gather_line_count INT NULL COMMENT '本次采集条数'"
# 数据源库配置
source_host = 'localhost'
source_user = 'root'
source_password = '123456'
source_port = 3306
source_data_db = 'source_data'
# 数据源表名称
source_barcode_table_name = 'sys_barcode'


# 目标数据库配置
# 条码商品表名称
target_barcode_table_name = 'sys_barcode'
target_barcode_table_create_cols = """
    `code` varchar(50) PRIMARY KEY COMMENT '商品条码',
    `name` varchar(200) DEFAULT '' COMMENT '商品名称',
    `spec` varchar(200) DEFAULT '' COMMENT '商品规格',
    `trademark` varchar(100) DEFAULT '' COMMENT '商品商标',
    `addr` varchar(200) DEFAULT '' COMMENT '商品产地',
    `units` varchar(50) DEFAULT '' COMMENT '商品单位(个、杯、箱、等)',
    `factory_name` varchar(200) DEFAULT '' COMMENT '生产厂家',
    `trade_price` DECIMAL(50, 5) DEFAULT 0.0 COMMENT '贸易价格(指导进价)',
    `retail_price` DECIMAL(50, 5) DEFAULT 0.0 COMMENT '零售价格(建议卖价)',
    `update_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
    `wholeunit` varchar(50) DEFAULT NULL COMMENT '大包装单位',
    `wholenum` int(11) DEFAULT NULL COMMENT '大包装内装数量',
    `img` varchar(500) DEFAULT NULL COMMENT '商品图片',
    `src` varchar(20) DEFAULT NULL COMMENT '源信息', 
    INDEX (update_at)
"""

# 商品数据写出 csv 根路径
barcode_output_csv_root_path = "D:/etl/output/csv/"
# 商品数据写出 csv 文件名
barcode_orders_output_csv_file_name = f'barcode-{time.strftime("%Y-%m-%d-%H_%M", time.localtime())}.csv'



05-构建商品数据模型类(理解)


model/barcode_model.py

"""
条码商品信息模型类
"""
from util import str_util
from config import project_config as conf


class BarcodeModel(object):
    """条码商品信息模型类"""
    def __init__(self, code=None, name=None, spec=None, trademark=None,
                 addr=None, units=None, factory_name=None, trade_price=None,
                 retail_price=None, update_at=None, wholeunit=None,
                 wholenum=None, img=None, src=None):
        """条码商品模型对象初始化"""
        self.code = code
        self.name = str_util.clear_str(name)
        self.spec = str_util.clear_str(spec)
        self.trademark = str_util.clear_str(trademark)
        self.addr = str_util.clear_str(addr)
        self.units = str_util.clear_str(units)
        self.factory_name = str_util.clear_str(factory_name)
        self.trade_price = trade_price
        self.retail_price = retail_price
        self.update_at = update_at
        self.wholeunit = str_util.clear_str(wholeunit)
        self.wholenum = wholenum
        self.img = img
        self.src = src

    def generate_insert_sql(self):
        """生成SQL的插入语句"""
        sql = f"REPLACE INTO {conf.target_barcode_table_name}(" \
              f"code,name,spec,trademark,addr,units,factory_name,trade_price," \
              f"retail_price,update_at,wholeunit,wholenum,img,src) VALUES(" \
              f"'{self.code}', " \
              f"{str_util.check_str_null_and_transform_to_sql_null(self.name)}, " \
              f"{str_util.check_str_null_and_transform_to_sql_null(self.spec)}, " \
              f"{str_util.check_str_null_and_transform_to_sql_null(self.trademark)}, " \
              f"{str_util.check_str_null_and_transform_to_sql_null(self.addr)}, " \
              f"{str_util.check_str_null_and_transform_to_sql_null(self.units)}, " \
              f"{str_util.check_str_null_and_transform_to_sql_null(self.factory_name)}, " \
              f"{str_util.check_number_null_and_transform_to_sql_null(self.trade_price)}, " \
              f"{str_util.check_number_null_and_transform_to_sql_null(self.retail_price)}, " \
              f"{str_util.check_str_null_and_transform_to_sql_null(self.update_at)}, " \
              f"{str_util.check_str_null_and_transform_to_sql_null(self.wholeunit)}, " \
              f"{str_util.check_number_null_and_transform_to_sql_null(self.wholenum)}, " \
              f"{str_util.check_str_null_and_transform_to_sql_null(self.img)}, " \
              f"{str_util.check_str_null_and_transform_to_sql_null(self.src)}" \
              f")"

        return sql

    @staticmethod
    def get_csv_header(sep=','):
        """
        生成 csv 数据的标头内容
        """
        return f"code{sep}" \
               f"name{sep}" \
               f"spec{sep}" \
               f"trademark{sep}" \
               f"addr{sep}" \
               f"units{sep}" \
               f"factory_name{sep}" \
               f"trade_price{sep}" \
               f"retail_price{sep}" \
               f"update_at{sep}" \
               f"wholeunit{sep}" \
               f"wholenum{sep}" \
               f"img{sep}" \
               f"src\n"

    def to_csv(self, sep=','):
        """生成csv数据行"""
        csv_line = \
            f"{self.code}{sep}" \
            f"{self.name}{sep}" \
            f"{self.spec}{sep}" \
            f"{self.trademark}{sep}" \
            f"{self.addr}{sep}" \
            f"{self.units}{sep}" \
            f"{self.factory_name}{sep}" \
            f"{self.trade_price}{sep}" \
            f"{self.retail_price}{sep}" \
            f"{self.update_at}{sep}" \
            f"{self.wholeunit}{sep}" \
            f"{self.wholenum}{sep}" \
            f"{self.img}{sep}" \
            f"{self.src}\n"

        return csv_line


06-商品数据采集核心业务实现(

理解,并跟着敲一遍

)


6.1 查询上次采集的最大时间
  1. 查询元数据库表,获取上批次条码商品数据处理的最大时间
  • 创建元数据库连接对象
  • 检查元数据表是否存在,不存在则创建
  • 查询商品采集元数据表中,上一次采集记录的 updateAt 的最大值
  • 判断获取到的数据集是否有值,创建变量保存该值

    • 如果有值则保存
    • 如果没有值则保存None


barcode_service.py

# 0. 导包
from util import mysql_util
from config import project_config as conf

# 1. 查询元数据库表,获取上批次条码商品数据处理的最大时间
#
# 1.1 创建元数据库连接对象
metadata_util = mysql_util.get_mysql_util(
    host=conf.metadata_host,
    port=conf.metadata_port,
    user=conf.metadata_user,
    password=conf.metadata_password
)
# 1.2 检查元数据表是否存在,不存在则创建
metadata_util.check_table_exists_and_create(
    db_name=conf.metadata_db,
    tb_name=conf.metadata_barcode_table_name,
    tb_cols=conf.metadata_barcode_table_create_cols
)
# 1.3 查询商品采集元数据表中,上一次采集记录的 updateAt 的最大值
# 1.3.1 创建查询sql
sql = f'select max(updateAt) from {conf.metadata_barcode_table_name};'
# 1.3.2 执行sql
result = metadata_util.query(sql) # ((最大时间,),)
# 1.4 判断获取到的数据集是否有值,创建变量保存该值
if result[0][0]:
    # 1.4.1 如果有值则保存
    barcode_max_time = result[0][0]
else:
    # 1.4.2 如果没有值则保存None
    barcode_max_time = None


6.2 根据最大采集时间,从数据源中采集数据
  1. 根据上一次采集商品数据中 updateAt 的最大值,查询数据源库商品表,获取继上一次采集之后,新增和更新的商品数据
  • 创建数据源数据库连接对象
  • 判断数据源数据库是否存在

    • 如果不存在则退出程序
  • 查询元数据库表,获取增量更新的条码商品数据

    • 没有最大采集时间(初次采集,全量采集)
    • 有最大采集时间(再次采集,增量采集)
  • 判断采集数据条目数

    • 如果条目数为0则退出去程序


barcode_service.py

# 2. 根据上一次采集商品数据中 updateAt 的最大值,查询数据源库商品表,获取继上一次采集之后,新增和更新的商品数据
# 2.1 创建数据源数据库连接对象
source_util = mysql_util.get_mysql_util(
    host=conf.source_host,
    port=conf.source_port,
    user=conf.source_user,
    password=conf.source_password
)
# 2.2 判断数据源数据库是否存在, 如果不存在则退出程序
if not source_util.check_table_exists(conf.source_data_db, conf.source_barcode_table_name):
    exit('数据源不存在,请与后端开发人员协商(sibi)解决')
# 2.3 查询元数据库表,获取增量更新的条码商品数据
if not barcode_max_time:
    # 2.3.1 没有最大采集时间(初次采集,全量采集 没有where)
    sql = f'select * from {conf.source_barcode_table_name} order by updateAt;'
else:
    # 2.3.2 有最大采集时间(再次采集,增量采集 有where)
    sql = f'select * from {conf.source_barcode_table_name} ' \
          f'where updateAt > "{barcode_max_time}" ' \
          f'order by updateAt;'
# 执行sql语句
result = source_util.query(sql)
# 2.4 判断采集数据条目数, 如果条目数为0则退出程序
if not len(result):
    exit('没有待采集数据...')


6.3 将条码数据写入mysql和csv文件中
  1. 针对新增和更新的商品数据,进行数据采集(ETL->mysql->csv)
  • 检查目标数据库表是否存在,如果不存在则创建
  • 创建目标csv文件
  • 写入csv的标头
  • 创建目标数据库连接对象
  • 根据数据源采集结果创建数据模型

    • 写入mysql数据库
    • 写入csv文件
  • 关闭数据库连接

为了创建模型更加简便我们修改了模型类的基础逻辑


model/barcode_model.py

  """条码商品信息模型类"""
    def __init__(self, data_tuple: tuple):
        """条码商品模型对象初始化"""
        self.code = data_tuple[0]
        self.name = str_util.clear_str(data_tuple[1])
        self.spec = str_util.clear_str(data_tuple[2])
        self.trademark = str_util.clear_str(data_tuple[3])
        self.addr = str_util.clear_str(data_tuple[4])
        self.units = str_util.clear_str(data_tuple[5])
        self.factory_name = str_util.clear_str(data_tuple[6])
        self.trade_price = data_tuple[7]
        self.retail_price = data_tuple[8]
        self.update_at = str(data_tuple[9])
        self.wholeunit = data_tuple[10]
        self.wholenum = data_tuple[11]
        self.img = data_tuple[12]
        self.src = data_tuple[13]


barcode_service.py

# 3. 针对新增和更新的商品数据,进行数据采集(ETL->mysql->csv)
# 3.1 创建目标数据库连接对象
target_util = mysql_util.get_mysql_util(
    host=conf.target_host,
    port=conf.target_port,
    user=conf.target_user,
    password=conf.target_password
)
# 3.2 检查目标数据库表是否存在,如果不存在则创建
target_util.check_table_exists_and_create(
    db_name=conf.target_data_db,
    tb_name=conf.target_barcode_table_name,
    tb_cols=conf.target_barcode_table_create_cols
)
# 3.3 创建目标csv文件
csv_file = open(
    file=conf.barcode_output_csv_root_path + conf.barcode_orders_output_csv_file_name,
    mode='a',
    encoding='utf8')
# 3.4 写入csv的标头
csv_file.write(BarcodeModel.get_csv_header())
# 3.5 根据数据源采集结果创建数据模型
for row_data in result:
    model = BarcodeModel(row_data)
    # 3.5.1 写入mysql数据库
    target_util.insert_single_sql(model.generate_insert_sql())
    # 3.5.2 写入csv文件
    csv_file.write(model.to_csv())
# 3.6 关闭数据库连接
csv_file.close()
target_util.close()
source_util.close()


6.4 添加事务(1000条提交一次)

记录处理条目数,如果处理条目达到1000的倍数

  1. 提交mysql数据
  2. 记录元数据
  3. 关闭csv文件


barcode_service.py

# 3. 针对新增和更新的商品数据,进行数据采集(ETL->mysql->csv)
# 3.1 创建目标数据库连接对象
target_util = mysql_util.get_mysql_util(
    host=conf.target_host,
    port=conf.target_port,
    user=conf.target_user,
    password=conf.target_password
)
# 3.2 检查目标数据库表是否存在,如果不存在则创建
target_util.check_table_exists_and_create(
    db_name=conf.target_data_db,
    tb_name=conf.target_barcode_table_name,
    tb_cols=conf.target_barcode_table_create_cols
)
# 3.3 创建目标csv文件
csv_file = open(
    file=conf.barcode_output_csv_root_path + conf.barcode_orders_output_csv_file_name,
    mode='a',
    encoding='utf8')
# 3.4 写入csv的标头
csv_file.write(BarcodeModel.get_csv_header())
# 事务1: 创建一个变量,保存处理数据的条目数 
data_count = 0
# 事务2: 首次开启事务
target_util.begin_transaction()

# 3.5 根据数据源采集结果创建数据模型
for row_data in result:
    # 事务4: 计数器自增
    data_count += 1
    try:  
        # 创建一个模型
        model = BarcodeModel(row_data)
        # 3.5.1 写入mysql数据库
        target_util.insert_single_sql_without_commit(model.generate_insert_sql())
        # 3.5.2 写入csv文件
        csv_file.write(model.to_csv())
      
    except Exception as e:
        # 回滚事务
        target_util.rollback_transaction()
        # 结束程序 (如果想要在回滚后恢复各种变量的值,做的操作太过繁琐不如将脚本直接杀死后重新执行)
        exit('插入数据库失败结束程序')
    else:
        # 事务3: 判断数据条目是否为1000的倍数
        if data_count % 1000 == 0:
            # 提交事务
            target_util.commit_transaction()
            # 开启下一次事务
            target_util.begin_transaction()
            # 事务5: 关闭csv文件,并创建下一个csv文件
            csv_file.close()
            # 开启下一个文件
            csv_file = open(
                file=conf.barcode_output_csv_root_path + conf.barcode_orders_output_csv_file_name,
                mode='a',
                encoding='utf8')
            # 写入csv的标头
            csv_file.write(BarcodeModel.get_csv_header())
  
else:
    # 事务6: 数据执行结束,将没有提交的sql语句提交和csv文件关闭
    # 提交事务
    target_util.commit_transaction()
    # 关闭csv文件
    csv_file.close()
        
# 3.6 关闭数据库连接
target_util.close()
source_util.close()


6.5 事务提交后将最大时间保存到元数据库中
  1. 将本次采集商品数据中的 updateAt 的最大值,保存到元数据库表中
  • 准备sql语句
  • 执行sql语句


barcode_service.py

# 3. 针对新增和更新的商品数据,进行数据采集(ETL->mysql->csv)
# 3.1 创建目标数据库连接对象
target_util = mysql_util.get_mysql_util(
    host=conf.target_host,
    port=conf.target_port,
    user=conf.target_user,
    password=conf.target_password
)
# 3.2 检查目标数据库表是否存在,如果不存在则创建
target_util.check_table_exists_and_create(
    db_name=conf.target_data_db,
    tb_name=conf.target_barcode_table_name,
    tb_cols=conf.target_barcode_table_create_cols
)
# 3.3 创建目标csv文件
csv_file = open(
    file=conf.barcode_output_csv_root_path + conf.barcode_orders_output_csv_file_name,
    mode='a',
    encoding='utf8')
# 3.4 写入csv的标头
csv_file.write(BarcodeModel.get_csv_header())
# 事务1: 创建一个变量,保存处理数据的条目数
data_count = 0
# 事务2: 首次开启事务
target_util.begin_transaction()

# 3.5 根据数据源采集结果创建数据模型
for row_data in result:
    # 事务4: 计数器自增
    data_count += 1
    try:
        # 创建一个模型
        model = BarcodeModel(row_data)
        # 3.5.1 写入mysql数据库
        target_util.insert_single_sql_without_commit(model.generate_insert_sql())
        # 3.5.2 写入csv文件
        csv_file.write(model.to_csv())

    except Exception as e:
        # 回滚事务
        target_util.rollback_transaction()
        # 结束程序 (如果想要在回滚后恢复各种变量的值,做的操作太过繁琐不如将脚本直接杀死后重新执行)
        exit('插入数据库失败结束程序')
    else:
        # 事务3: 判断数据条目是否为1000的倍数
        if data_count % 1000 == 0:
            # 提交事务
            target_util.commit_transaction()
            # 4.将本次采集商品数据中的updateAt的最大值,保存到元数据库表中
            # 4.1 准备sql语句
            sql = f'insert into {conf.metadata_barcode_table_name}(time_record, gather_line_count)' \
                  f'values ("{model.update_at}", 1000)'
            # 4.2 执行sql语句
            metadata_util.insert_single_sql(sql)

            # 开启下一次事务
            target_util.begin_transaction()
            # 事务5: 关闭csv文件,并创建下一个csv文件
            csv_file.close()
            # 开启下一个文件
            csv_file = open(
                file=conf.barcode_output_csv_root_path + conf.barcode_orders_output_csv_file_name,
                mode='a',
                encoding='utf8')
            # 写入csv的标头
            csv_file.write(BarcodeModel.get_csv_header())

else:
    # 事务6: 数据执行结束,将没有提交的sql语句提交和csv文件关闭
    # 提交事务
    target_util.commit_transaction()
    # 4.将本次采集商品数据中的updateAt的最大值,保存到元数据库表中
    # 4.1 准备sql语句
    sql = f'insert into {conf.metadata_barcode_table_name}(time_record, gather_line_count)' \
          f'values ("{model.update_at}", {data_count % 1000})'
    # 4.2 执行sql语句
    metadata_util.insert_single_sql(sql)
    # 关闭csv文件
    csv_file.close()

# 3.6 关闭数据库连接
target_util.close()
source_util.close()
metadata_util.close()



6.6 记录日志
  1. 脚本启动时记录日志
  2. 源数据中数据表村存在时记录日志
  3. 采集数据为空时记录日志
  4. 每次commit和关闭csv文件前记录日志
  5. 记录元数据后记录日志
  6. 脚本执行结束后记录日志


barcode_service.py

# 0. 导包
import time

from util import mysql_util
from util import logging_util
from config import project_config as conf
from model.barcode_model import BarcodeModel

# 日志1:创建日志器对象
logger = logging_util.init_logger('barcode')

# 日志2: 记录脚本的启动
logger.info('商品数据采集开始...')

# 1. 查询元数据库表,获取上批次条码商品数据处理的最大时间
#
# 1.1 创建元数据库连接对象
metadata_util = mysql_util.get_mysql_util(
    host=conf.metadata_host,
    port=conf.metadata_port,
    user=conf.metadata_user,
    password=conf.metadata_password
)
# 1.2 检查元数据表是否存在,不存在则创建
metadata_util.check_table_exists_and_create(
    db_name=conf.metadata_db,
    tb_name=conf.metadata_barcode_table_name,
    tb_cols=conf.metadata_barcode_table_create_cols
)
# 1.3 查询商品采集元数据表中,上一次采集记录的 updateAt 的最大值
# 1.3.1 创建查询sql
sql = f'select max(time_record) from {conf.metadata_barcode_table_name};'
# 1.3.2 执行sql
result = metadata_util.query(sql)  # ((最大时间,),)
# 1.4 判断获取到的数据集是否有值,创建变量保存该值
if result[0][0]:
    # 1.4.1 如果有值则保存
    barcode_max_time = result[0][0]
else:
    # 1.4.2 如果没有值则保存None
    barcode_max_time = None

# 2. 根据上一次采集商品数据中 updateAt 的最大值,查询数据源库商品表,获取继上一次采集之后,新增和更新的商品数据
# 2.1 创建数据源数据库连接对象
source_util = mysql_util.get_mysql_util(
    host=conf.source_host,
    port=conf.source_port,
    user=conf.source_user,
    password=conf.source_password
)
# 2.2 判断数据源数据库是否存在, 如果不存在则退出程序
if not source_util.check_table_exists(conf.source_data_db, conf.source_barcode_table_name):
    # 日志3: 记录数据源数据表不存在时的警告
    logger.error('数据源数据库中的表不存在,退出程序...')
    exit('数据源不存在,请与后端开发人员协商(sibi)解决')
# 2.3 查询元数据库表,获取增量更新的条码商品数据
if not barcode_max_time:
    # 2.3.1 没有最大采集时间(初次采集,全量采集 没有where)
    sql = f'select * from {conf.source_barcode_table_name} order by updateAt;'
else:
    # 2.3.2 有最大采集时间(再次采集,增量采集 有where)
    sql = f'select * from {conf.source_barcode_table_name} ' \
          f'where updateAt > "{barcode_max_time}" ' \
          f'order by updateAt;'
# 执行sql语句
result = source_util.query(sql)
# 2.4 判断采集数据条目数, 如果条目数为0则退出程序
if not len(result):
    # 日志4: 采集数据记录为空时记录日志
    logger.info('没有待采集数据,采集结束...')
    exit('没有待采集数据...')

# 3. 针对新增和更新的商品数据,进行数据采集(ETL->mysql->csv)
# 3.1 创建目标数据库连接对象
target_util = mysql_util.get_mysql_util(
    host=conf.target_host,
    port=conf.target_port,
    user=conf.target_user,
    password=conf.target_password
)
# 3.2 检查目标数据库表是否存在,如果不存在则创建
target_util.check_table_exists_and_create(
    db_name=conf.target_data_db,
    tb_name=conf.target_barcode_table_name,
    tb_cols=conf.target_barcode_table_create_cols
)
# 3.3 创建目标csv文件
csv_file = open(
    file=conf.barcode_output_csv_root_path + conf.barcode_orders_output_csv_file_name,
    mode='a',
    encoding='utf8')
# 3.4 写入csv的标头
csv_file.write(BarcodeModel.get_csv_header())
# 事务1: 创建一个变量,保存处理数据的条目数
data_count = 0
# 事务2: 首次开启事务
target_util.begin_transaction()

# 日志8: 记录脚本执行耗时: 开始时间
start = time.time()

# 3.5 根据数据源采集结果创建数据模型
for row_data in result:
    # 事务4: 计数器自增
    data_count += 1
    try:
        # 创建一个模型
        model = BarcodeModel(row_data)
        # 3.5.1 写入mysql数据库
        target_util.insert_single_sql_without_commit(model.generate_insert_sql())
        # 3.5.2 写入csv文件
        csv_file.write(model.to_csv())

    except Exception as e:
        # 回滚事务
        target_util.rollback_transaction()
        # 结束程序 (如果想要在回滚后恢复各种变量的值,做的操作太过繁琐不如将脚本直接杀死后重新执行)
        exit('插入数据库失败结束程序')
    else:
        # 事务3: 判断数据条目是否为1000的倍数
        if data_count % 1000 == 0:
            # 提交事务
            target_util.commit_transaction()
            # 4.将本次采集商品数据中的updateAt的最大值,保存到元数据库表中
            # 4.1 准备sql语句
            sql = f'insert into {conf.metadata_barcode_table_name}(time_record, gather_line_count)' \
                  f'values ("{model.update_at}", 1000)'
            # 4.2 执行sql语句
            metadata_util.insert_single_sql(sql)
            # # 日志7: 提交事务后记录元数据,记录日志
            logger.info(f'元数据已经提交... 最大记录时间为{model.update_at}, 记录数据条目数为1000')
            # 日志5: 记录向数据库中commit数据的日志
            logger.info(f'本次采集数据条目数为1000,已采集完成并写入数据库,共采集{data_count}')
            # 开启下一次事务
            target_util.begin_transaction()
            # 事务5: 关闭csv文件,并创建下一个csv文件
            csv_file.close()
            # 日志6: 记录关闭csv文件的日志
            logger.info(f'本次采集数据条目数为1000,已采集完成并写入{csv_file.name}文件中,共采集{data_count}')
            # 开启下一个文件
            csv_file = open(
                file=conf.barcode_output_csv_root_path + conf.barcode_orders_output_csv_file_name,
                mode='a',
                encoding='utf8')
            # 写入csv的标头
            csv_file.write(BarcodeModel.get_csv_header())

else:
    # 事务6: 数据执行结束,将没有提交的sql语句提交和csv文件关闭
    # 提交事务
    target_util.commit_transaction()
    # 日志5: 记录向数据库中commit数据的日志
    logger.info(f'本次采集数据条目数为{data_count % 1000},已采集完成并写入数据库,共采集{data_count}')
    # 4.将本次采集商品数据中的updateAt的最大值,保存到元数据库表中
    # 4.1 准备sql语句
    sql = f'insert into {conf.metadata_barcode_table_name}(time_record, gather_line_count)' \
          f'values ("{model.update_at}", {data_count % 1000})'
    # 4.2 执行sql语句
    metadata_util.insert_single_sql(sql)
    # 日志7: 提交事务后记录元数据,记录日志
    logger.info(f'元数据已经提交... 最大记录时间为{model.update_at}, 记录数据条目数为{data_count % 1000}')
    # 关闭csv文件
    csv_file.close()
    # 日志6: 记录关闭csv文件的日志
    logger.info(f'本次采集数据条目数为{data_count % 1000},已采集完成并写入{csv_file.name}文件中,共采集{data_count}')
# 日志8: 记录脚本执行耗时: 结束时间
end = time.time()

# 日志8: 记录脚本执行耗时:输出日志
logger.info(f'商品数据采集执行结束,共采集数据{data_count}条, 共耗时{end-start}s...')

# 3.6 关闭数据库连接
target_util.close()
source_util.close()
metadata_util.close()


07-订单数据采集-业务需求说明(理解)


业务需求:

1)将采集的订单JSON数据保存到目标数据库中

2)将采集的订单JSON写出到 CSV 文件中

3)采集数据时JSON文件不能重复采集


实现思路:

  1. 获取订单文件夹下面有哪些订单JSON文件
  2. 查询元数据库表中已经被采集的订单JSON文件,来对比确定要采集新的订单JSON文件
  3. 针对待采集的新订单JSON文件,进行数据采集(ETL操作->mysql->csv)
  4. 将本次采集的订单JSON文件,记录到元数据库的表中


08-构建订单数据模型模型类(

重点理解

)


8.1 JSON数据解析(

要记下来

)
  • json模块的使用

json.loads 将JSON字符串数据转换为Python中的数据类型

json.dumps 将python中的数据类型转换为JSON字符串

import json

json_data_str1 = '{"discountRate": 1, "storeShopNo": "None", "dayOrderSeq": 19, "storeDistrict": "龙华区", "isSigned": 0, "storeProvince": "广东省", "origin": 0}'

json_data_str2 = '{"discountRate": 1, "storeShopNo": "None", "dayOrderSeq": 19, "storeDistrict": "龙华区", "isSigned": 0, "storeProvince": "广东省", "origin": 0, "products":[{"name": "小浣熊洗洁精", "count":1},{"name": "劲霸童装", "count":2}]}'
"""
在JSON数据中只有 object(对象类型), array(数组类型), number(数值类型), string(字符串类型), boolean(布尔类型)
"""

# 1. 将Json字符串转换为python中的数据类型
python_data1 = json.loads(json_data_str1)
print(python_data1)
print(type(python_data1))

python_data2 = json.loads(json_data_str2)
print(python_data2)
print(type(python_data2))
print(type(python_data2['products']))

# 2.将python中的数据类型转换为JSON字符串
json_str1 = json.dumps(python_data1, ensure_ascii=False, indent=4)
print(json_str1)
print(type(json_str1))

  • JSON数据格式化

格式化JSON数据的网站:

在线JSON校验格式化工具(Be JSON)

{
	"discountRate": 1,
	"storeShopNo": "None",
	"dayOrderSeq": 14,
	"storeDistrict": "澧县",
	"isSigned": 0,
	"storeProvince": "湖南省",
	"origin": 0,
	"storeGPSLongitude": "111.76220097590462",
	"discount": 0,
	"storeID": 1837,
	"productCount": 3,
	"operatorName": "OperatorName",
	"operator": "NameStr",
	"storeStatus": "open",
	"storeOwnUserTel": 12345678910,
	"payType": "cash",
	"discountType": 2,
	"storeName": "九澧超市",
	"storeOwnUserName": "OwnUserNameStr",
	"dateTS": 1542436530000,
	"smallChange": 0,
	"storeGPSName": "None",
	"erase": 0,
	"product": [{
		"count": 1,
		"name": "好丽友呀土豆番茄酱味40g",
		"unitID": 7,
		"barcode": "6920907808513",
		"pricePer": 3.5,
		"retailPrice": 3.5,
		"tradePrice": 2.85,
		"categoryID": 10
	}, {
		"count": 1,
		"name": "卫龙素食经典大面筋102g",
		"unitID": 7,
		"barcode": "6935284499995",
		"pricePer": 3,
		"retailPrice": 3,
		"tradePrice": 2,
		"categoryID": 10
	}],
	"storeGPSAddress": "None",
	"orderID": "154243653020318374152",
	"moneyBeforeWholeDiscount": 10.5,
	"storeCategory": "normal",
	"receivable": 10.5,
	"faceID": "",
	"storeOwnUserId": 1770,
	"paymentChannel": 0,
	"paymentScenarios": "OTHER",
	"storeAddress": "StoreAddress",
	"totalNoDiscount": 10.5,
	"payedTotal": 10.5,
	"storeGPSLatitude": "29.649557079006957",
	"storeCreateDateTS": 1540883593000,
	"storeCity": "常德市",
	"memberID": "0"
}

经过格式化,我们可以清晰的看到,上述json数据转换为python数据类型时结果为:

字典 嵌套 列表 嵌套 字典的形式

{
    key1:value1,
    key2:[
        {in_key1:in_value1},
        {in_key2:in_value2}
    ]
}
  • 订单数数据解析


问题1: 我们如何将上述json数据转换为msyql中的数据表???

进行数据的拆分,将上述数据分别放置到不同的数据表中,并且进行外键关联


问题2: 上述JSON数据转换后将转换为几张表, 他们的关系是怎样的???

转换为两张表即可 订单表 订单详情表

外键关联, 在订单详情表中创建外键,绑定订单表的主键即可


问题3: 我们如何将上述JSON数据转换为模型???

我们将其转换为两个模型,每个模型中的数据和数据表拆分的逻辑相似


问题4: 上述JSON数据转换为模型后有几个模型类, 他们的关系是怎样的???

拆分后可以有两个模型, 订单模型 订单详情模型

订单模型对象的属性中保存的值就是订单详情模型,一个订单模型可以保存对个订单详情模型

class OrderModel(object):
	def __init__(self,order_id, project_list ):
		self.order_id = order_id
		self.projects = project_list  # 此列表中保存的是多个商品数据模型

表1:订单表

id shop_time total_price
001 2022-1-12 18.5
002 2022-1-15 203.4
003 2022-1-18 19.9

表2:订单详情表

id product_name order_id(外键)
1 小浣熊洗衣液 001
2 雕牌手机 001
3 拉芳拖拉机 002


8.2 订单JSON数据模型分类
订单业务数据模型:
1)订单数据模型:OrdersModel : 存储的是json数据中的订单数据
2)单个售卖商品数据模型:SingleProductSoldModel : 存储的是订单中的单个商品
3)订单详情数据模型:OrdersDetailModel : 存储的是订单中的全部商品(本质就是讲多个SingleProductSoldModel嵌套起来)
4)原始数据模型(订单数据+订单详情数据组合模型):RetailOriginJsonModel : 本质就是订单+商品详情

原始数据模型
	订单模型
	订单详情数据模型
		单个售卖商品模型1
		单个售卖商品模型2

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xR0aRrA4-1688341417120)(day04-数据库数据采集&订单数据采集.assets/1672652136253.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-y0XX0WEF-1688341417121)(day04-数据库数据采集&订单数据采集.assets/1663230173864.png)]



8.3 添加配置文件信息


config/project_config.py

# ################## --订单JSON数据采集配置项-- ###################
# 订单表名称
target_orders_table_name = "orders"
# 订单表MySQL数据库建表语句列信息
target_orders_table_create_cols = \
    f"order_id VARCHAR(255) PRIMARY KEY, " \
    f"store_id INT COMMENT '店铺ID', " \
    f"store_name VARCHAR(30) COMMENT '店铺名称', " \
    f"store_status VARCHAR(10) COMMENT '店铺状态(open,close)', " \
    f"store_own_user_id INT COMMENT '店主id', " \
    f"store_own_user_name VARCHAR(50) COMMENT '店主名称', " \
    f"store_own_user_tel VARCHAR(15) COMMENT '店主手机号', " \
    f"store_category VARCHAR(10) COMMENT '店铺类型(normal,test)', " \
    f"store_address VARCHAR(255) COMMENT '店铺地址', " \
    f"store_shop_no VARCHAR(255) COMMENT '店铺第三方支付id号', " \
    f"store_province VARCHAR(10) COMMENT '店铺所在省', " \
    f"store_city VARCHAR(10) COMMENT '店铺所在市', " \
    f"store_district VARCHAR(10) COMMENT '店铺所在行政区', " \
    f"store_gps_name VARCHAR(255) COMMENT '店铺gps名称', " \
    f"store_gps_address VARCHAR(255) COMMENT '店铺gps地址', " \
    f"store_gps_longitude VARCHAR(255) COMMENT '店铺gps经度', " \
    f"store_gps_latitude VARCHAR(255) COMMENT '店铺gps纬度', " \
    f"is_signed TINYINT COMMENT '是否第三方支付签约(0,1)', " \
    f"operator VARCHAR(10) COMMENT '操作员', " \
    f"operator_name VARCHAR(50) COMMENT '操作员名称', " \
    f"face_id VARCHAR(255) COMMENT '顾客面部识别ID', " \
    f"member_id VARCHAR(255) COMMENT '顾客会员ID', " \
    f"store_create_date_ts TIMESTAMP COMMENT '店铺创建时间', " \
    f"origin VARCHAR(255) COMMENT '原始信息(无用)', " \
    f"day_order_seq INT COMMENT '本订单是当日第几单', " \
    f"discount_rate DECIMAL(10, 5) COMMENT '折扣率', " \
    f"discount_type TINYINT COMMENT '折扣类型', " \
    f"discount DECIMAL(10, 5) COMMENT '折扣金额', " \
    f"money_before_whole_discount DECIMAL(10, 5) COMMENT '折扣前总金额', " \
    f"receivable DECIMAL(10, 5) COMMENT '应收金额', " \
    f"erase DECIMAL(10, 5) COMMENT '抹零金额', " \
    f"small_change DECIMAL(10, 5) COMMENT '找零金额', " \
    f"total_no_discount DECIMAL(10, 5) COMMENT '总价格(无折扣)', " \
    f"pay_total DECIMAL(10, 5) COMMENT '付款金额', " \
    f"pay_type VARCHAR(10) COMMENT '付款类型', " \
    f"payment_channel TINYINT COMMENT '付款通道', " \
    f"payment_scenarios VARCHAR(15) COMMENT '付款描述(无用)', " \
    f"product_count INT COMMENT '本单卖出多少商品', " \
    f"date_ts TIMESTAMP COMMENT '订单时间', " \
    f"INDEX (receivable), INDEX (date_ts)"

# 订单详情表名称
target_orders_detail_table_name = "orders_detail"
# 订单详情建表列信息
target_orders_detail_table_create_cols = \
    f"order_id VARCHAR(255) COMMENT '订单ID', " \
    f"barcode VARCHAR(255) COMMENT '商品条码', " \
    f"name VARCHAR(255) COMMENT '商品名称', " \
    f"count INT COMMENT '本单此商品卖出数量', " \
    f"price_per DECIMAL(10, 5) COMMENT '实际售卖单价', " \
    f"retail_price DECIMAL(10, 5) COMMENT '零售建议价', " \
    f"trade_price DECIMAL(10, 5) COMMENT '贸易价格(进货价)', " \
    f"category_id INT COMMENT '商品类别ID', " \
    f"unit_id INT COMMENT '商品单位ID(包、袋、箱、等)', " \
    f"PRIMARY KEY (order_id, barcode)"

# 订单数据写出 csv 的根路径
retail_output_csv_root_path = "D:/etl/output/csv"
# 每一次运行,订单文件写出路径
retail_orders_output_csv_file_name = \
    f"orders-{time.strftime('%Y-%m-%d-%H_%M', time.localtime())}.csv"
retail_orders_detail_output_csv_file_name = \
    f"orders-detail-{time.strftime('%Y-%m-%d-%H_%M', time.localtime())}.csv"


8.4 订单JSON模型实现(

重点理解

)

mysql的插入形式:


  1. INSERT INTO :

    插入数据,如果插入失败(唯一约束生效,插入重复主键), 报错

  2. INSERT IGNORE INTO :

    插入数据,如果插入失败(唯一约束生效,插入重复主键), 忽略

  3. REPLACE INTO:

    插入数据,如果插入失败(唯一约束生效,插入重复主键), 替换

例如插入数据的id值为3 但是表中已有该id的数据记录

则 1 会报错 2 则不插入数据 3 会覆盖原数据


model/order_model.py

"""
订单业务数据模型:
1)订单数据模型:OrdersModel
2)单个售卖商品数据模型:SingleProductSoldModel
3)订单详情数据模型:OrdersDetailModel
4)原始数据模型(订单数据+订单详情数据组合模型):RetailOriginJsonModel
"""
import json
from config import project_config as conf
from util import str_util, time_util


class OrdersModel(object):
    """订单数据模型"""
    def __init__(self, data):
        """
        利用传入的订单json数据构建订单数据模型对象
        """
        # 将 json 数据转换为字典
        data = json.loads(data)
        # 初始化订单数据模型对象
        self.discount_rate = data['discountRate']  # 折扣率
        self.store_shop_no = data['storeShopNo']  # 店铺店号(无用列)
        self.day_order_seq = data['dayOrderSeq']  # 本单为当日第几单
        self.store_district = data['storeDistrict']  # 店铺所在行政区
        self.is_signed = data['isSigned']  # 是否签约店铺(签约第三方支付体系)
        self.store_province = data['storeProvince']  # 店铺所在省份
        self.origin = data['origin']  # 原始信息(无用)
        self.store_gps_longitude = data['storeGPSLongitude']  # 店铺GPS经度
        self.discount = data['discount']  # 折扣金额
        self.store_id = data['storeID']  # 店铺ID
        self.product_count = data['productCount']  # 本单售卖商品数量
        self.operator_name = data['operatorName']  # 操作员姓名
        self.operator = data['operator']  # 操作员ID
        self.store_status = data['storeStatus']  # 店铺状态
        self.store_own_user_tel = data['storeOwnUserTel']  # 店铺店主电话
        self.pay_type = data['payType']  # 支付类型
        self.discount_type = data['discountType']  # 折扣类型
        self.store_name = data['storeName']  # 店铺名称
        self.store_own_user_name = data['storeOwnUserName']  # 店铺店主名称
        self.date_ts = data['dateTS']  # 订单时间
        self.small_change = data['smallChange']  # 找零金额
        self.store_gps_name = data['storeGPSName']  # 店铺GPS名称
        self.erase = data['erase']  # 是否抹零
        self.store_gps_address = data['storeGPSAddress']  # 店铺GPS地址
        self.order_id = data['orderID']  # 订单ID
        self.money_before_whole_discount = data['moneyBeforeWholeDiscount']  # 折扣前金额
        self.store_category = data['storeCategory']  # 店铺类别
        self.receivable = data['receivable']  # 应收金额
        self.face_id = data['faceID']  # 面部识别ID
        self.store_own_user_id = data['storeOwnUserId']  # 店铺店主ID
        self.payment_channel = data['paymentChannel']  # 付款通道
        self.payment_scenarios = data['paymentScenarios']  # 付款情况(无用)
        self.store_address = data['storeAddress']  # 店铺地址
        self.total_no_discount = data['totalNoDiscount']  # 整体价格(无折扣)
        self.payed_total = data['payedTotal']  # 已付款金额
        self.store_gps_latitude = data['storeGPSLatitude']  # 店铺GPS纬度
        self.store_create_date_ts = data['storeCreateDateTS']  # 店铺创建时间
        self.store_city = data['storeCity']  # 店铺所在城市
        self.member_id = data['memberID'] # 会员ID

    def generate_insert_sql(self):
        """
        生成添加表数据的SQL语句
        """
        sql = f"INSERT IGNORE INTO {conf.target_orders_table_name}(" \
              f"order_id,store_id,store_name,store_status,store_own_user_id," \
              f"store_own_user_name,store_own_user_tel,store_category," \
              f"store_address,store_shop_no,store_province,store_city," \
              f"store_district,store_gps_name,store_gps_address," \
              f"store_gps_longitude,store_gps_latitude,is_signed," \
              f"operator,operator_name,face_id,member_id,store_create_date_ts," \
              f"origin,day_order_seq,discount_rate,discount_type,discount," \
              f"money_before_whole_discount,receivable,erase,small_change," \
              f"total_no_discount,pay_total,pay_type,payment_channel," \
              f"payment_scenarios,product_count,date_ts" \
              f") VALUES(" \
              f"'{self.order_id}', " \
              f"{self.store_id}, " \
              f"{str_util.check_str_null_and_transform_to_sql_null(self.store_name)}, " \
              f"{str_util.check_str_null_and_transform_to_sql_null(self.store_status)}, " \
              f"{self.store_own_user_id}, " \
              f"{str_util.check_str_null_and_transform_to_sql_null(self.store_own_user_name)}, " \
              f"{str_util.check_str_null_and_transform_to_sql_null(self.store_own_user_tel)}, " \
              f"{str_util.check_str_null_and_transform_to_sql_null(self.store_category)}, " \
              f"{str_util.check_str_null_and_transform_to_sql_null(self.store_address)}, " \
              f"{str_util.check_str_null_and_transform_to_sql_null(self.store_shop_no)}, " \
              f"{str_util.check_str_null_and_transform_to_sql_null(self.store_province)}, " \
              f"{str_util.check_str_null_and_transform_to_sql_null(self.store_city)}, " \
              f"{str_util.check_str_null_and_transform_to_sql_null(self.store_district)}, " \
              f"{str_util.check_str_null_and_transform_to_sql_null(self.store_gps_name)}, " \
              f"{str_util.check_str_null_and_transform_to_sql_null(self.store_gps_address)}, " \
              f"{str_util.check_str_null_and_transform_to_sql_null(self.store_gps_longitude)}, " \
              f"{str_util.check_str_null_and_transform_to_sql_null(self.store_gps_latitude)}, " \
              f"{self.is_signed}, " \
              f"{str_util.check_str_null_and_transform_to_sql_null(self.operator)}, " \
              f"{str_util.check_str_null_and_transform_to_sql_null(self.operator_name)}, " \
              f"{str_util.check_str_null_and_transform_to_sql_null(self.face_id)}, " \
              f"{str_util.check_str_null_and_transform_to_sql_null(self.member_id)}, " \
              f"'{time_util.ts13_to_date_str(self.store_create_date_ts)}', " \
              f"{str_util.check_str_null_and_transform_to_sql_null(self.origin)}, " \
              f"{self.day_order_seq}, " \
              f"{self.discount_rate}, " \
              f"{self.discount_type}, " \
              f"{self.discount}, " \
              f"{self.money_before_whole_discount}, " \
              f"{self.receivable}, " \
              f"{self.erase}, " \
              f"{self.small_change}, " \
              f"{self.total_no_discount}, " \
              f"{self.payed_total}, " \
              f"{str_util.check_str_null_and_transform_to_sql_null(self.pay_type)}, " \
              f"{self.payment_channel}, " \
              f"{str_util.check_str_null_and_transform_to_sql_null(self.payment_scenarios)}, " \
              f"{self.product_count}, " \
              f"'{time_util.ts13_to_date_str(self.date_ts)}')"

        return sql

    @staticmethod
    def get_csv_header(sep=','):
        """
        生成 csv 数据的标头内容
        """
        header = f"order_id{sep}" \
                 f"store_id{sep}" \
                 f"store_name{sep}" \
                 f"store_status{sep}" \
                 f"store_own_user_id{sep}" \
                 f"store_own_user_name{sep}" \
                 f"store_own_user_tel{sep}" \
                 f"store_category{sep}" \
                 f"store_address{sep}" \
                 f"store_shop_no{sep}" \
                 f"store_province{sep}" \
                 f"store_city{sep}" \
                 f"store_district{sep}" \
                 f"store_gps_name{sep}" \
                 f"store_gps_address{sep}" \
                 f"store_gps_longitude{sep}" \
                 f"store_gps_latitude{sep}" \
                 f"is_signed{sep}" \
                 f"operator{sep}" \
                 f"operator_name{sep}" \
                 f"face_id{sep}" \
                 f"member_id{sep}" \
                 f"store_create_date_ts{sep}" \
                 f"origin{sep}" \
                 f"day_order_seq{sep}" \
                 f"discount_rate{sep}" \
                 f"discount_type{sep}" \
                 f"discount{sep}" \
                 f"money_before_whole_discount{sep}" \
                 f"receivable{sep}" \
                 f"erase{sep}" \
                 f"small_change{sep}" \
                 f"total_no_discount{sep}" \
                 f"pay_total{sep}" \
                 f"pay_type{sep}" \
                 f"payment_channel{sep}" \
                 f"payment_scenarios{sep}" \
                 f"product_count{sep}" \
                 f"date_ts\n"

        return header

    def check_and_transform_area(self):
        """
        检查省市区内容,为空就转换为未知
        """
        if str_util.check_null(self.store_province):
            self.store_province = '未知省份'
        if str_util.check_null(self.store_city):
            self.store_city = '未知城市'
        if str_util.check_null(self.store_district):
            self.store_district = '未知行政区'

    def check_and_transform_all_column(self):
        """
        转换全部的列,如果是空内容,就将其设置为空字符串
        """
        self.discount_rate = str_util.check_null_and_transform(self.discount_rate)
        self.store_shop_no = str_util.check_null_and_transform(self.store_shop_no)
        self.day_order_seq = str_util.check_null_and_transform(self.day_order_seq)
        self.store_district = str_util.check_null_and_transform(self.store_district)
        self.is_signed = str_util.check_null_and_transform(self.is_signed)
        self.store_province = str_util.check_null_and_transform(self.store_province)
        self.origin = str_util.check_null_and_transform(self.origin)
        self.store_gps_longitude = str_util.check_null_and_transform(self.store_gps_longitude)
        self.discount = str_util.check_null_and_transform(self.discount)
        self.store_id = str_util.check_null_and_transform(self.store_id)
        self.product_count = str_util.check_null_and_transform(self.product_count)
        self.operator_name = str_util.check_null_and_transform(self.operator_name)
        self.operator = str_util.check_null_and_transform(self.operator)
        self.store_status = str_util.check_null_and_transform(self.store_status)
        self.store_own_user_tel = str_util.check_null_and_transform(self.store_own_user_tel)
        self.pay_type = str_util.check_null_and_transform(self.pay_type)
        self.discount_type = str_util.check_null_and_transform(self.discount_type)
        self.store_name = str_util.check_null_and_transform(self.store_name)
        self.store_own_user_name = str_util.check_null_and_transform(self.store_own_user_name)
        self.date_ts = str_util.check_null_and_transform(self.date_ts)
        self.small_change = str_util.check_null_and_transform(self.small_change)
        self.store_gps_name = str_util.check_null_and_transform(self.store_gps_name)
        self.erase = str_util.check_null_and_transform(self.erase)
        self.store_gps_address = str_util.check_null_and_transform(self.store_gps_address)
        self.order_id = str_util.check_null_and_transform(self.order_id)
        self.money_before_whole_discount = str_util.check_null_and_transform(self.money_before_whole_discount)
        self.store_category = str_util.check_null_and_transform(self.store_category)
        self.receivable = str_util.check_null_and_transform(self.receivable)
        self.face_id = str_util.check_null_and_transform(self.face_id)
        self.store_own_user_id = str_util.check_null_and_transform(self.store_own_user_id)
        self.payment_channel = str_util.check_null_and_transform(self.payment_channel)
        self.payment_scenarios = str_util.check_null_and_transform(self.payment_scenarios)
        self.store_address = str_util.check_null_and_transform(self.store_address)
        self.total_no_discount = str_util.check_null_and_transform(self.total_no_discount)
        self.payed_total = str_util.check_null_and_transform(self.payed_total)
        self.store_gps_latitude = str_util.check_null_and_transform(self.store_gps_latitude)
        self.store_create_date_ts = str_util.check_null_and_transform(self.store_create_date_ts)
        self.store_city = str_util.check_null_and_transform(self.store_city)
        self.member_id = str_util.check_null_and_transform(self.member_id)

    def to_csv(self, sep=','):
        """
        生成 csv 数据,分割符默认为逗号。
        Note: 生成的数据顺序和header是一一对应的,不要混乱了。
        """
        self.check_and_transform_area()
        self.check_and_transform_all_column()

        csv_line = \
            f"{self.order_id}{sep}" \
            f"{self.store_id}{sep}" \
            f"{self.store_name}{sep}" \
            f"{self.store_status}{sep}" \
            f"{self.store_own_user_id}{sep}" \
            f"{self.store_own_user_name}{sep}" \
            f"{self.store_own_user_tel}{sep}" \
            f"{self.store_category}{sep}" \
            f"{self.store_address}{sep}" \
            f"{self.store_shop_no}{sep}" \
            f"{self.store_province}{sep}" \
            f"{self.store_city}{sep}" \
            f"{self.store_district}{sep}" \
            f"{self.store_gps_name}{sep}" \
            f"{self.store_gps_address}{sep}" \
            f"{self.store_gps_longitude}{sep}" \
            f"{self.store_gps_latitude}{sep}" \
            f"{self.is_signed}{sep}" \
            f"{self.operator}{sep}" \
            f"{self.operator_name}{sep}" \
            f"{self.face_id}{sep}" \
            f"{self.member_id}{sep}" \
            f"{time_util.ts13_to_date_str(self.store_create_date_ts)}{sep}" \
            f"{self.origin}{sep}" \
            f"{self.day_order_seq}{sep}" \
            f"{self.discount_rate}{sep}" \
            f"{self.discount_type}{sep}" \
            f"{self.discount}{sep}" \
            f"{self.money_before_whole_discount}{sep}" \
            f"{self.receivable}{sep}" \
            f"{self.erase}{sep}" \
            f"{self.small_change}{sep}" \
            f"{self.total_no_discount}{sep}" \
            f"{self.payed_total}{sep}" \
            f"{self.pay_type}{sep}" \
            f"{self.payment_channel}{sep}" \
            f"{self.payment_scenarios}{sep}" \
            f"{self.product_count}{sep}" \
            f"{time_util.ts13_to_date_str(self.date_ts)}\n"

        return csv_line


class SingleProductSoldModel(object):
    """订单售卖商品数据模型"""
    def __init__(self, order_id, product_detail):
        self.order_id = order_id
        self.count = product_detail['count']
        self.name = product_detail['name']
        self.unit_id = product_detail['unitID']
        self.barcode = product_detail['barcode']
        self.price_per = product_detail['pricePer']
        self.retail_price = product_detail['retailPrice']
        self.trade_price = product_detail['tradePrice']
        self.category_id = product_detail['categoryID']

    def generate_value_segment_for_sql_insert(self):
        """
        生成添加表数据SQL语句的VALUE语句段
        """
        segment = f"(" \
                  f"'{self.order_id}', " \
                  f"{str_util.check_str_null_and_transform_to_sql_null(self.barcode)}, " \
                  f"{str_util.check_str_null_and_transform_to_sql_null(self.name)}, " \
                  f"{self.count}, " \
                  f"{self.price_per}, " \
                  f"{self.retail_price}, " \
                  f"{self.trade_price}, " \
                  f"{self.category_id}, " \
                  f"{self.unit_id}" \
                  f")"

        return segment

    def to_csv(self, sep=","):
        """
        生成一条csv数据,分隔符默认逗号
        """
        csv_line = \
            f"{self.order_id}{sep}" \
            f"{self.barcode}{sep}" \
            f"{self.name}{sep}" \
            f"{self.count}{sep}" \
            f"{self.price_per}{sep}" \
            f"{self.retail_price}{sep}" \
            f"{self.trade_price}{sep}" \
            f"{self.category_id}{sep}" \
            f"{self.unit_id}\n"

        return csv_line


class OrdersDetailModel(object):
    """订单详情数据模型"""
    def __init__(self, data):
        """
        利用传入的订单json数据构建订单详情数据模型对象
        """
        data = json.loads(data)
        order_products_list = data['product']
        self.order_id = data['orderID']
        self.products_detail = [] # 记录当前订单卖出的商品

        for sing_product in order_products_list:
            product = SingleProductSoldModel(self.order_id, sing_product)
            self.products_detail.append(product)

    def generate_insert_sql(self):
        """
        生成添加表数据的SQL语句
        """
        sql = f"INSERT IGNORE INTO {conf.target_orders_detail_table_name}(" \
              f"order_id,barcode,name,count,price_per,retail_price,trade_price,category_id,unit_id" \
              f") VALUES"

        for single_product_sold_model in self.products_detail:
            sql += single_product_sold_model.generate_value_segment_for_sql_insert() + ", "

        # 去除最后的逗号
        sql = sql[:-2]
        return sql

    @staticmethod
    def get_csv_header(sep=','):
        """
        生成 csv 数据的标头内容
        """
        return f"order_id{sep}" \
               f"barcode{sep}" \
               f"name{sep}" \
               f"count{sep}" \
               f"price_per{sep}" \
               f"retail_price{sep}" \
               f"trade_price{sep}" \
               f"category_id{sep}" \
               f"unit_id\n"

    def to_csv(self):
        """生成添加csv的数据行"""
        csv_lines = ''

        for single_product_sold_model in self.products_detail:
            csv_lines += single_product_sold_model.to_csv()

        return csv_lines


class RetailOriginJsonModel(object):
    """
    原始订单JSON数据模型
    """
    def __init__(self, data):
        self.order_model = OrdersModel(data)
        self.order_detail_model = OrdersDetailModel(data)

    def get_order_model(self):
        return self.order_model

    def get_order_detail_model(self):
        return self.order_detail_model

    def get_order_id(self):
        return self.order_model.order_id

    def get_products_list(self):
        return self.order_detail_model.products_detail


09-订单JSON数据采集业务实现


9.1 获取未采集文件列表
  1. 获取订单文件夹下面有哪些订单JSON文件
  2. 查询元数据库表中已经被采集的订单JSON文件,来对比确定要采集新的订单JSON文件
  • 创建元数据连接
  • 检查数据表是否存在,不存在则创建
  • 获取元数据中已经被采集的Json文件路径列表
  • 对比确定要采集的新订单文件
  • 增加一个配置文件
# JSON订单数据源的文件路径
json_data_path = 'D:/etl/json/'


order_json_service.py

# 1. 获取订单文件夹下面有哪些订单JSON文件
all_json_files = file_util.get_dir_files_list(conf.json_data_path)

# 2. 查询元数据库表中已经被采集的订单JSON文件,来对比确定要采集新的订单JSON文件
# 2.1 创建元数据连接
metadata_util = mysql_util.get_mysql_util(
    host=conf.metadata_host,
    port=conf.metadata_port,
    user=conf.metadata_user,
    password=conf.metadata_password
)
# 2.2 检查数据表是否存在,不存在则创建
metadata_util.check_table_exists_and_create(
    db_name=conf.metadata_db,
    tb_name=conf.file_monitor_meta_table_name,
    tb_cols=conf.file_monitor_meta_table_create_cols
)
# 2.3 获取元数据中已经被采集的Json文件路径列表
sql = f'select * from {conf.file_monitor_meta_table_name};'
result = metadata_util.query(sql)
processed_json_files = [file_tuple[1] for file_tuple in result]
# 2.4 对比确定要采集的新订单文件
new_json_files = file_util.get_new_by_compare_lists(processed_json_files, all_json_files)


9.2 对新JSON文件进行数据采集
  1. 针对待采集的新订单JSON文件,进行数据采集(ETL操作->mysql->csv)
  • 创建csv文件,并打开文件(订单表+订单详情表)
  • 向CSV文件中写入标头信息(订单表+订单详情表)
  • 创建一个目标数据库连接对象
  • 检查数据表是否存在,不存在则创建
  • 遍历待处理的JSON文件

    • 按行读取JSON文件, 创建数据模型
    • 对模型进行处理,订单价格高于10000的记录直接剔除
    • 写入到目标表中
    • 写入到csv文件中


order_json_service.py

# 3. 针对待采集的新订单JSON文件,进行数据采集(ETL操作->mysql->csv)
# 3.1 创建csv文件,并打开文件(订单表+订单详情表)
# 订单
csv_order_file = open(conf.retail_output_csv_root_path + conf.retail_orders_output_csv_file_name, 'a', encoding='utf8')
# 订单详情
csv_order_detail_file = open(conf.retail_output_csv_root_path + conf.retail_orders_detail_output_csv_file_name, 'a', encoding='utf8')
# 3.2 向CSV文件中写入标头信息(订单表+订单详情表)
# 订单
csv_order_file.write(retail_orders_model.OrdersModel.get_csv_header())
# 订单详情
csv_order_detail_file.write(retail_orders_model.OrdersDetailModel.get_csv_header())
# 3.3 创建一个目标数据库连接对象
target_util = mysql_util.get_mysql_util(
    host=conf.target_host,
    port=conf.target_port,
    user=conf.target_user,
    password=conf.target_password
)
# 3.4 检查数据表是否存在,不存在则创建(订单表+订单详情表)
# 订单表
target_util.check_table_exists_and_create(
    db_name=conf.target_data_db,
    tb_name=conf.target_orders_table_name,
    tb_cols=conf.target_orders_table_create_cols
)
# 订单详情表
target_util.check_table_exists_and_create(
    db_name=conf.target_data_db,
    tb_name=conf.target_orders_detail_table_name,
    tb_cols=conf.target_orders_detail_table_create_cols
)
# 3.5 遍历待处理的JSON文件(new_json_files)
for json_file in new_json_files:
    #   3.5.1 按行读取JSON文件,
    for json_data in open(json_file, 'r', encoding='utf8'):
        # 创建数据模型(原始json数据模型)
        model = retail_orders_model.RetailOriginJsonModel(json_data)
        #   3.5.2 对模型进行处理,订单价格高于10000的记录直接剔除(模拟数据清洗过程,没有实际意义)
        if model.order_model.receivable <= 10000:
            #   3.5.3 写入到目标表中(订单表+订单详情表)
            # 创建sql语句
            order_sql = model.order_model.generate_insert_sql()
            order_detail_sql = model.order_detail_model.generate_insert_sql()
            # 执行插入指令
            target_util.insert_single_sql_without_commit(order_sql)
            target_util.insert_single_sql_without_commit(order_detail_sql)
            #   3.5.4 写入到csv文件中(订单文件+订单详情文件)
            # 创建CSV字符串
            order_csv_str = model.order_model.to_csv()
            order_detail_csv_str = model.order_detail_model.to_csv()
            # 写入CSV文件中
            csv_order_file.write(order_csv_str)
            csv_order_detail_file.write(order_detail_csv_str)
            
# 3.6 关闭数据库连接和文件
target_util.close()
metadata_util.close()
csv_order_file.close()
csv_order_detail_file.close()



9.3 添加事务

一个文件开始处理前开启事务,处理完成后提交事务

如果文件处理过程中出现异常则回滚事务


order_json_service.py

# 3. 针对待采集的新订单JSON文件,进行数据采集(ETL操作->mysql->csv)
# 3.1 创建csv文件,并打开文件(订单表+订单详情表)
# 订单
csv_order_file = open(conf.retail_output_csv_root_path + conf.retail_orders_output_csv_file_name, 'a', encoding='utf8')
# 订单详情
csv_order_detail_file = open(conf.retail_output_csv_root_path + conf.retail_orders_detail_output_csv_file_name, 'a', encoding='utf8')
# 3.2 向CSV文件中写入标头信息(订单表+订单详情表)
# 订单
csv_order_file.write(retail_orders_model.OrdersModel.get_csv_header())
# 订单详情
csv_order_detail_file.write(retail_orders_model.OrdersDetailModel.get_csv_header())
# 3.3 创建一个目标数据库连接对象
target_util = mysql_util.get_mysql_util(
    host=conf.target_host,
    port=conf.target_port,
    user=conf.target_user,
    password=conf.target_password
)
# 3.4 检查数据表是否存在,不存在则创建(订单表+订单详情表)
# 订单表
target_util.check_table_exists_and_create(
    db_name=conf.target_data_db,
    tb_name=conf.target_orders_table_name,
    tb_cols=conf.target_orders_table_create_cols
)
# 订单详情表
target_util.check_table_exists_and_create(
    db_name=conf.target_data_db,
    tb_name=conf.target_orders_detail_table_name,
    tb_cols=conf.target_orders_detail_table_create_cols
)
# 3.5 遍历待处理的JSON文件(new_json_files)
for json_file in new_json_files:
    # 事务1: 开启事务
    target_util.begin_transaction()
    
    # 事务2: 使用异常捕获如果插入mysql的指令出现异常子回滚事务
    try:
        #   3.5.1 按行读取JSON文件,
        for json_data in open(json_file, 'r', encoding='utf8'):
            # 创建数据模型(原始json数据模型)
            model = retail_orders_model.RetailOriginJsonModel(json_data)
            #   3.5.2 对模型进行处理,订单价格高于10000的记录直接剔除(模拟数据清洗过程,没有实际意义)
            if model.order_model.receivable <= 10000:
                #   3.5.3 写入到目标表中(订单表+订单详情表)
                # 创建sql语句
                order_sql = model.order_model.generate_insert_sql()
                order_detail_sql = model.order_detail_model.generate_insert_sql()
                # 执行插入指令
                target_util.insert_single_sql_without_commit(order_sql)
                target_util.insert_single_sql_without_commit(order_detail_sql)
                #   3.5.4 写入到csv文件中(订单文件+订单详情文件)
                # 创建CSV字符串
                order_csv_str = model.order_model.to_csv()
                order_detail_csv_str = model.order_detail_model.to_csv()
                # 写入CSV文件中
                csv_order_file.write(order_csv_str)
                csv_order_detail_file.write(order_detail_csv_str)
    except:
        # 事务2.1 : 出现异常事务回滚
        target_util.rollback_transaction()
    else:
        # 事务2.2 : 如果 没有出现异常处理一个文件结束需要提交日志
        target_util.commit_transaction()

# 3.6 关闭数据库连接和文件
target_util.close()
metadata_util.close()
csv_order_file.close()
csv_order_detail_file.close()



9.4 记录元数据
  1. 将本次采集的订单JSON文件,记录到元数据库的表中

注意: 记录元数据时一定要将数反斜杠进行替换


order_json_service.py

# 3.5 遍历待处理的JSON文件(new_json_files)
for json_file in new_json_files:
    # 4.2 创建计数器,记录处理数据的条目数
    data_count = 0
    # 事务1: 开启事务
    target_util.begin_transaction()

    # 事务2: 使用异常捕获如果插入mysql的指令出现异常子回滚事务
    try:
        #   3.5.1 按行读取JSON文件,
        for json_data in open(json_file, 'r', encoding='utf8'):
            # 创建数据模型(原始json数据模型)
            model = retail_orders_model.RetailOriginJsonModel(json_data)
            #   3.5.2 对模型进行处理,订单价格高于10000的记录直接剔除(模拟数据清洗过程,没有实际意义)
            if model.order_model.receivable <= 10000:
                # 4.3 提出数据后,将计数器自增
                data_count += 1
                #   3.5.3 写入到目标表中(订单表+订单详情表)
                # 创建sql语句
                order_sql = model.order_model.generate_insert_sql()
                order_detail_sql = model.order_detail_model.generate_insert_sql()
                # 执行插入指令
                target_util.insert_single_sql_without_commit(order_sql)
                target_util.insert_single_sql_without_commit(order_detail_sql)
                #   3.5.4 写入到csv文件中(订单文件+订单详情文件)
                # 创建CSV字符串
                order_csv_str = model.order_model.to_csv()
                order_detail_csv_str = model.order_detail_model.to_csv()
                # 写入CSV文件中
                csv_order_file.write(order_csv_str)
                csv_order_detail_file.write(order_detail_csv_str)
    except:
        # 事务2.1 : 出现异常事务回滚
        target_util.rollback_transaction()
    else:
        # 事务2.2 : 如果 没有出现异常处理一个文件结束需要提交日志
        target_util.commit_transaction()
        # 4. 将本次采集的订单JSON文件,记录到元数据库的表中
        # 4.1 将文件路径中的反斜杠全部替换为斜杠
        json_file = json_file.replace('\\', '/')
        # 4.4 构造一个sql语句
        sql = f'insert into {conf.file_monitor_meta_table_name}(file_name, process_lines)' \
              f'values("{json_file}", {data_count})'
        # 4.5 执行sql语句
        metadata_util.insert_single_sql(sql)


9.5 记录日志
  1. 脚本启动时记录日志
  2. 记录新采集的文件路径有哪些
  3. 一个文件采集完成记录日志
  4. 全部采集完成记录日志,并记录文件数和采集时间
# 0. 导入模块
import time

from util import file_util
from util import mysql_util
from util import logging_util
from model import retail_orders_model
from config import project_config as conf

# 日志1: 创建一个日志器对象
logger = logging_util.init_logger('order_json')
# 日志2: 记录脚本执行开始
logger.info('JSON订单数据采集开始...')

# 1. 获取订单文件夹下面有哪些订单JSON文件
all_json_files = file_util.get_dir_files_list(conf.json_data_path)

# 2. 查询元数据库表中已经被采集的订单JSON文件,来对比确定要采集新的订单JSON文件
# 2.1 创建元数据连接
metadata_util = mysql_util.get_mysql_util(
    host=conf.metadata_host,
    port=conf.metadata_port,
    user=conf.metadata_user,
    password=conf.metadata_password
)
# 2.2 检查数据表是否存在,不存在则创建
metadata_util.check_table_exists_and_create(
    db_name=conf.metadata_db,
    tb_name=conf.file_monitor_meta_table_name,
    tb_cols=conf.file_monitor_meta_table_create_cols
)
# 2.3 获取元数据中已经被采集的Json文件路径列表
sql = f'select * from {conf.file_monitor_meta_table_name};'
result = metadata_util.query(sql)
processed_json_files = [file_tuple[1] for file_tuple in result]
# 2.4 对比确定要采集的新订单文件
new_json_files = file_util.get_new_by_compare_lists(processed_json_files, all_json_files)

# 日志3: 记录新采集的日志文件路径有哪些
if new_json_files:
    logger.info(f'被采集的文件路径列表为{new_json_files}...')
else:
    logger.info(f'没有待采集的文件,退出程序...')
    exit('没有待采集文件,采集结束')

# 3. 针对待采集的新订单JSON文件,进行数据采集(ETL操作->mysql->csv)
# 3.1 创建csv文件,并打开文件(订单表+订单详情表)
# 订单
csv_order_file = open(conf.retail_output_csv_root_path + conf.retail_orders_output_csv_file_name, 'a', encoding='utf8')
# 订单详情
csv_order_detail_file = open(conf.retail_output_csv_root_path + conf.retail_orders_detail_output_csv_file_name, 'a',
                             encoding='utf8')
# 3.2 向CSV文件中写入标头信息(订单表+订单详情表)
# 订单
csv_order_file.write(retail_orders_model.OrdersModel.get_csv_header())
# 订单详情
csv_order_detail_file.write(retail_orders_model.OrdersDetailModel.get_csv_header())
# 3.3 创建一个目标数据库连接对象
target_util = mysql_util.get_mysql_util(
    host=conf.target_host,
    port=conf.target_port,
    user=conf.target_user,
    password=conf.target_password
)
# 3.4 检查数据表是否存在,不存在则创建(订单表+订单详情表)
# 订单表
target_util.check_table_exists_and_create(
    db_name=conf.target_data_db,
    tb_name=conf.target_orders_table_name,
    tb_cols=conf.target_orders_table_create_cols
)
# 订单详情表
target_util.check_table_exists_and_create(
    db_name=conf.target_data_db,
    tb_name=conf.target_orders_detail_table_name,
    tb_cols=conf.target_orders_detail_table_create_cols
)

# 日志5.1 :记录脚本开始时间
start = time.time()
# 3.5 遍历待处理的JSON文件(new_json_files)
for json_file in new_json_files:
    # 4.2 创建计数器,记录处理数据的条目数
    data_count = 0
    # 事务1: 开启事务
    target_util.begin_transaction()

    # 事务2: 使用异常捕获如果插入mysql的指令出现异常子回滚事务
    try:
        #   3.5.1 按行读取JSON文件,
        for json_data in open(json_file, 'r', encoding='utf8'):
            # 创建数据模型(原始json数据模型)
            model = retail_orders_model.RetailOriginJsonModel(json_data)
            #   3.5.2 对模型进行处理,订单价格高于10000的记录直接剔除(模拟数据清洗过程,没有实际意义)
            if model.order_model.receivable <= 10000:
                # 4.3 提出数据后,将计数器自增
                data_count += 1
                #   3.5.3 写入到目标表中(订单表+订单详情表)
                # 创建sql语句
                order_sql = model.order_model.generate_insert_sql()
                order_detail_sql = model.order_detail_model.generate_insert_sql()
                # 执行插入指令
                target_util.insert_single_sql_without_commit(order_sql)
                target_util.insert_single_sql_without_commit(order_detail_sql)
                #   3.5.4 写入到csv文件中(订单文件+订单详情文件)
                # 创建CSV字符串
                order_csv_str = model.order_model.to_csv()
                order_detail_csv_str = model.order_detail_model.to_csv()
                # 写入CSV文件中
                csv_order_file.write(order_csv_str)
                csv_order_detail_file.write(order_detail_csv_str)
    except:
        # 事务2.1 : 出现异常事务回滚
        target_util.rollback_transaction()
    else:
        # 事务2.2 : 如果 没有出现异常处理一个文件结束需要提交日志
        target_util.commit_transaction()
        # 4. 将本次采集的订单JSON文件,记录到元数据库的表中
        # 4.1 将文件路径中的反斜杠全部替换为斜杠
        json_file = json_file.replace('\\', '/')
        # 4.4 构造一个sql语句
        sql = f'insert into {conf.file_monitor_meta_table_name}(file_name, process_lines)' \
              f'values("{json_file}", {data_count})'
        # 4.5 执行sql语句
        metadata_util.insert_single_sql(sql)
        # 日志4: 一个文件采集完成记录日志
        logger.info(f'文件{json_file}采集完成,共采集{data_count}行数据...')
        logger.info(f'元数据以记录完成,记录文件路径为{json_file}')
# 日志5.2: 记录脚本执行结束时间
end = time.time()
# 日志5.3: 记录脚本采集的文件数量和采集时间
logger.info(f'本次脚本执行,共采集了{len(new_json_files)}个文件, 从耗时{end-start}s...')

# 3.6 关闭数据库连接和文件
target_util.close()
metadata_util.close()
csv_order_file.close()
csv_order_detail_file.close()



版权声明:本文为weixin_42421604原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。