使用shell进行etl数据验证

  • Post author:
  • Post category:其他


方法如下:

先整理校验的类型;不同类型的校验,在配置文件中需要填写的字段不同。

1:校验数据增量:需要设置表名,增量字段。

2:非法值校验:设置表名,条件,校验字段,合法值/非法值范围。

3:自定义校验:设置表名,校验名称,自定义sql。

参数解析:

使用特殊字符作为参数的前缀、后缀,便于在脚本中进行检测和替换。例如自定义sql中的 `_##dt##_`(前缀 `_##`,后缀 `##_`)会在执行前被替换为校验日期。

所实现的脚本如下:

配置文件:

dm_monitor_list.conf

 1 record dm_box_office_summary index_date
 2 record dm_channel_index index_date
 3 record dm_comment_emotion_summary
 4 record dm_comment_keyword_summary
 5 record dm_comment_meterial dt
 6 record dm_event_meterial index_date
 7 record dm_event_meterial_comment dt
 8 record dm_event_summary
 9 record dm_index index_date
10 record dm_main_actor_index index_date
11 record dm_movie_comment_summary index_date
12 record dm_movie_wish_rating_summary dt
13 record dm_voice_meterial dt
14 record dm_index_date
15 record dm_comment_keyword_base
16 record dm_index_base
17 record dm_event_meterial_base
18 primary_check dm_box_office_summary select concat(movie_id,":",rating_type,":",count(1)) as row_val from dm_box_office_summary where index_date='_##dt##_' group by movie_id,rating_type having count(1) >1
19 primary_check dm_channel_index select concat(movie_id,":",count(1)) as row_val from dm_channel_index where datediff('_##dt##_',index_date)=1 group by movie_id having count(1)>1
20 primary_check dm_box_office_summary select concat(movie_id,":",index_date,":",value) as row_val from dm_box_office_summary  where index_date='_##dt##_' and value<=0
21 primary_check dm_channel_index select concat(movie_id,":",count(1)) as row_val from dm_channel_index where datediff('_##dt##_',index_date)=1 group by movie_id having count(1)>1
22 primary_check dm_comment_emotion_summary select concat(movie_id,":",mood_type,":",platform_id,":",channel_id,":",index_date,":",count(1)) as row_val from dm_comment_emotion_summary group by movie_id,mood_type,platform_id,channel_id,index_date having count(1)>1
23 primary_check dm_comment_keyword_summary select concat(movie_id,":",mood_type,":",keyword,":",platform_id,":",channel_id,":",index_date,":",count(1)) as row_val from dm_comment_keyword_summary group by movie_id,mood_type,keyword,platform_id,channel_id,index_date having count(1)>1
24 primary_check dm_comment_meterial select concat(comment_id,":",count(1)) as row_val from dm_comment_meterial where dt="_##dt##_" group by comment_id having count(1)>1
25 primary_check dm_event_meterial select concat(material_url,":",count(1)) as row_val from dm_event_meterial where index_date='_##dt##_' and index_type=1 group by material_url having count(1)>1
26 primary_check dm_event_meterial_comment select concat(comment_id,":",count(1)) as row_val from dm_event_meterial_comment where dt='_##dt##_' group by comment_id having count(1)>1
27 primary_check dm_event_summary select concat(event_id,":",platform_id,":",channel_id,":",index_date,":",count(1)) as row_val from dm_event_summary group by event_id,platform_id,channel_id,index_date having count(1)>1

脚本文件:monitor.sh

  1 #!/sh/bash
  2 # 分析表数据量状态
  3 # 1.数据的唯一性
  4 #   电影id唯一
  5 # 2.指标的正确行
  6 #   增量不能小于 0 ;全量表小
  7 # 3.基本状态
  8 ##  运算的条数、电影数量、空间
  9 # 日志格式为
 10 ## tablename    dt  check_type  value   insert_date
 11 ## check_type :
 12 ### record 记录值;
 13 ### movie_num :电影数量 ;
 14 ### space :所占空间
 15 ### diff: 昨天和今天的电影差异,使用01 代表今天有昨天没有 10 代表昨天有今天没有
 16 ### movie_rep:重复的电影数量
 17 ### index-* :代表某个指标增量的为负值
 18 
 19 basepath=$(cd `dirname $0`;pwd);
 20 cd $basepath
 21 
 22 source /etc/profile
 23 source ../../etc/env.ini
 24 
 25 if [[ ! -f "$basepath/monitor_list.conf" ]]; then
 26     echo "check monitor list file not exists. system exit."
 27     exit 4
 28 fi
 29 #config
 30 #分区
 31 dt=$(date -d "-1 day" "+%Y-%m-%d")
 32 if [[ $# -eq 1 ]]; then
 33     dt=$(date -d "$1" "+%Y-%m-%d")
 34 fi
 35 insert_date=$(date "+%Y-%m-%d")
 36 file_path=$OPERATION_LOG_FILE_PATH
 37 log_name=monitor_data.log
 38 log=${file_path}/${log_name}
 39 
 40 cat $basepath/monitor_list.conf | while read line
 41 do
 42     check_type=`echo $line | cut -d " " -f 1`
 43     table_name=`echo $line |cut -d " " -f 2`
 44     profix=$table_name"\t"$insert_date"\t"
 45 
 46     if [[ $check_type == 'dw' ]];then
 47         DB=$HIVE_DB_DW
 48         hdfs_path=$HADOOP_DW_DATA_DESC
 49     elif [[ $check_type == 'ods' ]];then
 50         DB=$HIVE_DB_ODS_S
 51         hdfs_path=$HADOOP_ODS_S_DATA_DESC
 52     fi
 53 
 54     #record
 55     record=$(spark-sql -e "select count(1) from $DB.$table_name where dt = '$dt';")
 56     echo -e $profix"record\t"$record >> $log
 57 
 58     #movie_num
 59     if [[ $table_name == 'dw_weibo_materials' ]];then
 60         mtime_id="movie_id"
 61     elif [[ $table_name == 'g4_weibo_materiel_post' ]];then
 62         mtime_id='x_movie_id'
 63     else
 64         mtime_id="mtime_id"
 65     fi
 66     if [[ $table_name == 'dw_weibo_user' ]];then
 67         movie_num=$(hive -e "select count(1) from
 68             (select mtime_actor_id from $DB.$table_name where dt = '$dt' and source = 'govwb' group by mtime_actor_id) a")
 69     else
 70         movie_num=$(spark-sql -e "select count(1) from (select $mtime_id from $DB.$table_name where dt = '$dt' group by $mtime_id) a")
 71     fi
 72     echo -e $profix"movie_num\t"$movie_num >> $log
 73 
 74     #space
 75     if [[ $check_type == 'ods' ]];then
 76         space=$(hadoop fs -du $hdfs_path/$table_name/$dt)
 77     else
 78         space=$(hadoop fs -du $hdfs_path/$table_name/dt=$dt)
 79     fi
 80     echo -e $profix"space\t"$space>> $log
 81 
 82     #diff
 83     if [[ $table_name != 'dw_weibo_user' ]];then
 84         yesterday=$(date -d "-1 day $dt" "+%Y-%m-%d")
 85         diff=$(spark-sql -e "
 86             select concat_ws('|',collect_set(flag)) from (
 87                 select 'gf' as gf, concat_ws('=',flag,cast(count(1) as string)) as flag  from (
 88                     select concat(if(y.$mtime_id is null, 0, 1),if(t.$mtime_id is null,0,1)) as flag
 89                     from (select distinct $mtime_id from $DB.$table_name where dt='$dt') t
 90                     full outer join (select distinct $mtime_id from $DB.$table_name where dt='$yesterday') y
 91                     on  t.$mtime_id = y.$mtime_id
 92                 ) a group by flag
 93             ) b group by gf;")
 94         echo -e $profix"diff\t"$diff>> $log
 95     fi
 96     #movie_rep
 97     if [[ $check_type == 'dw' ]];then
 98         movie_rep=$(spark-sql -e "
 99             select concat_ws('|',collect_set(v)) from (
100                 select 'k' as k ,concat_ws('=',id,cast(count(1) as string)) as v
101                 from $DB.$table_name where dt = '$dt'
102                 group by id
103                 having count(1)>1
104             )a  group by k;")
105         echo -e $profix"movie_rep\t"$movie_rep>> $log
106     fi
107     #index-*
108     if [[ $table_name == 'dw_comment_statistics' ]];then
109         up_day=$(spark-sql -e "select concat('<0:',count(1)) from $DB.$table_name
110             where dt = '$dt' and
111                 (cast(up_day as int) < 0
112                 or cast(down_day as int) < 0
113                 or cast(vv_day as int ) < 0
114                 or cast(cmts_day as int) < 0
115                 );")
116         echo -e $profix"index_day\t"$up_day >> $log
117     fi
118 done
119 
# dm
# Checks for the dm tables listed in dm_monitor_list.conf. Line formats:
#   record <table_name> [<partition_column>]
#   <check_name> <table_name> <custom sql selecting a row_val column>
# In custom sql the marker _##dt##_ is replaced by the partition date $dt.
#
# The config is read on file descriptor 3 so that hive inside the loop cannot
# consume the remaining config lines from stdin. `read -r` splits off the two
# leading fields and keeps the rest of the line verbatim, so the sql is never
# glob-expanded or re-split by the shell.
args_prefix="_##"
args_suffix="##_"
while read -r -u 3 check_type table_name check_arg || [[ -n $check_type ]]; do
    # Skip blank lines and comment lines.
    [[ -z $check_type || $check_type == '#'* ]] && continue

    echo "$table_name"

    row_prefix=$table_name"\t"$insert_date"\t"

    DB=$HIVE_DB_DW
    hdfs_path=$HADOOP_DW_DATA_DESC

    if [[ $check_type == 'record' ]]; then
        # Only the third field is the partition column; it may be absent.
        dt_str=${check_arg%%[[:space:]]*}
        echo "记录数校验 分区字段""$dt_str"

        record_sql="select count(1) from $DB.$table_name"
        if [[ -n $dt_str ]]; then
            # A datediff-based filter for dm_channel_index existed here as
            # commented-out code in the original; the plain equality filter
            # is what actually runs.
            record_sql=$record_sql" where $dt_str = '$dt';"
        else
            record_sql=$record_sql";"
        fi

        echo "执行的语句:""$record_sql"
        # record
        record=$(hive -e "set hive.mapred.mode = nonstrict;$record_sql")
        # Unquoted on purpose: flattens multi-line output into one log row.
        echo -e "${row_prefix}${check_type}\t"$record >> "$log"
    else
        echo "自定义校验""$check_type"

        # custom_sql: substitute the date marker (quoted so it is matched
        # literally rather than as a pattern).
        custom_sql=${check_arg//"${args_prefix}dt${args_suffix}"/$dt}
        echo "执行的语句:""$custom_sql"
        invalid_records=$(hive -e "set hive.mapred.mode = nonstrict;use $DB;select concat_ws(\" | \",collect_set(row_val)) from ( $custom_sql ) tmp;")
        echo $invalid_records
        # No offending rows is logged as "0".
        if [[ -z $invalid_records ]]; then
            invalid_records="0"
        fi
        # Unquoted on purpose: flattens multi-line output into one log row.
        echo -e "${row_prefix}${check_type}\t"$invalid_records >> "$log"
    fi
done 3< "$basepath/dm_monitor_list.conf"
# insert hive
# Replace the dt partition of yq_monitor_data_log with the log produced by
# this run, then move the local log out of the way so the next run starts
# with an empty file.

# ":?" aborts if either variable is empty, so the recursive remove below can
# never be pointed at a truncated path.
target_dir="${HADOOP_ODS_CONFIG_DATA_DESC:?HADOOP_ODS_CONFIG_DATA_DESC must be set}/yq_monitor_data_log/dt=${dt:?dt must be set}"

# Without a log there is nothing to load; stop before deleting the data that
# is already in HDFS for this partition.
if [[ ! -f "$log" ]]; then
    echo "monitor log $log not found, nothing to load." >&2
    exit 1
fi

hadoop fs -rm -r "$target_dir"

hive -e "
    ALTER TABLE $HIVE_DB_MONITOR.yq_monitor_data_log DROP IF EXISTS PARTITION (dt = '$dt');
    alter table $HIVE_DB_MONITOR.yq_monitor_data_log add partition (dt = '$dt');
    "

# Absolute paths are used instead of cd-ing into the log directory, so a
# failed cd can no longer make put/mv act on the wrong directory.
if ! hadoop fs -put "$log" "$target_dir"; then
    echo "failed to upload $log to $target_dir" >&2
fi
mv -f "$log" /home/trash

转载于:https://www.cnblogs.com/keensword/p/9358887.html