2021年大学生大数据技能竞赛上海分区HIVE篇解析

  • Post author:
  • Post category:其他




项目需求

自2019年底,湖北省武汉市监测发现不明原因肺炎病例,中国第一时间报告疫情,迅速采取行动,开展病因学和流行病学调查,阻断疫情蔓延。

SARS-CoV-2是一种有着高扩散能力的病毒,通过飞沫、直接接触和被感染的物体传播,其潜伏时间为1到14天,并且也由无症状感染者传播。大多数感染者仅表现出轻度至中度的呼吸道症状,或根本不表现任何症状。只有5-10%的感染者显示出完全的严重呼吸综合征,称为冠状病毒病(COVID)-19,能够人传人,进而所引发的全球大流行疫情,是全球自第二次世界大战以来面临的最严峻危机。截至目前,全球已有200多个国家和地区累计报告超过2.1375亿确诊病例,导致超过445万名患者死亡。

在此基础上,大数据技术应用发挥出极大作用。通过城市监测,接触者追踪,疫苗接种等,将我们的疫情信息进行传达。为政府正确决策、精准施策提供了科学依据。强化了政府对疫情物资生产、筹集、投放的科学管控手段。为医疗救治、“群防群控”,防止疫情蔓延采取有效措施提供了科学数据和手段。科学分析预测疫情现状、趋势,适时准确地根据疫情变化把握防疫重点。


本项目为新型冠状病毒(COVID-19)疫情状况的时间序列数据仓库,选手可以通过对疫情历史数据的分析研究,以更好的了解疫情与疫情的发展态势,为抗击疫情之决策提供数据支持。



step1:创建ods层数据表

ods层是数据原始层,只需将原始数据拉去过来即可,ods层可以采用内部表(EXTERNAL 修饰)保证数据安全性。


数据库:covid_ods

原始数据表:covid_ods.covid


字段

解释
continerName 大洲
countryName 国家
provinceName 省份
province_confirm 省份确诊人数
province_suspect 省份感染人数
province_cured 省份治愈人数
province_dead 省份死亡人数
cityName 城市/地区
city_confirm 城市确诊人数
city_suspect 城市感染人数
city_cured 城市治愈人数
city_dead 城市死亡人数
updateTime 数据更新时间


数据加工表:covid_ods.covid_time

要求:保留干净数据,去重(去空值、脏数据处理),提取特征数据,只保留每天最后更新的数据;

\1. 特征数据:包括省份,城市/地区,城市确诊,城市感染,城市治愈,城市死亡,数据更新时间;

\2. 过滤重复值,数据中有同一天获取的多次疫情信息,根据时间只保留每天最后更新的数据;

\3. 同时要求国家为中国,省份不为中国,过滤地区空值。


字段

解释
provinceName 省份
cityName 城市/地区
city_confirm 城市确诊人数
city_suspect 城市感染人数
city_cured 城市治愈人数
city_dead 城市死亡人数
updateTime 数据更新时间
# 重复数据,同一天多次更新的数据,取每天最后更新的数据(按照时间进行条件判断)
亚洲,中国,福建省,1169,15,777,1,莆田,246,0,56,0,2021-09-20 19:14:20
亚洲,中国,福建省,1169,15,777,1,莆田,246,0,56,0,2021-09-20 18:54:20
亚洲,中国,福建省,1169,15,777,1,莆田,246,0,56,0,2021-09-20 09:50:20
 
# 地区空值数据
亚洲,中国,台湾,40,0,12,1,,,,,,2020-03-02 09:10:02
亚洲,中国,香港,10,0,0,0,,,,,,2020-01-29 19:12:29



sed` `-n ``'1,10'``p ``/etc/test1` `>>test2
#将file1的1-10行追加到file2



step2:创建dwd层数据表

在dwd层采用分区表将数据按照年/月维度进行分区存放,以便在获取某月数据时可快速获取,提高获取效率。在本层将获取想要的字段数据**,**对不规则数据做简单整理。


数据库:covid_dwd


添加昨天时间列表:province


指标

:城市累计确诊,城市累计疑似,城市累计治愈,城市累计死亡。


维度

:省份,城市,时间(更新时间updateTime,以及时间的前一天yesterday)。


分区

:年、月


思路

:将数据中的时间切割,获取年月日,并使用date_sub()函数获取昨天更新时间。


字段

解释
provinceName 省份
cityName 城市/地区
city_confirm 城市确诊人数
city_suspect 城市感染人数
city_cured 城市治愈人数
city_dead 城市死亡人数
updateTime 数据更新时间
yesterday 昨天更新时间
yearinfo 年(分区)
monthinfo 月(分区)



step3:创建dwm数据处理分析

统计每天各个省份中指标的增长量,因此需要去获取前一天或者后一天的数据,在本层将当天数据和前一天的数据进行汇总,通过join方式将数据合并为一条数据。对四个指标数据进行类型转换,转换为int类型(在dws层将参与运算)。


数据库:covid_dwd


创建当日数据和后一天数据汇总数据表:two_day


字段

解释
provinceName 省份
cityName 城市/地区
city_confirm 城市确诊人数
city_suspect 城市感染人数
city_cured 城市治愈人数
city_dead 城市死亡人数
updateTime 更新时间
city_confirm_before 一天前城市确诊人数
city_suspect_before 一天前城市感染人数
city_cured_before 一天前城市治愈人数
city_dead_before 一天前城市死亡人数
yesterday 昨天更新时间
yearinfo 年(分区)
monthinfo 月(分区)

合并数据注意考虑时间问题。



step4:创建dws层

在dwd层已经拿到前一天的数据,在本层计算各个地区的指标增量,计算方式为:


每日指标增量=前一天指标数据-今日指标数据


数据库:covid_dws


单日指标正常量表:covid_dws.day


字段

解释
provinceName 省份
cityName 城市/地区
new_city_confirm 日确诊增长人数
new_city_suspect 日疑似增长人数
new_city_cured 日治愈增长人数
new_city_dead 日死亡增长人数
updateTime 更新时间
yearinfo 年(分区)
monthinfo 月(分区)



step5:创建app层

针对疫情数据,在app层再对维度进行上卷,分析维度为各个省份每日的指标增量情况统计。


数据库:covid_app


app层业务表:covid_app.day_app


字段

解释
provinceName 省份
new_city_confirm 日确诊增长人数
new_city_suspect 日疑似增长人数
new_city_cured 日治愈增长人数
new_city_dead 日死亡增长人数
updateTime 更新时间
yearinfo 年(分区)
monthinfo 月(分区)



其他参考设置:

--动态分区配置set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=100000;
set hive.exec.max.dynamic.partitions=100000;set hive.exec.max.created.files=100000;
--hive压缩set hive.exec.compress.intermediate=true;
set hive.exec.compress.output=true;
--写入时压缩生效set hive.exec.orc.compression.strategy=COMPRESSION;
--本地模式set hive.exec.mode.local.auto=true;set mapreduce.map.memory.mb=1025; 
set mapreduce.reduce.memory.mb=1025;
set hive.exec.mode.local.auto.input.files.max=25;



题目



前置准备

修改主机名

hostnamectl set-hostname master

格式化HDFS,开启集群

cd /root/machine/hadoop-2.7.7

hadoop namenode -format

sbin/start-all.sh

开启mysql服务

systemctl start mysqld

环境中已经安装Hive2.3.4,需要开启mysql服务,初始化数据库,即可开启Hive客户端。

cd /usr/hive/apache-hive-2.3.4-bin

schematool -dbType mysql -initSchema



covid_ods库

进入hive客户端,创建名为covid_ods的数据库用于存储原始数据

create database covid_ods;

use covid_ods;


covid表

数据库covid_ods下创建covid表,导入去除表头后的原始数据/root/covid/covid_area.csv(文件名不变)

create table covid(
continerName STRING,
countryName STRING,
provinceName STRING,
province_confirm INT,
province_suspect INT,
province_cured INT,
province_dead INT,
cityName STRING,
city_confirm INT,
city_suspect INT,
city_cured INT,
city_dead INT,
updateTime TIMESTAMP
)row format delimited fields terminated by ',';

导入

load data local inpath '/root/covid/covid_area.csv' into table covid;


covid_time表

用于提取有用数据,过滤重复值,只保留每天最后更新的数据,具体参考步骤说明

创建表

create table covid_time(
provinceName STRING,
cityName STRING,
city_confirm INT,
city_suspect INT,
city_cured INT,
city_dead INT,
updateTime TIMESTAMP
)row format delimited fields terminated by ',';

按照要求向covid_ods.covid_time插入过滤后的数据

insert overwrite table covid_time
select
provinceName,
cityName,
city_confirm,
city_suspect,
city_cured,
city_dead,
updateTime
from (
select *
from (
select 
row_number() over(partition by cityName,day order by minute desc) as rmp,*
from (
select 
*,
date_format(regexp_replace(updateTime,'/','-'),'yyyy-MM-dd') as day,
date_format(regexp_replace(updateTime,'/','-'),'yyyy-MM-dd-HH-mm') as minute
from covid 
where  countryName ="中国" and provinceName !="中国" and cityName !=""
) a
) b
where rmp=1
) c
;


结果

重庆市	万州区	118	0	114	4	2021-08-19 09:52:19
重庆市	万州区	118	0	114	4	2021-08-23 10:36:23
重庆市	万州区	118	0	114	4	2021-09-03 19:55:03
重庆市	万盛经开区	1	0	1	0	2020-04-24 08:43:24
重庆市	万盛经开区	1	0	1	0	2020-06-14 09:10:14
重庆市	万盛经开区	1	0	1	0	2020-06-15 09:08:15
重庆市	万盛经开区	1	0	1	0	2020-07-03 09:40:03
重庆市	万盛经开区	1	0	1	0	2020-07-09 08:14:09



covid_dwd库

创建名为covid_dwd的数据库,此层将数据进行分区,便于数据的快速获取

create database covid_dwd;
use covid_dwd;


province表

数据库covid_dwd下创建province表,按照年、月分区,要求根据当天时间获取昨天对应时间列,并插入对应数据,具体要求查看步骤说明

create table province(
provinceName STRING,
cityName STRING,
city_confirm INT,
city_suspect INT,
city_cured INT,
city_dead INT,
updateTime TIMESTAMP,
yesterday TIMESTAMP)
partitioned by(yearinfo int,monthinfo int)
row format delimited fields terminated by ',';

两种的区别在于部分数据找不到前一天的日期,第一种是将查不到前一天日期设置为NULL,第二种是直接跳过此类数据

## 第一种可能
set hive.exec.dynamici.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.mode.local.auto=true;
SET hive.exec.max.dynamic.partitions=100000;
SET hive.exec.max.dynamic.partitions.pernode=100000;
set hive.exec.max.created.files=100000;
INSERT OVERWRITE TABLE covid_dwd.province partition(yearinfo, monthinfo)
select a.provinceName,a.cityName,a.city_confirm,a.city_suspect,
a.city_cured,a.city_dead,a.updateTime,b.updateTime,year(a.updateTime) as yearinfo,month(a.updateTime) as monthinfo
from
(select
provinceName,cityName,city_confirm,city_suspect,city_cured,city_dead,updateTime,
year(updateTime) as yearinfo,
month(updateTime) as monthinfo,date_sub(updateTime,1) as yes 
from covid_ods.covid_time t1
) a
left join 
(select cityName,updateTime,date_format(regexp_replace(updateTime,'/','-'),'yyyy-MM-dd') as day 
from covid_ods.covid_time t2
) b 
on b.day= a.yes and b.cityName=a.cityName;


## 第二种可能
set hive.exec.dynamici.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.mode.local.auto=true;
SET hive.exec.max.dynamic.partitions=100000;
SET hive.exec.max.dynamic.partitions.pernode=100000;
set hive.exec.max.created.files=100000;
INSERT OVERWRITE TABLE covid_dwd.province partition(yearinfo, monthinfo)
select a.provinceName,a.cityName,a.city_confirm,a.city_suspect,
a.city_cured,a.city_dead,a.updateTime,b.updateTime 
from
(select
provinceName,cityName,city_confirm,city_suspect,city_cured,city_dead,updateTime,
year(updateTime) as yearinfo,
month(updateTime) as monthinfo,date_sub(updateTime,1) as yes 
from covid_ods.covid_time t1
) a
join 
(select cityName,updateTime,date_format(regexp_replace(updateTime,'/','-'),'yyyy-MM-dd') as day 
from covid_ods.covid_time t2
) b 
on b.day= a.yes and b.cityName=a.cityName;


结果

黑龙江省	七台河	17	0	17	0	2020-10-27 08:42:27	NULL	2020	10
重庆市	万州区	118	0	114	4	2020-10-02 08:49:02	NULL	2020	10
重庆市	万州区	118	0	114	4	2020-10-16 09:26:16	NULL	2020	10
重庆市	万州区	118	0	114	4	2020-10-20 08:45:20	NULL	2020	10
重庆市	万州区	118	0	114	4	2020-10-21 08:44:21	2020-10-20 08:45:20	2020	10
重庆市	万州区	118	0	114	4	2020-10-23 08:20:23	NULL	2020	10
重庆市	万盛经开区	1	0	1	0	2020-10-02 08:49:02	NULL	2020	10
重庆市	万盛经开区	1	0	1	0	2020-10-16 09:26:16	NULL	2020	10
重庆市	万盛经开区	1	0	1	0	2020-10-20 08:45:20	NULL	2020	10
重庆市	万盛经开区	1	0	1	0	2020-10-21 08:44:21	2020-10-20 08:45:20	2020	10



covid_dwm库

创建名为covid_dwm的数据库,用于统计每个省份的各指标增长量。

create database covid_dwm;
use covid_dwm;


two_day表

数据库covid_dwm下创建two_day表

create table two_day(
provinceName STRING,
cityName STRING,
city_confirm INT,
city_suspect INT,
city_cured INT,
city_dead INT,
updateTime TIMESTAMP,
city_confirm_before INT,
city_suspect_before INT,
city_cured_before INT,
city_dead_before INT,
yesterday TIMESTAMP)
partitioned by(yearinfo int,monthinfo int)
row format delimited fields terminated by ',';

将province中当天数据和前一天的数据进行汇总,通过join方式将数据合并为一条数据,具体查看步骤说明

set hive.exec.dynamici.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.mode.local.auto=true;
SET hive.exec.max.dynamic.partitions=100000;
SET hive.exec.max.dynamic.partitions.pernode=100000;
set hive.exec.max.created.files=100000;
INSERT OVERWRITE TABLE covid_dwm.two_day partition(yearinfo, monthinfo)
select 
a.provinceName,a.cityName,a.city_confirm,a.city_suspect,
a.city_cured,a.city_dead,a.updateTime,
b.city_confirm as city_confirm_before,b.city_suspect as city_suspect_before,
b.city_cured as city_cured_before,b.city_dead as city_dead_before,a.yesterday,
a.yearinfo,a.monthinfo from covid_dwd.province as a 
left join covid_dwd.province as b on a.yesterday=b.updateTime and a.cityName=b.cityName and a.provinceName=b.provinceName;

将表two_day中所有内容保存至云主机/root/covid/two_day.csv

hive -e 'select 
a.provinceName,a.cityName,a.city_confirm,a.city_suspect,
a.city_cured,a.city_dead,a.updateTime,
b.city_confirm as city_confirm_before,b.city_suspect as city_suspect_before,
b.city_cured as city_cured_before,b.city_dead as city_dead_before,a.yesterday,
a.yearinfo,a.monthinfo from covid_dwd.province as a 
left join covid_dwd.province as b on a.yesterday=b.updateTime and a.cityName=b.cityName and a.provinceName=b.provinceName;' | sed 's/[[:space:]]\+/,/g' > /root/covid/two_day.csv 


结果

黑龙江省	七台河	17	0	17	0	2020-10-27 08:42:27	NULL	NULL	NULL	NULL	NULL	2020	10
重庆市	万州区	118	0	114	4	2020-10-02 08:49:02	NULL	NULL	NULL	NULL	NULL	2020	10
重庆市	万州区	118	0	114	4	2020-10-16 09:26:16	NULL	NULL	NULL	NULL	NULL	2020	10
重庆市	万州区	118	0	114	4	2020-10-20 08:45:20	NULL	NULL	NULL	NULL	NULL	2020	10
重庆市	万州区	118	0	114	4	2020-10-21 08:44:21	118	0	114	4	2020-10-20 08:45:20	2020	10
重庆市	万州区	118	0	114	4	2020-10-23 08:20:23	NULL	NULL	NULL	NULL	NULL	2020	10
重庆市	万盛经开区	1	0	1	0	2020-10-02 08:49:02	NULL	NULL	NULL	NULL	NULL	2020	10
重庆市	万盛经开区	1	0	1	0	2020-10-16 09:26:16	NULL	NULL	NULL	NULL	NULL	2020	10
重庆市	万盛经开区	1	0	1	0	2020-10-20 08:45:20	NULL	NULL	NULL	NULL	NULL	2020	10
重庆市	万盛经开区	1	0	1	0	2020-10-21 08:44:21	1	0	1	0	2020-10-20 08:45:20	202010



covid_dws库

创建名为covid_dws的数据库,用于计算各个地区的指标增量。

create database covid_dws;
use covid_dws;


day表

数据库covid_dws下创建day表

create table day(
provinceName STRING,
cityName STRING,
new_city_confirm INT,
new_city_suspect INT,
new_city_cured INT,
new_city_dead INT,
updateTime TIMESTAMP)
partitioned by(yearinfo int,monthinfo int)
row format delimited fields terminated by ',';

计算地区每日指标增量,具体字段查看步骤说明

set hive.exec.dynamici.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.mode.local.auto=true;
SET hive.exec.max.dynamic.partitions=100000;
SET hive.exec.max.dynamic.partitions.pernode=100000;
set hive.exec.max.created.files=100000;
INSERT OVERWRITE TABLE covid_dws.day partition(yearinfo, monthinfo)
select 
provinceName,cityName,
(city_confirm-city_confirm_before) as new_city_confirm,
(city_suspect-city_suspect_before) as new_city_suspect,
(city_cured-city_cured_before) as new_city_cured,
(city_dead-city_dead_before) as new_city_dead,
a.updateTime,yearinfo,monthinfo from covid_dwm.two_day;

将表day中所有内容保存至云主机/root/covid/day.csv

hive -e '
select 
provinceName,cityName,
(city_confirm-city_confirm_before) as new_city_confirm,
(city_suspect-city_suspect_before) as new_city_suspect,
(city_cured-city_cured_before) as new_city_cured,
(city_dead-city_dead_before) as new_city_dead,
a.updateTime,yearinfo,monthinfo from covid_dwm.two_day;
' | sed 's/[[:space:]]\+/,/g' > /root/covid/day.csv 


结果

黑龙江省	七台河	NULL	NULL	NULL	NULL	2020-10-27 08:42:27	2020	10
重庆市	万州区	NULL	NULL	NULL	NULL	2020-10-02 08:49:02	2020	10
重庆市	万州区	NULL	NULL	NULL	NULL	2020-10-16 09:26:16	2020	10
重庆市	万州区	NULL	NULL	NULL	NULL	2020-10-20 08:45:20	2020	10
重庆市	万州区	0	0	0	0	2020-10-21 08:44:21	2020	10
重庆市	万州区	NULL	NULL	NULL	NULL	2020-10-23 08:20:23	2020	10
重庆市	万盛经开区	NULL	NULL	NULL	NULL	2020-10-02 08:49:02	2020	10
重庆市	万盛经开区	NULL	NULL	NULL	NULL	2020-10-16 09:26:16	2020	10
重庆市	万盛经开区	NULL	NULL	NULL	NULL	2020-10-20 08:45:20	2020	10
重庆市	万盛经开区	0	0	0	0	2020-10-21 08:44:21	2020	10



covid_app库

创建名为covid_app的数据库,此层用于各个省份每日的指标增量情况统计

create database covid_app;
use covid_app;


day_app表

数据库covid_app下创建app层业务表,进行各个省份每日的指标增量情况统计

create table day_app(
provinceName STRING,
new_city_confirm INT,
new_city_suspect INT,
new_city_cured INT,
new_city_dead INT,
updateTime TIMESTAMP)
partitioned by(yearinfo int,monthinfo int)
row format delimited fields terminated by ',';

将表day_app中所有内容保存至云主机/root/covid/day_app.csv

set hive.exec.dynamici.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.mode.local.auto=true;
SET hive.exec.max.dynamic.partitions=100000;
SET hive.exec.max.dynamic.partitions.pernode=100000;
set hive.exec.max.created.files=100000;
INSERT OVERWRITE TABLE covid_app.day_app partition(yearinfo, monthinfo)
select distinct provinceName,new_city_confirm1,new_city_suspect1,new_city_cured1,new_city_dead1 ,updateTime,yearinfo,monthinfo from
(select provinceName,cityName,new_city_confirm,new_city_suspect,new_city_cured,new_city_dead,
sum(if(new_city_confirm is not null, new_city_confirm, 0)) over(partition by provinceName,updateTime) as new_city_confirm1,
sum(if(new_city_suspect is not null, new_city_suspect, 0)) over(partition by provinceName,updateTime ) as new_city_suspect1,
sum(if(new_city_cured is not null, new_city_cured, 0)) over(partition by provinceName,updateTime) as new_city_cured1,
sum(if(new_city_dead is not null, new_city_dead,0)) over(partition by provinceName,updateTime) as new_city_dead1,
updateTime,yearinfo,monthinfo from covid_dws.day
) t1;
hive -e '
select 
* from covid_app.day_app;
' | sed 's/[[:space:]]\+/,/g' > /root/covid/day_app.csv

结果(多数数据显示为0,这里选取了中间连续有新增的数据)

福建省	21	0	1	0	2021-09-12 18:58:12	2021	9
福建省	22	0	1	0	2021-09-13 19:10:13	2021	9
福建省	29	0	0	0	2021-09-20 19:14:20	2021	9
福建省	31	0	1	0	2021-09-18 18:12:18	2021	9
福建省	43	0	0	0	2021-09-19 10:02:19	2021	9
福建省	48	0	2	0	2021-09-16 09:58:16	2021	9
福建省	51	0	1	0	2021-09-15 18:29:15	2021	9
福建省	60	0	2	0	2021-09-14 18:31:14	2021	9
福建省	61	0	1	0	2021-09-17 19:46:17	2021	9



版权声明:本文为weixin_47726676原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。