问题解决:
- 增加目标数据库配置信息
# 结果写出MySQL的数据库相关配置
target_host = 'localhost'
target_port = 3306
target_user = 'root'
target_password = 'mysql'
target_data_db = "retail"
- csv文件目录拼接时,根路径没有末尾\
方式一: 在配置文件中,将csv文件的根目录末尾添加一个/
方式二: 在csv文件根目录和文件名拼接时使用os.path.join()连接
01-商品数据采集-业务需求说明(理解)
商品数据在mysql 数据库中, 这个数据源所在的数据库,是哪一个部门负责管理的??
前端开发: 负责用户交互
后端开发: 负责数据交互(
该数据源归后端开发管理和使用
)大数据开发: 负责海量数据的存储和计算(负责数据的ETL,保证数据满足数据分析的要求)
注意: 大数据开发人员,在后端数据库中通常只有数据库的查询权限,无法进行其他操作
大数据开发通常获取的是二手数据(去重,脱敏,整合后的数据…)
需求介绍:
1)将采集的商品数据保存到目标数据库中
- 在正常开发中,这个目标数据库其实是一个数仓,不会使用关系型数据库存储(RDBMS).
2)将采集的商品数据写出到 CSV 文件中
3)仅采集增量数据(
已经采集过的数据,不重复采集
)
实现思路:
① 查询元数据库表,获取上一次采集商品数据中 updateAt 的最大值
② 根据上一次采集商品数据中 updateAt 的最大值,查询数据源库商品表,获取继上一次采集之后,新增和更新的商品数据
③ 针对新增和更新的商品数据,进行数据采集(ETL->mysql->csv)
④ 将本次采集商品数据中的 updateAt 的最大值,保存到元数据库表中
02-测试数据添加 (
掌握
)
添加数据方式有两种:
使用终端添加
首先打开终端工具(例如: cmd)
在内部开启mysql客户端
mysql -uroot -p123456
在mysql客户端中输入
-- 创建数据源数据库 create database source_data charset='utf8'; -- 使用该数据库 use source_data; -- 执行脚本导入数据 source sql脚本的存放路径(可以从文件夹中将文件拖拽到cmd中获取路径)
使用第三方工具
打开Datagrip,并连接数据源所在的数据库
在数据库列表中,右键点击要导入数据的数据库, 选择 ‘Run sql Script’
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Y0nPrKf1-1688341417117)(day04-数据库数据采集&订单数据采集.assets/1672626735205.png)]
选择要执行的sql脚本点击确定即可
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PBHxCEKI-1688341417118)(day04-数据库数据采集&订单数据采集.assets/1672626794057.png)]
03-增量数据采集(理解)
全量采集: 指定数据源中的所有数据全部采集
增量采集: 指定数据源中的新增数据,和修改数据的采集
问题: 怎样判断增量采集??
创建或更新时间,大于上次采集的最大时间,即可证明是增量数据.
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JBawPQ33-1688341417119)(day04-数据库数据采集&订单数据采集.assets/4.png)]
- defalut current_timestamp: 插入数据时,默认值为当前时间
- on update current_timestamp: 更新数据时将其更改为当前时间
问题2: 怎样实现增量采集??
- 查询(采集)数据的时候,SELECT SQL语句按照
updateAt
进行排序,按照
升序
排序
排序原因:
- 进行where 中的< > <= >=操作时,排序后的数据筛选效率更高
- 采集后 写入csv和目标数据库的时候也是按照时间顺序进行排序的
当采集完成后,将当前批次最大的时间记录在:MySQL的
元数据库中
下一次采集的时候,从MySQL的元数据库中,查询出来
上一次采集的时间
SQL的SELECT语句的WHERE条件设置为:
updateAt > 上一次采集时间
即可
04-配置文件导入
config/project_config.py
# ################## --数据库barcode商品数据采集配置项-- ###################
# 元数据库配置
# barcode业务:update_at字段的监控表的名称
metadata_barcode_table_name = 'barcode_monitor'
# barcode业务:update_at字段的监控表的建表语句的列信息
metadata_barcode_table_create_cols = "id INT PRIMARY KEY AUTO_INCREMENT COMMENT '自增ID', " \
"time_record TIMESTAMP NOT NULL COMMENT '本次采集记录的最大时间', " \
"gather_line_count INT NULL COMMENT '本次采集条数'"
# 数据源库配置
source_host = 'localhost'
source_user = 'root'
source_password = '123456'
source_port = 3306
source_data_db = 'source_data'
# 数据源表名称
source_barcode_table_name = 'sys_barcode'
# 目标数据库配置
# 条码商品表名称
target_barcode_table_name = 'sys_barcode'
target_barcode_table_create_cols = """
`code` varchar(50) PRIMARY KEY COMMENT '商品条码',
`name` varchar(200) DEFAULT '' COMMENT '商品名称',
`spec` varchar(200) DEFAULT '' COMMENT '商品规格',
`trademark` varchar(100) DEFAULT '' COMMENT '商品商标',
`addr` varchar(200) DEFAULT '' COMMENT '商品产地',
`units` varchar(50) DEFAULT '' COMMENT '商品单位(个、杯、箱、等)',
`factory_name` varchar(200) DEFAULT '' COMMENT '生产厂家',
`trade_price` DECIMAL(50, 5) DEFAULT 0.0 COMMENT '贸易价格(指导进价)',
`retail_price` DECIMAL(50, 5) DEFAULT 0.0 COMMENT '零售价格(建议卖价)',
`update_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
`wholeunit` varchar(50) DEFAULT NULL COMMENT '大包装单位',
`wholenum` int(11) DEFAULT NULL COMMENT '大包装内装数量',
`img` varchar(500) DEFAULT NULL COMMENT '商品图片',
`src` varchar(20) DEFAULT NULL COMMENT '源信息',
INDEX (update_at)
"""
# 商品数据写出 csv 根路径
barcode_output_csv_root_path = "D:/etl/output/csv/"
# 商品数据写出 csv 文件名
barcode_orders_output_csv_file_name = f'barcode-{time.strftime("%Y-%m-%d-%H_%M", time.localtime())}.csv'
05-构建商品数据模型类(理解)
model/barcode_model.py
"""
条码商品信息模型类
"""
from util import str_util
from config import project_config as conf
class BarcodeModel(object):
"""条码商品信息模型类"""
def __init__(self, code=None, name=None, spec=None, trademark=None,
addr=None, units=None, factory_name=None, trade_price=None,
retail_price=None, update_at=None, wholeunit=None,
wholenum=None, img=None, src=None):
"""条码商品模型对象初始化"""
self.code = code
self.name = str_util.clear_str(name)
self.spec = str_util.clear_str(spec)
self.trademark = str_util.clear_str(trademark)
self.addr = str_util.clear_str(addr)
self.units = str_util.clear_str(units)
self.factory_name = str_util.clear_str(factory_name)
self.trade_price = trade_price
self.retail_price = retail_price
self.update_at = update_at
self.wholeunit = str_util.clear_str(wholeunit)
self.wholenum = wholenum
self.img = img
self.src = src
def generate_insert_sql(self):
"""生成SQL的插入语句"""
sql = f"REPLACE INTO {conf.target_barcode_table_name}(" \
f"code,name,spec,trademark,addr,units,factory_name,trade_price," \
f"retail_price,update_at,wholeunit,wholenum,img,src) VALUES(" \
f"'{self.code}', " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.name)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.spec)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.trademark)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.addr)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.units)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.factory_name)}, " \
f"{str_util.check_number_null_and_transform_to_sql_null(self.trade_price)}, " \
f"{str_util.check_number_null_and_transform_to_sql_null(self.retail_price)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.update_at)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.wholeunit)}, " \
f"{str_util.check_number_null_and_transform_to_sql_null(self.wholenum)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.img)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.src)}" \
f")"
return sql
@staticmethod
def get_csv_header(sep=','):
"""
生成 csv 数据的标头内容
"""
return f"code{sep}" \
f"name{sep}" \
f"spec{sep}" \
f"trademark{sep}" \
f"addr{sep}" \
f"units{sep}" \
f"factory_name{sep}" \
f"trade_price{sep}" \
f"retail_price{sep}" \
f"update_at{sep}" \
f"wholeunit{sep}" \
f"wholenum{sep}" \
f"img{sep}" \
f"src\n"
def to_csv(self, sep=','):
"""生成csv数据行"""
csv_line = \
f"{self.code}{sep}" \
f"{self.name}{sep}" \
f"{self.spec}{sep}" \
f"{self.trademark}{sep}" \
f"{self.addr}{sep}" \
f"{self.units}{sep}" \
f"{self.factory_name}{sep}" \
f"{self.trade_price}{sep}" \
f"{self.retail_price}{sep}" \
f"{self.update_at}{sep}" \
f"{self.wholeunit}{sep}" \
f"{self.wholenum}{sep}" \
f"{self.img}{sep}" \
f"{self.src}\n"
return csv_line
06-商品数据采集核心业务实现(
理解,并跟着敲一遍
)
6.1 查询上次采集的最大时间
- 查询元数据库表,获取上批次条码商品数据处理的最大时间
- 创建元数据库连接对象
- 检查元数据表是否存在,不存在则创建
- 查询商品采集元数据表中,上一次采集记录的 updateAt 的最大值
- 判断获取到的数据集是否有值,创建变量保存该值
- 如果有值则保存
- 如果没有值则保存None
barcode_service.py
# 0. 导包
from util import mysql_util
from config import project_config as conf
# 1. 查询元数据库表,获取上批次条码商品数据处理的最大时间
#
# 1.1 创建元数据库连接对象
metadata_util = mysql_util.get_mysql_util(
host=conf.metadata_host,
port=conf.metadata_port,
user=conf.metadata_user,
password=conf.metadata_password
)
# 1.2 检查元数据表是否存在,不存在则创建
metadata_util.check_table_exists_and_create(
db_name=conf.metadata_db,
tb_name=conf.metadata_barcode_table_name,
tb_cols=conf.metadata_barcode_table_create_cols
)
# 1.3 查询商品采集元数据表中,上一次采集记录的 updateAt 的最大值
# 1.3.1 创建查询sql
sql = f'select max(updateAt) from {conf.metadata_barcode_table_name};'
# 1.3.2 执行sql
result = metadata_util.query(sql) # ((最大时间,),)
# 1.4 判断获取到的数据集是否有值,创建变量保存该值
if result[0][0]:
# 1.4.1 如果有值则保存
barcode_max_time = result[0][0]
else:
# 1.4.2 如果没有值则保存None
barcode_max_time = None
6.2 根据最大采集时间,从数据源中采集数据
- 根据上一次采集商品数据中 updateAt 的最大值,查询数据源库商品表,获取继上一次采集之后,新增和更新的商品数据
- 创建数据源数据库连接对象
- 判断数据源数据库是否存在
- 如果不存在则退出程序
- 查询元数据库表,获取增量更新的条码商品数据
- 没有最大采集时间(初次采集,全量采集)
- 有最大采集时间(再次采集,增量采集)
- 判断采集数据条目数
- 如果条目数为0则退出去程序
barcode_service.py
# 2. 根据上一次采集商品数据中 updateAt 的最大值,查询数据源库商品表,获取继上一次采集之后,新增和更新的商品数据
# 2.1 创建数据源数据库连接对象
source_util = mysql_util.get_mysql_util(
host=conf.source_host,
port=conf.source_port,
user=conf.source_user,
password=conf.source_password
)
# 2.2 判断数据源数据库是否存在, 如果不存在则退出程序
if not source_util.check_table_exists(conf.source_data_db, conf.source_barcode_table_name):
exit('数据源不存在,请与后端开发人员协商(sibi)解决')
# 2.3 查询元数据库表,获取增量更新的条码商品数据
if not barcode_max_time:
# 2.3.1 没有最大采集时间(初次采集,全量采集 没有where)
sql = f'select * from {conf.source_barcode_table_name} order by updateAt;'
else:
# 2.3.2 有最大采集时间(再次采集,增量采集 有where)
sql = f'select * from {conf.source_barcode_table_name} ' \
f'where updateAt > "{barcode_max_time}" ' \
f'order by updateAt;'
# 执行sql语句
result = source_util.query(sql)
# 2.4 判断采集数据条目数, 如果条目数为0则退出程序
if not len(result):
exit('没有待采集数据...')
6.3 将条码数据写入mysql和csv文件中
- 针对新增和更新的商品数据,进行数据采集(ETL->mysql->csv)
- 检查目标数据库表是否存在,如果不存在则创建
- 创建目标csv文件
- 写入csv的标头
- 创建目标数据库连接对象
- 根据数据源采集结果创建数据模型
- 写入mysql数据库
- 写入csv文件
- 关闭数据库连接
为了创建模型更加简便我们修改了模型类的基础逻辑
model/barcode_model.py
"""条码商品信息模型类"""
def __init__(self, data_tuple: tuple):
"""条码商品模型对象初始化"""
self.code = data_tuple[0]
self.name = str_util.clear_str(data_tuple[1])
self.spec = str_util.clear_str(data_tuple[2])
self.trademark = str_util.clear_str(data_tuple[3])
self.addr = str_util.clear_str(data_tuple[4])
self.units = str_util.clear_str(data_tuple[5])
self.factory_name = str_util.clear_str(data_tuple[6])
self.trade_price = data_tuple[7]
self.retail_price = data_tuple[8]
self.update_at = str(data_tuple[9])
self.wholeunit = data_tuple[10]
self.wholenum = data_tuple[11]
self.img = data_tuple[12]
self.src = data_tuple[13]
barcode_service.py
# 3. 针对新增和更新的商品数据,进行数据采集(ETL->mysql->csv)
# 3.1 创建目标数据库连接对象
target_util = mysql_util.get_mysql_util(
host=conf.target_host,
port=conf.target_port,
user=conf.target_user,
password=conf.target_password
)
# 3.2 检查目标数据库表是否存在,如果不存在则创建
target_util.check_table_exists_and_create(
db_name=conf.target_data_db,
tb_name=conf.target_barcode_table_name,
tb_cols=conf.target_barcode_table_create_cols
)
# 3.3 创建目标csv文件
csv_file = open(
file=conf.barcode_output_csv_root_path + conf.barcode_orders_output_csv_file_name,
mode='a',
encoding='utf8')
# 3.4 写入csv的标头
csv_file.write(BarcodeModel.get_csv_header())
# 3.5 根据数据源采集结果创建数据模型
for row_data in result:
model = BarcodeModel(row_data)
# 3.5.1 写入mysql数据库
target_util.insert_single_sql(model.generate_insert_sql())
# 3.5.2 写入csv文件
csv_file.write(model.to_csv())
# 3.6 关闭数据库连接
csv_file.close()
target_util.close()
source_util.close()
6.4 添加事务(1000条提交一次)
记录处理条目数,如果处理条目达到1000的倍数
- 提交mysql数据
- 记录元数据
- 关闭csv文件
barcode_service.py
# 3. 针对新增和更新的商品数据,进行数据采集(ETL->mysql->csv)
# 3.1 创建目标数据库连接对象
target_util = mysql_util.get_mysql_util(
host=conf.target_host,
port=conf.target_port,
user=conf.target_user,
password=conf.target_password
)
# 3.2 检查目标数据库表是否存在,如果不存在则创建
target_util.check_table_exists_and_create(
db_name=conf.target_data_db,
tb_name=conf.target_barcode_table_name,
tb_cols=conf.target_barcode_table_create_cols
)
# 3.3 创建目标csv文件
csv_file = open(
file=conf.barcode_output_csv_root_path + conf.barcode_orders_output_csv_file_name,
mode='a',
encoding='utf8')
# 3.4 写入csv的标头
csv_file.write(BarcodeModel.get_csv_header())
# 事务1: 创建一个变量,保存处理数据的条目数
data_count = 0
# 事务2: 首次开启事务
target_util.begin_transaction()
# 3.5 根据数据源采集结果创建数据模型
for row_data in result:
# 事务4: 计数器自增
data_count += 1
try:
# 创建一个模型
model = BarcodeModel(row_data)
# 3.5.1 写入mysql数据库
target_util.insert_single_sql_without_commit(model.generate_insert_sql())
# 3.5.2 写入csv文件
csv_file.write(model.to_csv())
except Exception as e:
# 回滚事务
target_util.rollback_transaction()
# 结束程序 (如果想要在回滚后恢复各种变量的值,做的操作太过繁琐不如将脚本直接杀死后重新执行)
exit('插入数据库失败结束程序')
else:
# 事务3: 判断数据条目是否为1000的倍数
if data_count % 1000 == 0:
# 提交事务
target_util.commit_transaction()
# 开启下一次事务
target_util.begin_transaction()
# 事务5: 关闭csv文件,并创建下一个csv文件
csv_file.close()
# 开启下一个文件
csv_file = open(
file=conf.barcode_output_csv_root_path + conf.barcode_orders_output_csv_file_name,
mode='a',
encoding='utf8')
# 写入csv的标头
csv_file.write(BarcodeModel.get_csv_header())
else:
# 事务6: 数据执行结束,将没有提交的sql语句提交和csv文件关闭
# 提交事务
target_util.commit_transaction()
# 关闭csv文件
csv_file.close()
# 3.6 关闭数据库连接
target_util.close()
source_util.close()
6.5 事务提交后将最大时间保存到元数据库中
- 将本次采集商品数据中的 updateAt 的最大值,保存到元数据库表中
- 准备sql语句
- 执行sql语句
barcode_service.py
# 3. 针对新增和更新的商品数据,进行数据采集(ETL->mysql->csv)
# 3.1 创建目标数据库连接对象
target_util = mysql_util.get_mysql_util(
host=conf.target_host,
port=conf.target_port,
user=conf.target_user,
password=conf.target_password
)
# 3.2 检查目标数据库表是否存在,如果不存在则创建
target_util.check_table_exists_and_create(
db_name=conf.target_data_db,
tb_name=conf.target_barcode_table_name,
tb_cols=conf.target_barcode_table_create_cols
)
# 3.3 创建目标csv文件
csv_file = open(
file=conf.barcode_output_csv_root_path + conf.barcode_orders_output_csv_file_name,
mode='a',
encoding='utf8')
# 3.4 写入csv的标头
csv_file.write(BarcodeModel.get_csv_header())
# 事务1: 创建一个变量,保存处理数据的条目数
data_count = 0
# 事务2: 首次开启事务
target_util.begin_transaction()
# 3.5 根据数据源采集结果创建数据模型
for row_data in result:
# 事务4: 计数器自增
data_count += 1
try:
# 创建一个模型
model = BarcodeModel(row_data)
# 3.5.1 写入mysql数据库
target_util.insert_single_sql_without_commit(model.generate_insert_sql())
# 3.5.2 写入csv文件
csv_file.write(model.to_csv())
except Exception as e:
# 回滚事务
target_util.rollback_transaction()
# 结束程序 (如果想要在回滚后恢复各种变量的值,做的操作太过繁琐不如将脚本直接杀死后重新执行)
exit('插入数据库失败结束程序')
else:
# 事务3: 判断数据条目是否为1000的倍数
if data_count % 1000 == 0:
# 提交事务
target_util.commit_transaction()
# 4.将本次采集商品数据中的updateAt的最大值,保存到元数据库表中
# 4.1 准备sql语句
sql = f'insert into {conf.metadata_barcode_table_name}(time_record, gather_line_count)' \
f'values ("{model.update_at}", 1000)'
# 4.2 执行sql语句
metadata_util.insert_single_sql(sql)
# 开启下一次事务
target_util.begin_transaction()
# 事务5: 关闭csv文件,并创建下一个csv文件
csv_file.close()
# 开启下一个文件
csv_file = open(
file=conf.barcode_output_csv_root_path + conf.barcode_orders_output_csv_file_name,
mode='a',
encoding='utf8')
# 写入csv的标头
csv_file.write(BarcodeModel.get_csv_header())
else:
# 事务6: 数据执行结束,将没有提交的sql语句提交和csv文件关闭
# 提交事务
target_util.commit_transaction()
# 4.将本次采集商品数据中的updateAt的最大值,保存到元数据库表中
# 4.1 准备sql语句
sql = f'insert into {conf.metadata_barcode_table_name}(time_record, gather_line_count)' \
f'values ("{model.update_at}", {data_count % 1000})'
# 4.2 执行sql语句
metadata_util.insert_single_sql(sql)
# 关闭csv文件
csv_file.close()
# 3.6 关闭数据库连接
target_util.close()
source_util.close()
metadata_util.close()
6.6 记录日志
- 脚本启动时记录日志
- 源数据中数据表村存在时记录日志
- 采集数据为空时记录日志
- 每次commit和关闭csv文件前记录日志
- 记录元数据后记录日志
- 脚本执行结束后记录日志
barcode_service.py
# 0. 导包
import time
from util import mysql_util
from util import logging_util
from config import project_config as conf
from model.barcode_model import BarcodeModel
# 日志1:创建日志器对象
logger = logging_util.init_logger('barcode')
# 日志2: 记录脚本的启动
logger.info('商品数据采集开始...')
# 1. 查询元数据库表,获取上批次条码商品数据处理的最大时间
#
# 1.1 创建元数据库连接对象
metadata_util = mysql_util.get_mysql_util(
host=conf.metadata_host,
port=conf.metadata_port,
user=conf.metadata_user,
password=conf.metadata_password
)
# 1.2 检查元数据表是否存在,不存在则创建
metadata_util.check_table_exists_and_create(
db_name=conf.metadata_db,
tb_name=conf.metadata_barcode_table_name,
tb_cols=conf.metadata_barcode_table_create_cols
)
# 1.3 查询商品采集元数据表中,上一次采集记录的 updateAt 的最大值
# 1.3.1 创建查询sql
sql = f'select max(time_record) from {conf.metadata_barcode_table_name};'
# 1.3.2 执行sql
result = metadata_util.query(sql) # ((最大时间,),)
# 1.4 判断获取到的数据集是否有值,创建变量保存该值
if result[0][0]:
# 1.4.1 如果有值则保存
barcode_max_time = result[0][0]
else:
# 1.4.2 如果没有值则保存None
barcode_max_time = None
# 2. 根据上一次采集商品数据中 updateAt 的最大值,查询数据源库商品表,获取继上一次采集之后,新增和更新的商品数据
# 2.1 创建数据源数据库连接对象
source_util = mysql_util.get_mysql_util(
host=conf.source_host,
port=conf.source_port,
user=conf.source_user,
password=conf.source_password
)
# 2.2 判断数据源数据库是否存在, 如果不存在则退出程序
if not source_util.check_table_exists(conf.source_data_db, conf.source_barcode_table_name):
# 日志3: 记录数据源数据表不存在时的警告
logger.error('数据源数据库中的表不存在,退出程序...')
exit('数据源不存在,请与后端开发人员协商(sibi)解决')
# 2.3 查询元数据库表,获取增量更新的条码商品数据
if not barcode_max_time:
# 2.3.1 没有最大采集时间(初次采集,全量采集 没有where)
sql = f'select * from {conf.source_barcode_table_name} order by updateAt;'
else:
# 2.3.2 有最大采集时间(再次采集,增量采集 有where)
sql = f'select * from {conf.source_barcode_table_name} ' \
f'where updateAt > "{barcode_max_time}" ' \
f'order by updateAt;'
# 执行sql语句
result = source_util.query(sql)
# 2.4 判断采集数据条目数, 如果条目数为0则退出程序
if not len(result):
# 日志4: 采集数据记录为空时记录日志
logger.info('没有待采集数据,采集结束...')
exit('没有待采集数据...')
# 3. 针对新增和更新的商品数据,进行数据采集(ETL->mysql->csv)
# 3.1 创建目标数据库连接对象
target_util = mysql_util.get_mysql_util(
host=conf.target_host,
port=conf.target_port,
user=conf.target_user,
password=conf.target_password
)
# 3.2 检查目标数据库表是否存在,如果不存在则创建
target_util.check_table_exists_and_create(
db_name=conf.target_data_db,
tb_name=conf.target_barcode_table_name,
tb_cols=conf.target_barcode_table_create_cols
)
# 3.3 创建目标csv文件
csv_file = open(
file=conf.barcode_output_csv_root_path + conf.barcode_orders_output_csv_file_name,
mode='a',
encoding='utf8')
# 3.4 写入csv的标头
csv_file.write(BarcodeModel.get_csv_header())
# 事务1: 创建一个变量,保存处理数据的条目数
data_count = 0
# 事务2: 首次开启事务
target_util.begin_transaction()
# 日志8: 记录脚本执行耗时: 开始时间
start = time.time()
# 3.5 根据数据源采集结果创建数据模型
for row_data in result:
# 事务4: 计数器自增
data_count += 1
try:
# 创建一个模型
model = BarcodeModel(row_data)
# 3.5.1 写入mysql数据库
target_util.insert_single_sql_without_commit(model.generate_insert_sql())
# 3.5.2 写入csv文件
csv_file.write(model.to_csv())
except Exception as e:
# 回滚事务
target_util.rollback_transaction()
# 结束程序 (如果想要在回滚后恢复各种变量的值,做的操作太过繁琐不如将脚本直接杀死后重新执行)
exit('插入数据库失败结束程序')
else:
# 事务3: 判断数据条目是否为1000的倍数
if data_count % 1000 == 0:
# 提交事务
target_util.commit_transaction()
# 4.将本次采集商品数据中的updateAt的最大值,保存到元数据库表中
# 4.1 准备sql语句
sql = f'insert into {conf.metadata_barcode_table_name}(time_record, gather_line_count)' \
f'values ("{model.update_at}", 1000)'
# 4.2 执行sql语句
metadata_util.insert_single_sql(sql)
# # 日志7: 提交事务后记录元数据,记录日志
logger.info(f'元数据已经提交... 最大记录时间为{model.update_at}, 记录数据条目数为1000')
# 日志5: 记录向数据库中commit数据的日志
logger.info(f'本次采集数据条目数为1000,已采集完成并写入数据库,共采集{data_count}')
# 开启下一次事务
target_util.begin_transaction()
# 事务5: 关闭csv文件,并创建下一个csv文件
csv_file.close()
# 日志6: 记录关闭csv文件的日志
logger.info(f'本次采集数据条目数为1000,已采集完成并写入{csv_file.name}文件中,共采集{data_count}')
# 开启下一个文件
csv_file = open(
file=conf.barcode_output_csv_root_path + conf.barcode_orders_output_csv_file_name,
mode='a',
encoding='utf8')
# 写入csv的标头
csv_file.write(BarcodeModel.get_csv_header())
else:
# 事务6: 数据执行结束,将没有提交的sql语句提交和csv文件关闭
# 提交事务
target_util.commit_transaction()
# 日志5: 记录向数据库中commit数据的日志
logger.info(f'本次采集数据条目数为{data_count % 1000},已采集完成并写入数据库,共采集{data_count}')
# 4.将本次采集商品数据中的updateAt的最大值,保存到元数据库表中
# 4.1 准备sql语句
sql = f'insert into {conf.metadata_barcode_table_name}(time_record, gather_line_count)' \
f'values ("{model.update_at}", {data_count % 1000})'
# 4.2 执行sql语句
metadata_util.insert_single_sql(sql)
# 日志7: 提交事务后记录元数据,记录日志
logger.info(f'元数据已经提交... 最大记录时间为{model.update_at}, 记录数据条目数为{data_count % 1000}')
# 关闭csv文件
csv_file.close()
# 日志6: 记录关闭csv文件的日志
logger.info(f'本次采集数据条目数为{data_count % 1000},已采集完成并写入{csv_file.name}文件中,共采集{data_count}')
# 日志8: 记录脚本执行耗时: 结束时间
end = time.time()
# 日志8: 记录脚本执行耗时:输出日志
logger.info(f'商品数据采集执行结束,共采集数据{data_count}条, 共耗时{end-start}s...')
# 3.6 关闭数据库连接
target_util.close()
source_util.close()
metadata_util.close()
07-订单数据采集-业务需求说明(理解)
业务需求:
1)将采集的订单JSON数据保存到目标数据库中
2)将采集的订单JSON写出到 CSV 文件中
3)采集数据时JSON文件不能重复采集
实现思路:
- 获取订单文件夹下面有哪些订单JSON文件
- 查询元数据库表中已经被采集的订单JSON文件,来对比确定要采集新的订单JSON文件
- 针对待采集的新订单JSON文件,进行数据采集(ETL操作->mysql->csv)
- 将本次采集的订单JSON文件,记录到元数据库的表中
08-构建订单数据模型模型类(
重点理解
)
8.1 JSON数据解析(
要记下来
)
- json模块的使用
json.loads 将JSON字符串数据转换为Python中的数据类型
json.dumps 将python中的数据类型转换为JSON字符串
import json
json_data_str1 = '{"discountRate": 1, "storeShopNo": "None", "dayOrderSeq": 19, "storeDistrict": "龙华区", "isSigned": 0, "storeProvince": "广东省", "origin": 0}'
json_data_str2 = '{"discountRate": 1, "storeShopNo": "None", "dayOrderSeq": 19, "storeDistrict": "龙华区", "isSigned": 0, "storeProvince": "广东省", "origin": 0, "products":[{"name": "小浣熊洗洁精", "count":1},{"name": "劲霸童装", "count":2}]}'
"""
在JSON数据中只有 object(对象类型), array(数组类型), number(数值类型), string(字符串类型), boolean(布尔类型)
"""
# 1. 将Json字符串转换为python中的数据类型
python_data1 = json.loads(json_data_str1)
print(python_data1)
print(type(python_data1))
python_data2 = json.loads(json_data_str2)
print(python_data2)
print(type(python_data2))
print(type(python_data2['products']))
# 2.将python中的数据类型转换为JSON字符串
json_str1 = json.dumps(python_data1, ensure_ascii=False, indent=4)
print(json_str1)
print(type(json_str1))
- JSON数据格式化
格式化JSON数据的网站:
在线JSON校验格式化工具(Be JSON)
{
"discountRate": 1,
"storeShopNo": "None",
"dayOrderSeq": 14,
"storeDistrict": "澧县",
"isSigned": 0,
"storeProvince": "湖南省",
"origin": 0,
"storeGPSLongitude": "111.76220097590462",
"discount": 0,
"storeID": 1837,
"productCount": 3,
"operatorName": "OperatorName",
"operator": "NameStr",
"storeStatus": "open",
"storeOwnUserTel": 12345678910,
"payType": "cash",
"discountType": 2,
"storeName": "九澧超市",
"storeOwnUserName": "OwnUserNameStr",
"dateTS": 1542436530000,
"smallChange": 0,
"storeGPSName": "None",
"erase": 0,
"product": [{
"count": 1,
"name": "好丽友呀土豆番茄酱味40g",
"unitID": 7,
"barcode": "6920907808513",
"pricePer": 3.5,
"retailPrice": 3.5,
"tradePrice": 2.85,
"categoryID": 10
}, {
"count": 1,
"name": "卫龙素食经典大面筋102g",
"unitID": 7,
"barcode": "6935284499995",
"pricePer": 3,
"retailPrice": 3,
"tradePrice": 2,
"categoryID": 10
}],
"storeGPSAddress": "None",
"orderID": "154243653020318374152",
"moneyBeforeWholeDiscount": 10.5,
"storeCategory": "normal",
"receivable": 10.5,
"faceID": "",
"storeOwnUserId": 1770,
"paymentChannel": 0,
"paymentScenarios": "OTHER",
"storeAddress": "StoreAddress",
"totalNoDiscount": 10.5,
"payedTotal": 10.5,
"storeGPSLatitude": "29.649557079006957",
"storeCreateDateTS": 1540883593000,
"storeCity": "常德市",
"memberID": "0"
}
经过格式化,我们可以清晰的看到,上述json数据转换为python数据类型时结果为:
字典 嵌套 列表 嵌套 字典的形式
{
key1:value1,
key2:[
{in_key1:in_value1},
{in_key2:in_value2}
]
}
- 订单数数据解析
问题1: 我们如何将上述json数据转换为msyql中的数据表???
进行数据的拆分,将上述数据分别放置到不同的数据表中,并且进行外键关联
问题2: 上述JSON数据转换后将转换为几张表, 他们的关系是怎样的???
转换为两张表即可 订单表 订单详情表
外键关联, 在订单详情表中创建外键,绑定订单表的主键即可
问题3: 我们如何将上述JSON数据转换为模型???
我们将其转换为两个模型,每个模型中的数据和数据表拆分的逻辑相似
问题4: 上述JSON数据转换为模型后有几个模型类, 他们的关系是怎样的???
拆分后可以有两个模型, 订单模型 订单详情模型
订单模型对象的属性中保存的值就是订单详情模型,一个订单模型可以保存对个订单详情模型
class OrderModel(object): def __init__(self,order_id, project_list ): self.order_id = order_id self.projects = project_list # 此列表中保存的是多个商品数据模型
表1:订单表
id | shop_time | total_price |
---|---|---|
001 | 2022-1-12 | 18.5 |
002 | 2022-1-15 | 203.4 |
003 | 2022-1-18 | 19.9 |
表2:订单详情表
id | product_name | order_id(外键) |
---|---|---|
1 | 小浣熊洗衣液 | 001 |
2 | 雕牌手机 | 001 |
3 | 拉芳拖拉机 | 002 |
8.2 订单JSON数据模型分类
订单业务数据模型:
1)订单数据模型:OrdersModel : 存储的是json数据中的订单数据
2)单个售卖商品数据模型:SingleProductSoldModel : 存储的是订单中的单个商品
3)订单详情数据模型:OrdersDetailModel : 存储的是订单中的全部商品(本质就是讲多个SingleProductSoldModel嵌套起来)
4)原始数据模型(订单数据+订单详情数据组合模型):RetailOriginJsonModel : 本质就是订单+商品详情
原始数据模型
订单模型
订单详情数据模型
单个售卖商品模型1
单个售卖商品模型2
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xR0aRrA4-1688341417120)(day04-数据库数据采集&订单数据采集.assets/1672652136253.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-y0XX0WEF-1688341417121)(day04-数据库数据采集&订单数据采集.assets/1663230173864.png)]
8.3 添加配置文件信息
config/project_config.py
# ################## --订单JSON数据采集配置项-- ###################
# 订单表名称
target_orders_table_name = "orders"
# 订单表MySQL数据库建表语句列信息
target_orders_table_create_cols = \
f"order_id VARCHAR(255) PRIMARY KEY, " \
f"store_id INT COMMENT '店铺ID', " \
f"store_name VARCHAR(30) COMMENT '店铺名称', " \
f"store_status VARCHAR(10) COMMENT '店铺状态(open,close)', " \
f"store_own_user_id INT COMMENT '店主id', " \
f"store_own_user_name VARCHAR(50) COMMENT '店主名称', " \
f"store_own_user_tel VARCHAR(15) COMMENT '店主手机号', " \
f"store_category VARCHAR(10) COMMENT '店铺类型(normal,test)', " \
f"store_address VARCHAR(255) COMMENT '店铺地址', " \
f"store_shop_no VARCHAR(255) COMMENT '店铺第三方支付id号', " \
f"store_province VARCHAR(10) COMMENT '店铺所在省', " \
f"store_city VARCHAR(10) COMMENT '店铺所在市', " \
f"store_district VARCHAR(10) COMMENT '店铺所在行政区', " \
f"store_gps_name VARCHAR(255) COMMENT '店铺gps名称', " \
f"store_gps_address VARCHAR(255) COMMENT '店铺gps地址', " \
f"store_gps_longitude VARCHAR(255) COMMENT '店铺gps经度', " \
f"store_gps_latitude VARCHAR(255) COMMENT '店铺gps纬度', " \
f"is_signed TINYINT COMMENT '是否第三方支付签约(0,1)', " \
f"operator VARCHAR(10) COMMENT '操作员', " \
f"operator_name VARCHAR(50) COMMENT '操作员名称', " \
f"face_id VARCHAR(255) COMMENT '顾客面部识别ID', " \
f"member_id VARCHAR(255) COMMENT '顾客会员ID', " \
f"store_create_date_ts TIMESTAMP COMMENT '店铺创建时间', " \
f"origin VARCHAR(255) COMMENT '原始信息(无用)', " \
f"day_order_seq INT COMMENT '本订单是当日第几单', " \
f"discount_rate DECIMAL(10, 5) COMMENT '折扣率', " \
f"discount_type TINYINT COMMENT '折扣类型', " \
f"discount DECIMAL(10, 5) COMMENT '折扣金额', " \
f"money_before_whole_discount DECIMAL(10, 5) COMMENT '折扣前总金额', " \
f"receivable DECIMAL(10, 5) COMMENT '应收金额', " \
f"erase DECIMAL(10, 5) COMMENT '抹零金额', " \
f"small_change DECIMAL(10, 5) COMMENT '找零金额', " \
f"total_no_discount DECIMAL(10, 5) COMMENT '总价格(无折扣)', " \
f"pay_total DECIMAL(10, 5) COMMENT '付款金额', " \
f"pay_type VARCHAR(10) COMMENT '付款类型', " \
f"payment_channel TINYINT COMMENT '付款通道', " \
f"payment_scenarios VARCHAR(15) COMMENT '付款描述(无用)', " \
f"product_count INT COMMENT '本单卖出多少商品', " \
f"date_ts TIMESTAMP COMMENT '订单时间', " \
f"INDEX (receivable), INDEX (date_ts)"
# 订单详情表名称
target_orders_detail_table_name = "orders_detail"
# 订单详情建表列信息
target_orders_detail_table_create_cols = \
f"order_id VARCHAR(255) COMMENT '订单ID', " \
f"barcode VARCHAR(255) COMMENT '商品条码', " \
f"name VARCHAR(255) COMMENT '商品名称', " \
f"count INT COMMENT '本单此商品卖出数量', " \
f"price_per DECIMAL(10, 5) COMMENT '实际售卖单价', " \
f"retail_price DECIMAL(10, 5) COMMENT '零售建议价', " \
f"trade_price DECIMAL(10, 5) COMMENT '贸易价格(进货价)', " \
f"category_id INT COMMENT '商品类别ID', " \
f"unit_id INT COMMENT '商品单位ID(包、袋、箱、等)', " \
f"PRIMARY KEY (order_id, barcode)"
# 订单数据写出 csv 的根路径
retail_output_csv_root_path = "D:/etl/output/csv"
# 每一次运行,订单文件写出路径
retail_orders_output_csv_file_name = \
f"orders-{time.strftime('%Y-%m-%d-%H_%M', time.localtime())}.csv"
retail_orders_detail_output_csv_file_name = \
f"orders-detail-{time.strftime('%Y-%m-%d-%H_%M', time.localtime())}.csv"
8.4 订单JSON模型实现(
重点理解
)
mysql的插入形式:
INSERT INTO :
插入数据,如果插入失败(唯一约束生效,插入重复主键), 报错
INSERT IGNORE INTO :
插入数据,如果插入失败(唯一约束生效,插入重复主键), 忽略
REPLACE INTO:
插入数据,如果插入失败(唯一约束生效,插入重复主键), 替换例如插入数据的id值为3 但是表中已有该id的数据记录
则 1 会报错 2 则不插入数据 3 会覆盖原数据
model/order_model.py
"""
订单业务数据模型:
1)订单数据模型:OrdersModel
2)单个售卖商品数据模型:SingleProductSoldModel
3)订单详情数据模型:OrdersDetailModel
4)原始数据模型(订单数据+订单详情数据组合模型):RetailOriginJsonModel
"""
import json
from config import project_config as conf
from util import str_util, time_util
class OrdersModel(object):
"""订单数据模型"""
def __init__(self, data):
"""
利用传入的订单json数据构建订单数据模型对象
"""
# 将 json 数据转换为字典
data = json.loads(data)
# 初始化订单数据模型对象
self.discount_rate = data['discountRate'] # 折扣率
self.store_shop_no = data['storeShopNo'] # 店铺店号(无用列)
self.day_order_seq = data['dayOrderSeq'] # 本单为当日第几单
self.store_district = data['storeDistrict'] # 店铺所在行政区
self.is_signed = data['isSigned'] # 是否签约店铺(签约第三方支付体系)
self.store_province = data['storeProvince'] # 店铺所在省份
self.origin = data['origin'] # 原始信息(无用)
self.store_gps_longitude = data['storeGPSLongitude'] # 店铺GPS经度
self.discount = data['discount'] # 折扣金额
self.store_id = data['storeID'] # 店铺ID
self.product_count = data['productCount'] # 本单售卖商品数量
self.operator_name = data['operatorName'] # 操作员姓名
self.operator = data['operator'] # 操作员ID
self.store_status = data['storeStatus'] # 店铺状态
self.store_own_user_tel = data['storeOwnUserTel'] # 店铺店主电话
self.pay_type = data['payType'] # 支付类型
self.discount_type = data['discountType'] # 折扣类型
self.store_name = data['storeName'] # 店铺名称
self.store_own_user_name = data['storeOwnUserName'] # 店铺店主名称
self.date_ts = data['dateTS'] # 订单时间
self.small_change = data['smallChange'] # 找零金额
self.store_gps_name = data['storeGPSName'] # 店铺GPS名称
self.erase = data['erase'] # 是否抹零
self.store_gps_address = data['storeGPSAddress'] # 店铺GPS地址
self.order_id = data['orderID'] # 订单ID
self.money_before_whole_discount = data['moneyBeforeWholeDiscount'] # 折扣前金额
self.store_category = data['storeCategory'] # 店铺类别
self.receivable = data['receivable'] # 应收金额
self.face_id = data['faceID'] # 面部识别ID
self.store_own_user_id = data['storeOwnUserId'] # 店铺店主ID
self.payment_channel = data['paymentChannel'] # 付款通道
self.payment_scenarios = data['paymentScenarios'] # 付款情况(无用)
self.store_address = data['storeAddress'] # 店铺地址
self.total_no_discount = data['totalNoDiscount'] # 整体价格(无折扣)
self.payed_total = data['payedTotal'] # 已付款金额
self.store_gps_latitude = data['storeGPSLatitude'] # 店铺GPS纬度
self.store_create_date_ts = data['storeCreateDateTS'] # 店铺创建时间
self.store_city = data['storeCity'] # 店铺所在城市
self.member_id = data['memberID'] # 会员ID
def generate_insert_sql(self):
"""
生成添加表数据的SQL语句
"""
sql = f"INSERT IGNORE INTO {conf.target_orders_table_name}(" \
f"order_id,store_id,store_name,store_status,store_own_user_id," \
f"store_own_user_name,store_own_user_tel,store_category," \
f"store_address,store_shop_no,store_province,store_city," \
f"store_district,store_gps_name,store_gps_address," \
f"store_gps_longitude,store_gps_latitude,is_signed," \
f"operator,operator_name,face_id,member_id,store_create_date_ts," \
f"origin,day_order_seq,discount_rate,discount_type,discount," \
f"money_before_whole_discount,receivable,erase,small_change," \
f"total_no_discount,pay_total,pay_type,payment_channel," \
f"payment_scenarios,product_count,date_ts" \
f") VALUES(" \
f"'{self.order_id}', " \
f"{self.store_id}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.store_name)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.store_status)}, " \
f"{self.store_own_user_id}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.store_own_user_name)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.store_own_user_tel)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.store_category)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.store_address)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.store_shop_no)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.store_province)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.store_city)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.store_district)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.store_gps_name)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.store_gps_address)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.store_gps_longitude)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.store_gps_latitude)}, " \
f"{self.is_signed}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.operator)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.operator_name)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.face_id)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.member_id)}, " \
f"'{time_util.ts13_to_date_str(self.store_create_date_ts)}', " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.origin)}, " \
f"{self.day_order_seq}, " \
f"{self.discount_rate}, " \
f"{self.discount_type}, " \
f"{self.discount}, " \
f"{self.money_before_whole_discount}, " \
f"{self.receivable}, " \
f"{self.erase}, " \
f"{self.small_change}, " \
f"{self.total_no_discount}, " \
f"{self.payed_total}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.pay_type)}, " \
f"{self.payment_channel}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.payment_scenarios)}, " \
f"{self.product_count}, " \
f"'{time_util.ts13_to_date_str(self.date_ts)}')"
return sql
@staticmethod
def get_csv_header(sep=','):
"""
生成 csv 数据的标头内容
"""
header = f"order_id{sep}" \
f"store_id{sep}" \
f"store_name{sep}" \
f"store_status{sep}" \
f"store_own_user_id{sep}" \
f"store_own_user_name{sep}" \
f"store_own_user_tel{sep}" \
f"store_category{sep}" \
f"store_address{sep}" \
f"store_shop_no{sep}" \
f"store_province{sep}" \
f"store_city{sep}" \
f"store_district{sep}" \
f"store_gps_name{sep}" \
f"store_gps_address{sep}" \
f"store_gps_longitude{sep}" \
f"store_gps_latitude{sep}" \
f"is_signed{sep}" \
f"operator{sep}" \
f"operator_name{sep}" \
f"face_id{sep}" \
f"member_id{sep}" \
f"store_create_date_ts{sep}" \
f"origin{sep}" \
f"day_order_seq{sep}" \
f"discount_rate{sep}" \
f"discount_type{sep}" \
f"discount{sep}" \
f"money_before_whole_discount{sep}" \
f"receivable{sep}" \
f"erase{sep}" \
f"small_change{sep}" \
f"total_no_discount{sep}" \
f"pay_total{sep}" \
f"pay_type{sep}" \
f"payment_channel{sep}" \
f"payment_scenarios{sep}" \
f"product_count{sep}" \
f"date_ts\n"
return header
def check_and_transform_area(self):
"""
检查省市区内容,为空就转换为未知
"""
if str_util.check_null(self.store_province):
self.store_province = '未知省份'
if str_util.check_null(self.store_city):
self.store_city = '未知城市'
if str_util.check_null(self.store_district):
self.store_district = '未知行政区'
def check_and_transform_all_column(self):
"""
转换全部的列,如果是空内容,就将其设置为空字符串
"""
self.discount_rate = str_util.check_null_and_transform(self.discount_rate)
self.store_shop_no = str_util.check_null_and_transform(self.store_shop_no)
self.day_order_seq = str_util.check_null_and_transform(self.day_order_seq)
self.store_district = str_util.check_null_and_transform(self.store_district)
self.is_signed = str_util.check_null_and_transform(self.is_signed)
self.store_province = str_util.check_null_and_transform(self.store_province)
self.origin = str_util.check_null_and_transform(self.origin)
self.store_gps_longitude = str_util.check_null_and_transform(self.store_gps_longitude)
self.discount = str_util.check_null_and_transform(self.discount)
self.store_id = str_util.check_null_and_transform(self.store_id)
self.product_count = str_util.check_null_and_transform(self.product_count)
self.operator_name = str_util.check_null_and_transform(self.operator_name)
self.operator = str_util.check_null_and_transform(self.operator)
self.store_status = str_util.check_null_and_transform(self.store_status)
self.store_own_user_tel = str_util.check_null_and_transform(self.store_own_user_tel)
self.pay_type = str_util.check_null_and_transform(self.pay_type)
self.discount_type = str_util.check_null_and_transform(self.discount_type)
self.store_name = str_util.check_null_and_transform(self.store_name)
self.store_own_user_name = str_util.check_null_and_transform(self.store_own_user_name)
self.date_ts = str_util.check_null_and_transform(self.date_ts)
self.small_change = str_util.check_null_and_transform(self.small_change)
self.store_gps_name = str_util.check_null_and_transform(self.store_gps_name)
self.erase = str_util.check_null_and_transform(self.erase)
self.store_gps_address = str_util.check_null_and_transform(self.store_gps_address)
self.order_id = str_util.check_null_and_transform(self.order_id)
self.money_before_whole_discount = str_util.check_null_and_transform(self.money_before_whole_discount)
self.store_category = str_util.check_null_and_transform(self.store_category)
self.receivable = str_util.check_null_and_transform(self.receivable)
self.face_id = str_util.check_null_and_transform(self.face_id)
self.store_own_user_id = str_util.check_null_and_transform(self.store_own_user_id)
self.payment_channel = str_util.check_null_and_transform(self.payment_channel)
self.payment_scenarios = str_util.check_null_and_transform(self.payment_scenarios)
self.store_address = str_util.check_null_and_transform(self.store_address)
self.total_no_discount = str_util.check_null_and_transform(self.total_no_discount)
self.payed_total = str_util.check_null_and_transform(self.payed_total)
self.store_gps_latitude = str_util.check_null_and_transform(self.store_gps_latitude)
self.store_create_date_ts = str_util.check_null_and_transform(self.store_create_date_ts)
self.store_city = str_util.check_null_and_transform(self.store_city)
self.member_id = str_util.check_null_and_transform(self.member_id)
def to_csv(self, sep=','):
"""
生成 csv 数据,分割符默认为逗号。
Note: 生成的数据顺序和header是一一对应的,不要混乱了。
"""
self.check_and_transform_area()
self.check_and_transform_all_column()
csv_line = \
f"{self.order_id}{sep}" \
f"{self.store_id}{sep}" \
f"{self.store_name}{sep}" \
f"{self.store_status}{sep}" \
f"{self.store_own_user_id}{sep}" \
f"{self.store_own_user_name}{sep}" \
f"{self.store_own_user_tel}{sep}" \
f"{self.store_category}{sep}" \
f"{self.store_address}{sep}" \
f"{self.store_shop_no}{sep}" \
f"{self.store_province}{sep}" \
f"{self.store_city}{sep}" \
f"{self.store_district}{sep}" \
f"{self.store_gps_name}{sep}" \
f"{self.store_gps_address}{sep}" \
f"{self.store_gps_longitude}{sep}" \
f"{self.store_gps_latitude}{sep}" \
f"{self.is_signed}{sep}" \
f"{self.operator}{sep}" \
f"{self.operator_name}{sep}" \
f"{self.face_id}{sep}" \
f"{self.member_id}{sep}" \
f"{time_util.ts13_to_date_str(self.store_create_date_ts)}{sep}" \
f"{self.origin}{sep}" \
f"{self.day_order_seq}{sep}" \
f"{self.discount_rate}{sep}" \
f"{self.discount_type}{sep}" \
f"{self.discount}{sep}" \
f"{self.money_before_whole_discount}{sep}" \
f"{self.receivable}{sep}" \
f"{self.erase}{sep}" \
f"{self.small_change}{sep}" \
f"{self.total_no_discount}{sep}" \
f"{self.payed_total}{sep}" \
f"{self.pay_type}{sep}" \
f"{self.payment_channel}{sep}" \
f"{self.payment_scenarios}{sep}" \
f"{self.product_count}{sep}" \
f"{time_util.ts13_to_date_str(self.date_ts)}\n"
return csv_line
class SingleProductSoldModel(object):
"""订单售卖商品数据模型"""
def __init__(self, order_id, product_detail):
self.order_id = order_id
self.count = product_detail['count']
self.name = product_detail['name']
self.unit_id = product_detail['unitID']
self.barcode = product_detail['barcode']
self.price_per = product_detail['pricePer']
self.retail_price = product_detail['retailPrice']
self.trade_price = product_detail['tradePrice']
self.category_id = product_detail['categoryID']
def generate_value_segment_for_sql_insert(self):
"""
生成添加表数据SQL语句的VALUE语句段
"""
segment = f"(" \
f"'{self.order_id}', " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.barcode)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.name)}, " \
f"{self.count}, " \
f"{self.price_per}, " \
f"{self.retail_price}, " \
f"{self.trade_price}, " \
f"{self.category_id}, " \
f"{self.unit_id}" \
f")"
return segment
def to_csv(self, sep=","):
"""
生成一条csv数据,分隔符默认逗号
"""
csv_line = \
f"{self.order_id}{sep}" \
f"{self.barcode}{sep}" \
f"{self.name}{sep}" \
f"{self.count}{sep}" \
f"{self.price_per}{sep}" \
f"{self.retail_price}{sep}" \
f"{self.trade_price}{sep}" \
f"{self.category_id}{sep}" \
f"{self.unit_id}\n"
return csv_line
class OrdersDetailModel(object):
"""订单详情数据模型"""
def __init__(self, data):
"""
利用传入的订单json数据构建订单详情数据模型对象
"""
data = json.loads(data)
order_products_list = data['product']
self.order_id = data['orderID']
self.products_detail = [] # 记录当前订单卖出的商品
for sing_product in order_products_list:
product = SingleProductSoldModel(self.order_id, sing_product)
self.products_detail.append(product)
def generate_insert_sql(self):
"""
生成添加表数据的SQL语句
"""
sql = f"INSERT IGNORE INTO {conf.target_orders_detail_table_name}(" \
f"order_id,barcode,name,count,price_per,retail_price,trade_price,category_id,unit_id" \
f") VALUES"
for single_product_sold_model in self.products_detail:
sql += single_product_sold_model.generate_value_segment_for_sql_insert() + ", "
# 去除最后的逗号
sql = sql[:-2]
return sql
@staticmethod
def get_csv_header(sep=','):
"""
生成 csv 数据的标头内容
"""
return f"order_id{sep}" \
f"barcode{sep}" \
f"name{sep}" \
f"count{sep}" \
f"price_per{sep}" \
f"retail_price{sep}" \
f"trade_price{sep}" \
f"category_id{sep}" \
f"unit_id\n"
def to_csv(self):
"""生成添加csv的数据行"""
csv_lines = ''
for single_product_sold_model in self.products_detail:
csv_lines += single_product_sold_model.to_csv()
return csv_lines
class RetailOriginJsonModel(object):
"""
原始订单JSON数据模型
"""
def __init__(self, data):
self.order_model = OrdersModel(data)
self.order_detail_model = OrdersDetailModel(data)
def get_order_model(self):
return self.order_model
def get_order_detail_model(self):
return self.order_detail_model
def get_order_id(self):
return self.order_model.order_id
def get_products_list(self):
return self.order_detail_model.products_detail
09-订单JSON数据采集业务实现
9.1 获取未采集文件列表
- 获取订单文件夹下面有哪些订单JSON文件
- 查询元数据库表中已经被采集的订单JSON文件,来对比确定要采集新的订单JSON文件
- 创建元数据连接
- 检查数据表是否存在,不存在则创建
- 获取元数据中已经被采集的Json文件路径列表
- 对比确定要采集的新订单文件
- 增加一个配置文件
# JSON订单数据源的文件路径
json_data_path = 'D:/etl/json/'
order_json_service.py
# 1. 获取订单文件夹下面有哪些订单JSON文件
all_json_files = file_util.get_dir_files_list(conf.json_data_path)
# 2. 查询元数据库表中已经被采集的订单JSON文件,来对比确定要采集新的订单JSON文件
# 2.1 创建元数据连接
metadata_util = mysql_util.get_mysql_util(
host=conf.metadata_host,
port=conf.metadata_port,
user=conf.metadata_user,
password=conf.metadata_password
)
# 2.2 检查数据表是否存在,不存在则创建
metadata_util.check_table_exists_and_create(
db_name=conf.metadata_db,
tb_name=conf.file_monitor_meta_table_name,
tb_cols=conf.file_monitor_meta_table_create_cols
)
# 2.3 获取元数据中已经被采集的Json文件路径列表
sql = f'select * from {conf.file_monitor_meta_table_name};'
result = metadata_util.query(sql)
processed_json_files = [file_tuple[1] for file_tuple in result]
# 2.4 对比确定要采集的新订单文件
new_json_files = file_util.get_new_by_compare_lists(processed_json_files, all_json_files)
9.2 对新JSON文件进行数据采集
- 针对待采集的新订单JSON文件,进行数据采集(ETL操作->mysql->csv)
- 创建csv文件,并打开文件(订单表+订单详情表)
- 向CSV文件中写入标头信息(订单表+订单详情表)
- 创建一个目标数据库连接对象
- 检查数据表是否存在,不存在则创建
- 遍历待处理的JSON文件
- 按行读取JSON文件, 创建数据模型
- 对模型进行处理,订单价格高于10000的记录直接剔除
- 写入到目标表中
- 写入到csv文件中
order_json_service.py
# 3. 针对待采集的新订单JSON文件,进行数据采集(ETL操作->mysql->csv)
# 3.1 创建csv文件,并打开文件(订单表+订单详情表)
# 订单
csv_order_file = open(conf.retail_output_csv_root_path + conf.retail_orders_output_csv_file_name, 'a', encoding='utf8')
# 订单详情
csv_order_detail_file = open(conf.retail_output_csv_root_path + conf.retail_orders_detail_output_csv_file_name, 'a', encoding='utf8')
# 3.2 向CSV文件中写入标头信息(订单表+订单详情表)
# 订单
csv_order_file.write(retail_orders_model.OrdersModel.get_csv_header())
# 订单详情
csv_order_detail_file.write(retail_orders_model.OrdersDetailModel.get_csv_header())
# 3.3 创建一个目标数据库连接对象
target_util = mysql_util.get_mysql_util(
host=conf.target_host,
port=conf.target_port,
user=conf.target_user,
password=conf.target_password
)
# 3.4 检查数据表是否存在,不存在则创建(订单表+订单详情表)
# 订单表
target_util.check_table_exists_and_create(
db_name=conf.target_data_db,
tb_name=conf.target_orders_table_name,
tb_cols=conf.target_orders_table_create_cols
)
# 订单详情表
target_util.check_table_exists_and_create(
db_name=conf.target_data_db,
tb_name=conf.target_orders_detail_table_name,
tb_cols=conf.target_orders_detail_table_create_cols
)
# 3.5 遍历待处理的JSON文件(new_json_files)
for json_file in new_json_files:
# 3.5.1 按行读取JSON文件,
for json_data in open(json_file, 'r', encoding='utf8'):
# 创建数据模型(原始json数据模型)
model = retail_orders_model.RetailOriginJsonModel(json_data)
# 3.5.2 对模型进行处理,订单价格高于10000的记录直接剔除(模拟数据清洗过程,没有实际意义)
if model.order_model.receivable <= 10000:
# 3.5.3 写入到目标表中(订单表+订单详情表)
# 创建sql语句
order_sql = model.order_model.generate_insert_sql()
order_detail_sql = model.order_detail_model.generate_insert_sql()
# 执行插入指令
target_util.insert_single_sql_without_commit(order_sql)
target_util.insert_single_sql_without_commit(order_detail_sql)
# 3.5.4 写入到csv文件中(订单文件+订单详情文件)
# 创建CSV字符串
order_csv_str = model.order_model.to_csv()
order_detail_csv_str = model.order_detail_model.to_csv()
# 写入CSV文件中
csv_order_file.write(order_csv_str)
csv_order_detail_file.write(order_detail_csv_str)
# 3.6 关闭数据库连接和文件
target_util.close()
metadata_util.close()
csv_order_file.close()
csv_order_detail_file.close()
9.3 添加事务
一个文件开始处理前开启事务,处理完成后提交事务
如果文件处理过程中出现异常则回滚事务
order_json_service.py
# 3. 针对待采集的新订单JSON文件,进行数据采集(ETL操作->mysql->csv)
# 3.1 创建csv文件,并打开文件(订单表+订单详情表)
# 订单
csv_order_file = open(conf.retail_output_csv_root_path + conf.retail_orders_output_csv_file_name, 'a', encoding='utf8')
# 订单详情
csv_order_detail_file = open(conf.retail_output_csv_root_path + conf.retail_orders_detail_output_csv_file_name, 'a', encoding='utf8')
# 3.2 向CSV文件中写入标头信息(订单表+订单详情表)
# 订单
csv_order_file.write(retail_orders_model.OrdersModel.get_csv_header())
# 订单详情
csv_order_detail_file.write(retail_orders_model.OrdersDetailModel.get_csv_header())
# 3.3 创建一个目标数据库连接对象
target_util = mysql_util.get_mysql_util(
host=conf.target_host,
port=conf.target_port,
user=conf.target_user,
password=conf.target_password
)
# 3.4 检查数据表是否存在,不存在则创建(订单表+订单详情表)
# 订单表
target_util.check_table_exists_and_create(
db_name=conf.target_data_db,
tb_name=conf.target_orders_table_name,
tb_cols=conf.target_orders_table_create_cols
)
# 订单详情表
target_util.check_table_exists_and_create(
db_name=conf.target_data_db,
tb_name=conf.target_orders_detail_table_name,
tb_cols=conf.target_orders_detail_table_create_cols
)
# 3.5 遍历待处理的JSON文件(new_json_files)
for json_file in new_json_files:
# 事务1: 开启事务
target_util.begin_transaction()
# 事务2: 使用异常捕获如果插入mysql的指令出现异常子回滚事务
try:
# 3.5.1 按行读取JSON文件,
for json_data in open(json_file, 'r', encoding='utf8'):
# 创建数据模型(原始json数据模型)
model = retail_orders_model.RetailOriginJsonModel(json_data)
# 3.5.2 对模型进行处理,订单价格高于10000的记录直接剔除(模拟数据清洗过程,没有实际意义)
if model.order_model.receivable <= 10000:
# 3.5.3 写入到目标表中(订单表+订单详情表)
# 创建sql语句
order_sql = model.order_model.generate_insert_sql()
order_detail_sql = model.order_detail_model.generate_insert_sql()
# 执行插入指令
target_util.insert_single_sql_without_commit(order_sql)
target_util.insert_single_sql_without_commit(order_detail_sql)
# 3.5.4 写入到csv文件中(订单文件+订单详情文件)
# 创建CSV字符串
order_csv_str = model.order_model.to_csv()
order_detail_csv_str = model.order_detail_model.to_csv()
# 写入CSV文件中
csv_order_file.write(order_csv_str)
csv_order_detail_file.write(order_detail_csv_str)
except:
# 事务2.1 : 出现异常事务回滚
target_util.rollback_transaction()
else:
# 事务2.2 : 如果 没有出现异常处理一个文件结束需要提交日志
target_util.commit_transaction()
# 3.6 关闭数据库连接和文件
target_util.close()
metadata_util.close()
csv_order_file.close()
csv_order_detail_file.close()
9.4 记录元数据
- 将本次采集的订单JSON文件,记录到元数据库的表中
注意: 记录元数据时一定要将数反斜杠进行替换
order_json_service.py
# 3.5 遍历待处理的JSON文件(new_json_files)
for json_file in new_json_files:
# 4.2 创建计数器,记录处理数据的条目数
data_count = 0
# 事务1: 开启事务
target_util.begin_transaction()
# 事务2: 使用异常捕获如果插入mysql的指令出现异常子回滚事务
try:
# 3.5.1 按行读取JSON文件,
for json_data in open(json_file, 'r', encoding='utf8'):
# 创建数据模型(原始json数据模型)
model = retail_orders_model.RetailOriginJsonModel(json_data)
# 3.5.2 对模型进行处理,订单价格高于10000的记录直接剔除(模拟数据清洗过程,没有实际意义)
if model.order_model.receivable <= 10000:
# 4.3 提出数据后,将计数器自增
data_count += 1
# 3.5.3 写入到目标表中(订单表+订单详情表)
# 创建sql语句
order_sql = model.order_model.generate_insert_sql()
order_detail_sql = model.order_detail_model.generate_insert_sql()
# 执行插入指令
target_util.insert_single_sql_without_commit(order_sql)
target_util.insert_single_sql_without_commit(order_detail_sql)
# 3.5.4 写入到csv文件中(订单文件+订单详情文件)
# 创建CSV字符串
order_csv_str = model.order_model.to_csv()
order_detail_csv_str = model.order_detail_model.to_csv()
# 写入CSV文件中
csv_order_file.write(order_csv_str)
csv_order_detail_file.write(order_detail_csv_str)
except:
# 事务2.1 : 出现异常事务回滚
target_util.rollback_transaction()
else:
# 事务2.2 : 如果 没有出现异常处理一个文件结束需要提交日志
target_util.commit_transaction()
# 4. 将本次采集的订单JSON文件,记录到元数据库的表中
# 4.1 将文件路径中的反斜杠全部替换为斜杠
json_file = json_file.replace('\\', '/')
# 4.4 构造一个sql语句
sql = f'insert into {conf.file_monitor_meta_table_name}(file_name, process_lines)' \
f'values("{json_file}", {data_count})'
# 4.5 执行sql语句
metadata_util.insert_single_sql(sql)
9.5 记录日志
- 脚本启动时记录日志
- 记录新采集的文件路径有哪些
- 一个文件采集完成记录日志
- 全部采集完成记录日志,并记录文件数和采集时间
# 0. 导入模块
import time
from util import file_util
from util import mysql_util
from util import logging_util
from model import retail_orders_model
from config import project_config as conf
# 日志1: 创建一个日志器对象
logger = logging_util.init_logger('order_json')
# 日志2: 记录脚本执行开始
logger.info('JSON订单数据采集开始...')
# 1. 获取订单文件夹下面有哪些订单JSON文件
all_json_files = file_util.get_dir_files_list(conf.json_data_path)
# 2. 查询元数据库表中已经被采集的订单JSON文件,来对比确定要采集新的订单JSON文件
# 2.1 创建元数据连接
metadata_util = mysql_util.get_mysql_util(
host=conf.metadata_host,
port=conf.metadata_port,
user=conf.metadata_user,
password=conf.metadata_password
)
# 2.2 检查数据表是否存在,不存在则创建
metadata_util.check_table_exists_and_create(
db_name=conf.metadata_db,
tb_name=conf.file_monitor_meta_table_name,
tb_cols=conf.file_monitor_meta_table_create_cols
)
# 2.3 获取元数据中已经被采集的Json文件路径列表
sql = f'select * from {conf.file_monitor_meta_table_name};'
result = metadata_util.query(sql)
processed_json_files = [file_tuple[1] for file_tuple in result]
# 2.4 对比确定要采集的新订单文件
new_json_files = file_util.get_new_by_compare_lists(processed_json_files, all_json_files)
# 日志3: 记录新采集的日志文件路径有哪些
if new_json_files:
logger.info(f'被采集的文件路径列表为{new_json_files}...')
else:
logger.info(f'没有待采集的文件,退出程序...')
exit('没有待采集文件,采集结束')
# 3. 针对待采集的新订单JSON文件,进行数据采集(ETL操作->mysql->csv)
# 3.1 创建csv文件,并打开文件(订单表+订单详情表)
# 订单
csv_order_file = open(conf.retail_output_csv_root_path + conf.retail_orders_output_csv_file_name, 'a', encoding='utf8')
# 订单详情
csv_order_detail_file = open(conf.retail_output_csv_root_path + conf.retail_orders_detail_output_csv_file_name, 'a',
encoding='utf8')
# 3.2 向CSV文件中写入标头信息(订单表+订单详情表)
# 订单
csv_order_file.write(retail_orders_model.OrdersModel.get_csv_header())
# 订单详情
csv_order_detail_file.write(retail_orders_model.OrdersDetailModel.get_csv_header())
# 3.3 创建一个目标数据库连接对象
target_util = mysql_util.get_mysql_util(
host=conf.target_host,
port=conf.target_port,
user=conf.target_user,
password=conf.target_password
)
# 3.4 检查数据表是否存在,不存在则创建(订单表+订单详情表)
# 订单表
target_util.check_table_exists_and_create(
db_name=conf.target_data_db,
tb_name=conf.target_orders_table_name,
tb_cols=conf.target_orders_table_create_cols
)
# 订单详情表
target_util.check_table_exists_and_create(
db_name=conf.target_data_db,
tb_name=conf.target_orders_detail_table_name,
tb_cols=conf.target_orders_detail_table_create_cols
)
# 日志5.1 :记录脚本开始时间
start = time.time()
# 3.5 遍历待处理的JSON文件(new_json_files)
for json_file in new_json_files:
# 4.2 创建计数器,记录处理数据的条目数
data_count = 0
# 事务1: 开启事务
target_util.begin_transaction()
# 事务2: 使用异常捕获如果插入mysql的指令出现异常子回滚事务
try:
# 3.5.1 按行读取JSON文件,
for json_data in open(json_file, 'r', encoding='utf8'):
# 创建数据模型(原始json数据模型)
model = retail_orders_model.RetailOriginJsonModel(json_data)
# 3.5.2 对模型进行处理,订单价格高于10000的记录直接剔除(模拟数据清洗过程,没有实际意义)
if model.order_model.receivable <= 10000:
# 4.3 提出数据后,将计数器自增
data_count += 1
# 3.5.3 写入到目标表中(订单表+订单详情表)
# 创建sql语句
order_sql = model.order_model.generate_insert_sql()
order_detail_sql = model.order_detail_model.generate_insert_sql()
# 执行插入指令
target_util.insert_single_sql_without_commit(order_sql)
target_util.insert_single_sql_without_commit(order_detail_sql)
# 3.5.4 写入到csv文件中(订单文件+订单详情文件)
# 创建CSV字符串
order_csv_str = model.order_model.to_csv()
order_detail_csv_str = model.order_detail_model.to_csv()
# 写入CSV文件中
csv_order_file.write(order_csv_str)
csv_order_detail_file.write(order_detail_csv_str)
except:
# 事务2.1 : 出现异常事务回滚
target_util.rollback_transaction()
else:
# 事务2.2 : 如果 没有出现异常处理一个文件结束需要提交日志
target_util.commit_transaction()
# 4. 将本次采集的订单JSON文件,记录到元数据库的表中
# 4.1 将文件路径中的反斜杠全部替换为斜杠
json_file = json_file.replace('\\', '/')
# 4.4 构造一个sql语句
sql = f'insert into {conf.file_monitor_meta_table_name}(file_name, process_lines)' \
f'values("{json_file}", {data_count})'
# 4.5 执行sql语句
metadata_util.insert_single_sql(sql)
# 日志4: 一个文件采集完成记录日志
logger.info(f'文件{json_file}采集完成,共采集{data_count}行数据...')
logger.info(f'元数据以记录完成,记录文件路径为{json_file}')
# 日志5.2: 记录脚本执行结束时间
end = time.time()
# 日志5.3: 记录脚本采集的文件数量和采集时间
logger.info(f'本次脚本执行,共采集了{len(new_json_files)}个文件, 从耗时{end-start}s...')
# 3.6 关闭数据库连接和文件
target_util.close()
metadata_util.close()
csv_order_file.close()
csv_order_detail_file.close()