The approach is as follows:
First, sort out the types of checks to perform; each type has its own configuration format.
1. Incremental-record check: requires the table name and the increment (partition) field.
2. Invalid-value check: requires the table name, a condition, the field to check, and the range of valid/invalid values.
3. Custom check: requires the table name, a check name, and a custom SQL statement.
Parameter parsing:
Special character sequences are used as the parameter prefix and suffix, which makes the placeholders easy to detect and replace in the script.
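For example, the partition-date parameter appears in the custom SQL as _##dt##_; the script strips the _## / ##_ markers and substitutes the real date with a plain bash pattern replacement. A minimal sketch of that mechanism (the date value here is only an example):

args_prefix="_##"
args_suffix="##_"
dt="2018-07-23"   # example partition date; normally computed by the script
sql="select count(1) from dm_box_office_summary where index_date='_##dt##_'"
# replace every occurrence of the dt placeholder with the actual date
sql=${sql//${args_prefix}dt${args_suffix}/$dt}
echo "$sql"
# -> select count(1) from dm_box_office_summary where index_date='2018-07-23'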
The implementation is shown below.
Configuration file:
dm_monitor_list.conf
record dm_box_office_summary index_date
record dm_channel_index index_date
record dm_comment_emotion_summary
record dm_comment_keyword_summary
record dm_comment_meterial dt
record dm_event_meterial index_date
record dm_event_meterial_comment dt
record dm_event_summary
record dm_index index_date
record dm_main_actor_index index_date
record dm_movie_comment_summary index_date
record dm_movie_wish_rating_summary dt
record dm_voice_meterial dt
record dm_index_date
record dm_comment_keyword_base
record dm_index_base
record dm_event_meterial_base
primary_check dm_box_office_summary select concat(movie_id,":",rating_type,":",count(1)) as row_val from dm_box_office_summary where index_date='_##dt##_' group by movie_id,rating_type having count(1) >1
primary_check dm_channel_index select concat(movie_id,":",count(1)) as row_val from dm_channel_index where datediff('_##dt##_',index_date)=1 group by movie_id having count(1)>1
primary_check dm_box_office_summary select concat(movie_id,":",index_date,":",value) as row_val from dm_box_office_summary where index_date='_##dt##_' and value<=0
primary_check dm_channel_index select concat(movie_id,":",count(1)) as row_val from dm_channel_index where datediff('_##dt##_',index_date)=1 group by movie_id having count(1)>1
primary_check dm_comment_emotion_summary select concat(movie_id,":",mood_type,":",platform_id,":",channel_id,":",index_date,":",count(1)) as row_val from dm_comment_emotion_summary group by movie_id,mood_type,platform_id,channel_id,index_date having count(1)>1
primary_check dm_comment_keyword_summary select concat(movie_id,":",mood_type,":",keyword,":",platform_id,":",channel_id,":",index_date,":",count(1)) as row_val from dm_comment_keyword_summary group by movie_id,mood_type,keyword,platform_id,channel_id,index_date having count(1)>1
primary_check dm_comment_meterial select concat(comment_id,":",count(1)) as row_val from dm_comment_meterial where dt="_##dt##_" group by comment_id having count(1)>1
primary_check dm_event_meterial select concat(material_url,":",count(1)) as row_val from dm_event_meterial where index_date='_##dt##_' and index_type=1 group by material_url having count(1)>1
primary_check dm_event_meterial_comment select concat(comment_id,":",count(1)) as row_val from dm_event_meterial_comment where dt='_##dt##_' group by comment_id having count(1)>1
primary_check dm_event_summary select concat(event_id,":",platform_id,":",channel_id,":",index_date,":",count(1)) as row_val from dm_event_summary group by event_id,platform_id,channel_id,index_date having count(1)>1
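To illustrate how the script below consumes these entries (the date is only illustrative and the $DB. prefix is omitted for brevity): a record line such as "record dm_box_office_summary index_date" expands into a plain count on the given partition, while a primary_check line has its _##dt##_ placeholder replaced and is then wrapped so that all offending rows are collected into a single log value:

-- record dm_box_office_summary index_date
select count(1) from dm_box_office_summary where index_date = '2018-07-23';

-- primary_check dm_box_office_summary ...
select concat_ws(" | ",collect_set(row_val)) from (
  select concat(movie_id,":",rating_type,":",count(1)) as row_val
  from dm_box_office_summary
  where index_date='2018-07-23'
  group by movie_id,rating_type having count(1) > 1
) tmp;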
Script file: monitor.sh
#!/bin/bash
# Analyze table data-volume status
# 1. Uniqueness of data
#    the movie id must be unique
# 2. Correctness of metrics
#    increments must not be less than 0; full-table size
# 3. Basic status
##   number of records, number of movies, storage space
# Log format:
## tablename dt check_type value insert_date
## check_type :
### record: record count
### movie_num: number of movies
### space: storage space used
### diff: movie difference between yesterday and today; 01 means present today but not yesterday, 10 means present yesterday but not today
### movie_rep: number of duplicated movies
### index-*: a metric whose increment is negative

basepath=$(cd `dirname $0`;pwd);
cd $basepath

source /etc/profile
source ../../etc/env.ini

if [[ ! -f "$basepath/monitor_list.conf" ]]; then
    echo "check monitor list file not exists. system exit."
    exit 4
fi
#config
#partition
dt=$(date -d "-1 day" "+%Y-%m-%d")
if [[ $# -eq 1 ]]; then
    dt=$(date -d "$1" "+%Y-%m-%d")
fi
insert_date=$(date "+%Y-%m-%d")
file_path=$OPERATION_LOG_FILE_PATH
log_name=monitor_data.log
log=${file_path}/${log_name}

cat $basepath/monitor_list.conf | while read line
do
    check_type=`echo $line | cut -d " " -f 1`
    table_name=`echo $line |cut -d " " -f 2`
    profix=$table_name"\t"$insert_date"\t"

    if [[ $check_type == 'dw' ]];then
        DB=$HIVE_DB_DW
        hdfs_path=$HADOOP_DW_DATA_DESC
    elif [[ $check_type == 'ods' ]];then
        DB=$HIVE_DB_ODS_S
        hdfs_path=$HADOOP_ODS_S_DATA_DESC
    fi

    #record
    record=$(spark-sql -e "select count(1) from $DB.$table_name where dt = '$dt';")
    echo -e $profix"record\t"$record >> $log

    #movie_num
    if [[ $table_name == 'dw_weibo_materials' ]];then
        mtime_id="movie_id"
    elif [[ $table_name == 'g4_weibo_materiel_post' ]];then
        mtime_id='x_movie_id'
    else
        mtime_id="mtime_id"
    fi
    if [[ $table_name == 'dw_weibo_user' ]];then
        movie_num=$(hive -e "select count(1) from
        (select mtime_actor_id from $DB.$table_name where dt = '$dt' and source = 'govwb' group by mtime_actor_id) a")
    else
        movie_num=$(spark-sql -e "select count(1) from (select $mtime_id from $DB.$table_name where dt = '$dt' group by $mtime_id) a")
    fi
    echo -e $profix"movie_num\t"$movie_num >> $log

    #space
    if [[ $check_type == 'ods' ]];then
        space=$(hadoop fs -du $hdfs_path/$table_name/$dt)
    else
        space=$(hadoop fs -du $hdfs_path/$table_name/dt=$dt)
    fi
    echo -e $profix"space\t"$space >> $log

    #diff
    if [[ $table_name != 'dw_weibo_user' ]];then
        yesterday=$(date -d "-1 day $dt" "+%Y-%m-%d")
        diff=$(spark-sql -e "
        select concat_ws('|',collect_set(flag)) from (
            select 'gf' as gf, concat_ws('=',flag,cast(count(1) as string)) as flag from (
                select concat(if(y.$mtime_id is null, 0, 1),if(t.$mtime_id is null,0,1)) as flag
                from (select distinct $mtime_id from $DB.$table_name where dt='$dt') t
                full outer join (select distinct $mtime_id from $DB.$table_name where dt='$yesterday') y
                on t.$mtime_id = y.$mtime_id
            ) a group by flag
        ) b group by gf;")
        echo -e $profix"diff\t"$diff >> $log
    fi
    #movie_rep
    if [[ $check_type == 'dw' ]];then
        movie_rep=$(spark-sql -e "
        select concat_ws('|',collect_set(v)) from (
            select 'k' as k ,concat_ws('=',id,cast(count(1) as string)) as v
            from $DB.$table_name where dt = '$dt'
            group by id
            having count(1)>1
        )a group by k;")
        echo -e $profix"movie_rep\t"$movie_rep >> $log
    fi
    #index-*
    if [[ $table_name == 'dw_comment_statistics' ]];then
        up_day=$(spark-sql -e "select concat('<0:',count(1)) from $DB.$table_name
        where dt = '$dt' and
        (cast(up_day as int) < 0
         or cast(down_day as int) < 0
         or cast(vv_day as int ) < 0
         or cast(cmts_day as int) < 0
        );")
        echo -e $profix"index_day\t"$up_day >> $log
    fi
done

#dm
args_prefix="_##"
args_suffix="##_"
cat $basepath/dm_monitor_list.conf | while read line
do
    check_type=`echo $line | cut -d " " -f 1`
    table_name=`echo $line |cut -d " " -f 2`
    echo "table: "$table_name
    if [[ $check_type == 'record' ]]; then
        dt_str=`echo $line |cut -d " " -f 3`
        echo "record-count check, partition field: "$dt_str
    else
        custom_sql=`echo $line |cut -d " " -f 3-`
        echo "custom check: "$check_type
    fi

    profix=$table_name"\t"$insert_date"\t"

    DB=$HIVE_DB_DW
    hdfs_path=$HADOOP_DW_DATA_DESC

    if [[ $check_type == 'record' ]]; then
        record_sql="select count(1) from $DB.$table_name"
        if [[ -n $dt_str ]]; then
            # if [[ $table_name == 'dm_channel_index' ]]; then
            #     record_sql=$record_sql" where datediff('$dt',$dt_str)=1;"
            # else
            #     record_sql=$record_sql" where $dt_str = '$dt';"
            # fi
            record_sql=$record_sql" where $dt_str = '$dt';"
        else
            record_sql=$record_sql";"
        fi

        echo "executing SQL: "$record_sql
        #record
        record=$(hive -e "set hive.mapred.mode = nonstrict;$record_sql")
        #record=$(spark-sql -e "$record_sql")
        echo -e $profix"$check_type\t"$record >> $log
    else
        #custom_sql
        custom_sql=${custom_sql//$args_prefix"dt"$args_suffix/$dt}
        echo "executing SQL: "$custom_sql
        invalid_records=$(hive -e "set hive.mapred.mode = nonstrict;use $DB;select concat_ws(\" | \",collect_set(row_val)) from ( $custom_sql ) tmp;")
        echo $invalid_records
        if [[ ! -n $invalid_records || $invalid_records == '' ]]; then
            invalid_records="0"
        fi
        echo -e $profix"$check_type\t"$invalid_records >> $log
    fi
done
# insert hive
hadoop fs -rm -r $HADOOP_ODS_CONFIG_DATA_DESC/yq_monitor_data_log/dt=$dt

if [ -f "${file_path}/$log_name" ]; then
    hive -e "
    ALTER TABLE $HIVE_DB_MONITOR.yq_monitor_data_log DROP IF EXISTS PARTITION (dt = '$dt');
    alter table $HIVE_DB_MONITOR.yq_monitor_data_log add partition (dt = '$dt');
    "
fi
cd $file_path
hadoop fs -put $log_name $HADOOP_ODS_CONFIG_DATA_DESC/yq_monitor_data_log/dt=$dt
mv -f $log_name /home/trash
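The final block loads the log file into a Hive table named yq_monitor_data_log, whose DDL is not included in the post. Based on the tab-separated log layout written above (table name, insert date, check type, value) and the dt partition used when loading, a plausible sketch of that table might look like the following (column names and location are assumptions, not taken from the original):

-- assumed DDL sketch; adjust names and location to the real environment
CREATE EXTERNAL TABLE IF NOT EXISTS yq_monitor_data_log (
  table_name  string,
  insert_date string,
  check_type  string,
  check_value string
)
PARTITIONED BY (dt string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '/user/hive/warehouse/yq_monitor_data_log';  -- i.e. $HADOOP_ODS_CONFIG_DATA_DESC/yq_monitor_data_log

The script itself defaults to checking yesterday's partition and also accepts an explicit date, e.g. sh monitor.sh 2018-07-23.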
Reposted from: https://www.cnblogs.com/keensword/p/9358887.html