为啥想到巧用group by 还是因为优秀的人 想法就是666
看到一篇文章
hint在我上一篇文章已经分析过了。
这里 大佬用了一个特别巧妙的方法,group by key ,ceil(rand()*100) 乍一看好像就明白了,但要你说好像又说不出个所以然。
瞬间想到distributed by ceil(rand()*10 将任务输出为10个文件
测试代码。
public static void main(String[] args) throws KuduException, InterruptedException {
SparkConf sparkConf = new SparkConf().setMaster("yarn").setAppName("ClusterSparkTestGroupBy");
if (System.getProperty("os.name").toLowerCase().contains("windows")) {
sparkConf = new SparkConf().setMaster("local[*]").setAppName("dp");
sparkConf.set("spark.driver.memory", "1g").set("spark.testing.memory", "1073740000");
kerberos_auth();
}
SparkSession session = SparkSession.builder().config(sparkConf)
.enableHiveSupport().getOrCreate();
Dataset<Row> dataset = session.sql("select \n" +
"concat(a.src_supplier_id1,'----',b.src_supplier_id2) \n" +
"from (\n" +
"select biz_id,concat_ws(',',collect_list(src_supplier_id)) src_supplier_id1 from dwiadata.ia_fdw_hr_company_contact_info_relation group by biz_id\n" +
")a \n" +
"join (\n" +
"select biz_id,concat_ws(',',collect_list(src_supplier_id)) src_supplier_id2 from dwiadata.ia_fdw_hr_company_contact_info_relation group by biz_id\n" +
")b \n" +
"on a.biz_id =b.biz_id ");
System.out.println("dataset.partition="+dataset.javaRDD().getNumPartitions());
dataset.write().mode(SaveMode.Overwrite).text("file:///D:\\install\\code\\tencent\\dw_ia_portraitsearch\\output\\common");
Dataset<Row> dataset2 = session.sql("select \n" +
"concat(a.src_supplier_id1,'----',b.src_supplier_id2)\n" +
"from (\n" +
"select biz_id,concat_ws(',',collect_list(src_supplier_id)) src_supplier_id1 from dwiadata.ia_fdw_hr_company_contact_info_relation group by biz_id ,ceil(rand()*6)\n" +
")a \n" +
"join (\n" +
"select biz_id,concat_ws(',',collect_list(src_supplier_id)) src_supplier_id2 from dwiadata.ia_fdw_hr_company_contact_info_relation group by biz_id,ceil(rand()*6)\n" +
")b \n" +
"on a.biz_id =b.biz_id ");
dataset2.explain(true);
System.out.println("dataset2.partition="+dataset.javaRDD().getNumPartitions());
dataset2.write().mode(SaveMode.Overwrite).text("file:///D:\\install\\code\\tencent\\dw_ia_portraitsearch\\output\\hint");
Thread.sleep(Integer.MAX_VALUE);
session.close();
}
看输出的文件
common还是普通的那个5个key
只会形成5个文件 其中part-00000是默认的一个空白文件
group呢?
我们猜想肯定是30个文件+一个00000就是31个文件
结果实际不是只有26+1个文件。
就很奇怪
第一时间想到是不是因为 rand() 随机的原因。一个key希望被分成若6份,但是实际没有
比如key biz_id=1003的数量是61个,最后 只被分成了 1 2 3 4 5 , rand()一直没有随机到6
但是 实际情况不是这样的。 因为就拿1003来说 61个随机1-6 不会恰好都是12345 没有一个6
比如
1有20个
2有10个
3有10个
4有10个
5有10个
6有1个也是有可能的
可能分布不均 但是不可能有一个rand 一个都没有
我们观察
可知确实被分成了6份 确实不均 符合我们的猜想,那么按照group by biz_id ,ceil(rand()*6)
确实将key 分成了30份,那么最终为什么形成的文件少于30个呢?
文件特殊的有两种
其中76号task 属于同样的key分到了一个文件
98号task属于不同的key 分到了一个文件
看spark ui 也符合自己的猜想 都是有12条记录 2个key
这个图也说明了a表和b表都是过滤出了30条记录
然后 5*6*6=180条记录
第一个5是5个key 两个6是hash连接 1001分成6份 join就有6*6条记录。
但是始终没想明白为啥 有的key会聚合到一个文件里。。。。
前面这么多都是为了让自己更好的了解sparksql 你在group by 的时候内部做的一些操作。
比如
这里默认spark.sql.shuffle.partitions=200个task ,网上一般让我们改成executors的2-3倍。
假设我们现在
executor有10台 每个excutor有16线程我们设置core 15
那么 我们的executors=150个 spark.sql.shuffle.partitions设置 300-450好像比较合适。
但是问题来了,按照我们上述实验这样设置有没有问题? 我觉得是有问题的
首先我们明确知道key只有5个 我们采用group by 只分成6份,那么就是预计任务数只有30个
我们还设置200个 不影响计算结果,但是 占用了这么多资源不是浪费么。
有的又说了 在spark-submit的时候 我只设置executor=2 core=2 总共也就只会起4个cores
不管你是200个task 还是36个task 我都是可以的。 但是按照官网的2-3倍好像又有问题。
所以我们提前预知任务的个数,然后采用合适的资源,再根据资源的选择去确定任务个数,这样才是最正确的做法。
此时我们再分析该大佬做的优化。
原始表a大概有10个key 其中每个基本都是10w条数据
原始表b大概有10-20个key,每个key大概在5w左右
其中大概有 7 2 5 这三个key关联到了。
大概就是3*5w*10w条记录也就是 150亿条数据
一般来说。。一条记录按照1kb来说。这里就是
1.5*1000*1000*1000kb=1.5*1000*1000mb=1.5*1000G=1.5T 当然我这个计算是不准确的。。。
但是100G左右是差不多的。
此时
–executor-memory 2g \
–num-executors 50 \
–executor-cores 2 \
那么此时我们就有100个cores了。按道理2-3倍
所以此时task数量差不多是200-300个。
根据key的个数据 3个 我们需要将其打散为100个 所以采用ceil(rand()*100)居然和我一样。。我这里都是自己想的。。。
然后对出现的结果采用repartition(1000)
是因为我们此时 join后的记录大概有150亿,repartition后每个分区就有150w数据方便处理。
话不多说。。直接实战。。。。。
大家随便找个数据多点的表 然后 随便过滤下数据 差不多每个key 1000-5w左右。
本意目的就是 自己和自己关联。 5090*5090 2230*2230 6655*6655
最后差不多是4-5亿多条数据。
最后结果如下 我们来分析下 谁有谁劣。。。
第一种数据直接join 是stage0-1
第二种就是 group by biz_id, ceil(rand()*100) 是stage2-6
首先
ia_fdw_hr_company_contact_info该表在hdfs 上有7个文件
然后spark 读取的时候差不多是按照128M去分片的。所以分成14个片 第14个只有一点点
我们来看stage0 的
可以看到左上角读取了
-
Input Size / Records:
163.9 MB / 21773127 -
Shuffle Write:
454.1 KB / 70
21773127 是表总数据量 这个163.9 MB 好像是hdfs的总大小 应该是1639 MB…..
Shuffle Write 是指读取这个文件后我要输出的内容。
最后只输出了70条 大小是454kb…
70条是怎么来的? spark将hdfs的文件分片为14片。每个片里去读取文件。因为我们读取的时候
是根据key ,collect_list(other_column) 所以每个片只读出了5条数据
1001,[a,b,c,d……]
1002,[a,b,c,d……]
1003,[a,b,c,d……]
1004,[a,b,c,d……]
1005,[a,b,c,d……]
所以总共就是14*5=70条记录。同时stage0显示14个task也就是这14个片
stage=1 显示250个task 是因为我们设置了
sparkConf.set("spark.sql.shuffle.partitions","250");
左上角
-
Output:
650.7 KB / 5 -
Shuffle Read:
908.2 KB / 140
output就是我们最终数据的结果 因为我们直接join 所以只有5个输出 分别是1001 1002 1003 1004 1005
Read 是指这个stage1读取上个stage的write数, 我们刚刚说是70 怎么变成140了? 因为我们是自己和自己关联,所以spark 直接复用 70*2 看大小 908.2kb
刚好是上一阶段 Shuffle Write: 454.1 KB / 70 的两倍
此时注意下面task节目 我按照shuffle read 排序了,只显示了5个task,因为其余task没有数据,上文说过。
这个28其实也是14的2倍。也就是说 是把每个切片的一条数据拿出来了
切片1 1001 ,1002,1003,1004,1005
切片2 1001,1002,1003,1004,1005
.。。。。
这个28条 就是把切片1-14的 1001都拿出来了 并且搞了两份,然后自己和自己关联。
——————————————————————————————————————————
再看看我们采用了group的 join
-
Input Size / Records:
1654.8 MB / 21773127 -
Shuffle Write:
937.2 KB / 6300
input size和之前一样。 就是小数点不一样了。。
shuffle write 937kb 6300条
思考下 读取的时候还是按照14个片分片。但是读取后要group by 也就是5*100=500呀?为啥
这个就是我最开始提出的疑问 有的key是合并了。。。怀疑是本来是500个 然后根据hash分区就分了480个。
例如 本来是
1001-1 1001-2.。。。。1001-100
1002-1 1002-1.。。。。1002-100…. 500个
但是!!1001-1和1002-1的hash指都一样就分到一个分区了。
但实际看其实都不高于500和我们想法一样。
但是注意这个stage2的花费时间很长。。花了1.5min stage0只要了18s
这里可以优化的。下面两个task可以和上面同一时间执行。