手把手教你实现Python连接数据库并快速取数的工具

  • Post author:
  • Post category:python


在数据生产应用部门,取数分析是一个很常见的需求,实际上业务人员需求时刻变化,最高效的方式是让业务部门自己来取。本文就来手把手教大家搭建一个 Python 连接数据库,快速取数工具,需要的可以参考一下。

目录

在数据生产应用部门,取数分析是一个很常见的需求,实际上业务人员需求时刻变化,最高效的方式是让业务部门自己来取,减少不必要的重复劳动,一般情况下,业务部门数据库表结构一般是固定的,根据实际业务将取数需求做成sql 脚本,快速完成数据获取—授人以渔的方式,提供平台或工具

那如何实现一个自助取数查询工具?

基于底层数据来开发不难,无非是将用户输入变量作为筛选条件,将参数映射到 sql 语句,并生成一个 sql 语句然后再去数据库执行


前言

最后再利用 QT 开发一个 GUI 界面,用户界面的点击和筛选条件,信号触发对应按钮与绑定的传参槽函数执行

具体思路:

1.数据库连接类

此处利用 pandas 读写操作 oracle 数据库

2.主函数模块

1)输入参数模块,外部输入条件参数,建立数据库关键字段映射

–注:读取外部 txt 文件,将筛选字段可能需要进行键值对转换

2)sql 语句集合模块,将待执行的业务 sql 语句统一存放到这里

3)数据处理函数工厂

4)使用多线程提取数据


一、数据库连接类

cx_Oracle 是一个 Python 扩展模块,相当于 python 的 Oracle 数据库的驱动,通过使用所有数据库访问模块通用的数据库 API 来实现 Oracle 数据库的查询和更新

Pandas 是基于 NumPy 开发,为了解决数据分析任务的模块,Pandas 引入了大量库和一些标准的数据模型,提供了高效地操作大型数据集所需的方法类和函数

pandas 调用数据库主要有 read_sql_table,read_sql_query,read_sql 三种方式

本文主要介绍一下 Pandas 中 read_sql_query 方法的使用


1:pd.read_sql_query()

读取自定义数据,返还DataFrame格式,通过SQL查询脚本包括增删改查。

1


pd.read_sql_query(sql, con, index_col


=


None


,coerce_float


=


True


, params


=


None


, parse_dates


=


None


,chunksize


=


None


)

sql:要执行的sql脚本,文本类型

con:数据库连接

index_col:选择返回结果集索引的列,文本/文本列表

coerce_float:非常有用,将数字形式的字符串直接以float型读入

parse_dates:将某一列日期型字符串转换为datetime型数据,与pd.to_datetime函数功能类似。

params:向sql脚本中传入的参数,官方类型有列表,元组和字典。用于传递参数的语法是数据库驱动程序相关的。

chunksize:如果提供了一个整数值,那么就会返回一个generator,每次输出的行数就是提供的值的大小

read_sql_query()中可以接受SQL语句,DELETE,INSERT INTO、UPDATE操作没有返回值(但是会在数据库中执行),程序会抛出SourceCodeCloseError,并终止程序。SELECT会返回结果。如果想继续运行,可以try捕捉此异常。


2:pd.read_sql_table()

读取数据库中的表,返还DataFrame格式(通过表名)

1

2


import


pandas as pd


pd.read_sql_table(table_name, con, schema


=


None


,index_col


=


None


, coerce_float


=


True


, parse_dates


=


None


, columns


=


None


,chunksize


=


None


)


3:pd.read_sql()

读数据库通过SQL脚本或者表名

1

2


import


pandas as pd


pd.read_sql(sql, con, index_col


=


None


,coerce_float


=


True


, params


=


None


, parse_dates


=


None


, columns


=


None


, chunksize


=


None


)

以下创建连接 oracel 数据库的连接类 Oracle_DB

主要提供 2 种操作数据的函数方法。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51


import


cx_Oracle


# Pandas读写操作Oracle数据库


import


pandas as pd


# 避免编码问题带来的乱码


import


os


os.environ[


'NLS_LANG'


]


=


'SIMPLIFIED CHINESE_CHINA.UTF8'


class


Oracle_DB(


object


):




def


__init__(


self


):




try


:




# 连接oracle




# 方法1:sqlalchemy 提供的create_engine()




# from sqlalchemy import create_engine




# engine = create_engine('oracle+cx_oracle://username:password@ip:1521/ORCL')




# #方法2:cx_Oracle.connect()




self


.engine


=


cx_Oracle.connect(


'username'


,


'password'


,


'ip:1521/database'


)




except


cx_Oracle.Error as e:




print


(


"Error %d:%s"


%


(e.args[


0


], e.args[


1


]))




exit()





# 查询部分信息




def


search_one(


self


, sql,sparm):




try


:




# #查询获取数据用sql语句




# 代传参数:sparm--查询指定字段参数




df


=


pd.read_sql_query(sql,


self


.engine,params


=


sparm)




self


.engine.close()




except


Exception as e:




return


"Error "


+


e.args[


0


]




return


df




# 查询全部信息




def


search_all(


self


, sql):




try


:




# #查询获取数据用sql语句




df


=


pd.read_sql_query(sql,


self


.engine)




self


.engine.close()




except


Exception as e:




return


"Error "


+


e.args[


0


]




return


df


二、数据提取主函数模块

cx_Oracle 是一个 Python 扩展模块,相当于 python 的 Oracle 数据库的驱动,通过使用所有数据库访问模块通用的数据库 API 来实现 Oracle 数据库的查询和更新。

1)外部输入参数模块

txt 文本中,就包含一列数据,第一行列名,读取的时候忽略第一行

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20


#建立ID——编号字典


def


buildid():




sqlid


=


"""select * from b_build_info"""




db


=


Oracle_DB()


# 实例化一个对象




b_build_info


=


db.search_all(sqlid)




ID_bUILDCODE


=


b_build_info.set_index(


"BUILDCODE"


)[


"ID"


].to_dict()




return


ID_bUILDCODE



#通过文本传入待导出数据清单


def


read_task_list():




build_code


=


buildid()




tasklist


=


[]




is_first_line


=


True




with


open


(


"./b_lst.txt"


) as lst:




for


line


in


lst:




if


is_first_line:




is_first_line


=


False




continue




tasklist.append(build_code.get(line.strip(


'\n'


)))


#键值对转换




return


tasklist

2)业务 sql 语句集合

注意in后面{0}不要加引号,这里传入为元组,params 参数传入sparm

= {‘Start_time’:’2021-04-01′,’End_time’:’2021-05-01′},此处参数可根据需要改变

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24


def


sql_d(lst):




# 逐月数据




sql_d_energy_item_month


=


"""select * from d_energy_item_month




where recorddate >= to_date(:Start_time, 'yyyy-MM-dd')




and recorddate < to_date(:End_time, 'yyyy-MM-dd')




and  buildid  in {0}




order by recorddate asc"""


.


format


(lst)




# 逐月数据




sql_d_energy_month


=


"""select d.*,t.name from d_energy_month d join t_device_info t on d.branchid = t.id




where d.recorddate >= to_date(:Start_time, 'yyyy-MM-dd')




and d.recorddate < to_date(:End_time, 'yyyy-MM-dd')




and d.buildid = '{0}'




order by d.recorddate asc"""


.


format


(lst)




# 查询当日数据




sql_energy_item_hour_cheak


=


"""select * from d_energy_item_hour




where trunc(sysdate)=trunc(recorddate)




order by recorddate asc"""


.


format


(lst)




sql_collection


=


[sql_d_energy_item_month, sql_d_energy_item_day, sql_d_energy_item_hour, sql_d_energy_month,




sql_d_energy_day, sql_d_energy_hour, sql_energy_hour_cheak]




#此处省略部分sql语句




return


sql_collection

3)业务数据处理

业务数据处理流程,原始数据后处理,这里不作介绍:

1

2

3

4

5

6

7

8

9

10


def


db_extranction(lst,sparm,sql_type):




"""sql_type--输入需要操作的sql业务序号"""




sql_


=


sql_d(lst)[sql_type]


#输出sql语句




db


=


Oracle_DB()


# 实例化一个对象




res


=


db.search_one(sql_,sparm)




# 数据处理加工




RES


=


Data_item_factory(res)


#此处省略




# res = db.search_all(sql_d_energy_item_month)




print


(RES)




return


RES

多线程提取数据部分,这里 tasklist 列表多线程提取数据

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28


import


threading


# Pandas读写操作Oracle数据库


from


tools.Data_Update_oracle


import


Oracle_DB


import


pandas as pd


from


concurrent


import


futures


if


__name__


=


=


'__main__'


:




#外部传入




tasklist


=


read_task_list()




print


(tasklist)




# 输入时间查找范围参数,可手动修改




sparm


=


{



'Start_time'


:


'2021-04-01'


,


'End_time'


:


'2021-05-01'


}




lst


=


tuple


(


list


(tasklist))





#业务类型序号,可手动修改




sql_type


=


0





#全部提取




db_extranction(lst,sparm,sql_type)




#多线程按字段分批提取




方法一:使用threading模块的Thread类的构造器创建线程




#threads=[threading.Thread(target=db_extranction,args=(lst,sparm,sql_type)) for lst in tasklist]




# [threads[i].start() for i in range(len(threads))]





方法二:使用python的concurrent库,这是官方基于 threading 封装,先安装该库




# with futures.ThreadPoolExecutor(len(tasklist)) as executor:




#     executor.map([db_extranction(lst,sparm,sql_type) for lst in tasklist],tasklist)

到此整个数据库取数工具开发流程介绍完毕,就差最后一步分享给小伙伴使用了,做成 GUI 应用此处不做详细介绍,构建独立的 python 环境,快速发布你的应用

到此这篇关于手把手教你实现Python连接数据库并快速取数的工具的文章就介绍到这了。

300+Python经典编程案例

50G+学习视频教程

100+Python初阶、中阶、高阶电子书籍


点击拿去​​​​​

​​​​​​​



版权声明:本文为ai520wangzha原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。