spark sql之巧用group by

  • Post author:
  • Post category:其他


为啥想到巧用group by 还是因为优秀的人 想法就是666

看到一篇文章




AI & 大数据 – 专区 – OSCHINA – 中文开源技术交流社区


Artificial Intelligence 人工智能是研究、开发用于模拟、延伸和扩展人的智能的理论、方法、技术及应用系统的一门新的技术科学。大数据(big data),是指无法在一定时间范围内用常规软件工具进行捕捉、管理和处理的数据集合,是需要新处理模式才能具有更强的决策力、洞察发现力和流程优化能力的海量、高增长率和多样化的信息资产。AI 研究通常需要大量数据支撑。



https://www.oschina.net/group/ai-bigdata?circle=big-data



大表与大表关联


hint在我上一篇文章已经分析过了。

这里 大佬用了一个特别巧妙的方法,group by key ,ceil(rand()*100) 乍一看好像就明白了,但要你说好像又说不出个所以然。

瞬间想到distributed by ceil(rand()*10 将任务输出为10个文件

测试代码。


    public static void main(String[] args) throws KuduException, InterruptedException {
        SparkConf sparkConf = new SparkConf().setMaster("yarn").setAppName("ClusterSparkTestGroupBy");
        if (System.getProperty("os.name").toLowerCase().contains("windows")) {
            sparkConf = new SparkConf().setMaster("local[*]").setAppName("dp");
            sparkConf.set("spark.driver.memory", "1g").set("spark.testing.memory", "1073740000");
            kerberos_auth();
        }
        SparkSession session = SparkSession.builder().config(sparkConf)
                .enableHiveSupport().getOrCreate();
        Dataset<Row> dataset = session.sql("select \n" +
                "concat(a.src_supplier_id1,'----',b.src_supplier_id2) \n" +
                "from (\n" +
                "select biz_id,concat_ws(',',collect_list(src_supplier_id)) src_supplier_id1 from dwiadata.ia_fdw_hr_company_contact_info_relation group by biz_id\n" +
                ")a \n" +
                "join (\n" +
                "select biz_id,concat_ws(',',collect_list(src_supplier_id)) src_supplier_id2 from dwiadata.ia_fdw_hr_company_contact_info_relation group by biz_id\n" +
                ")b \n" +
                "on a.biz_id =b.biz_id ");
        System.out.println("dataset.partition="+dataset.javaRDD().getNumPartitions());
        dataset.write().mode(SaveMode.Overwrite).text("file:///D:\\install\\code\\tencent\\dw_ia_portraitsearch\\output\\common");
        Dataset<Row> dataset2 = session.sql("select \n" +
                "concat(a.src_supplier_id1,'----',b.src_supplier_id2)\n" +
                "from (\n" +
                "select biz_id,concat_ws(',',collect_list(src_supplier_id)) src_supplier_id1 from dwiadata.ia_fdw_hr_company_contact_info_relation group by biz_id ,ceil(rand()*6)\n" +
                ")a \n" +
                "join (\n" +
                "select biz_id,concat_ws(',',collect_list(src_supplier_id)) src_supplier_id2 from dwiadata.ia_fdw_hr_company_contact_info_relation group by biz_id,ceil(rand()*6)\n" +
                ")b \n" +
                "on a.biz_id =b.biz_id ");
        dataset2.explain(true);
        System.out.println("dataset2.partition="+dataset.javaRDD().getNumPartitions());
        dataset2.write().mode(SaveMode.Overwrite).text("file:///D:\\install\\code\\tencent\\dw_ia_portraitsearch\\output\\hint");
        Thread.sleep(Integer.MAX_VALUE);
        session.close();
    }

看输出的文件

common还是普通的那个5个key

只会形成5个文件 其中part-00000是默认的一个空白文件

group呢?

我们猜想肯定是30个文件+一个00000就是31个文件

结果实际不是只有26+1个文件。

就很奇怪

第一时间想到是不是因为 rand() 随机的原因。一个key希望被分成若6份,但是实际没有

比如key biz_id=1003的数量是61个,最后 只被分成了 1 2 3 4 5 , rand()一直没有随机到6

但是 实际情况不是这样的。 因为就拿1003来说 61个随机1-6 不会恰好都是12345 没有一个6

比如

1有20个

2有10个

3有10个

4有10个

5有10个

6有1个也是有可能的

可能分布不均 但是不可能有一个rand 一个都没有

我们观察

可知确实被分成了6份 确实不均 符合我们的猜想,那么按照group by biz_id ,ceil(rand()*6)

确实将key 分成了30份,那么最终为什么形成的文件少于30个呢?

文件特殊的有两种

其中76号task 属于同样的key分到了一个文件

98号task属于不同的key 分到了一个文件

看spark ui 也符合自己的猜想 都是有12条记录 2个key


这个图也说明了a表和b表都是过滤出了30条记录

然后 5*6*6=180条记录

第一个5是5个key  两个6是hash连接 1001分成6份 join就有6*6条记录。

但是始终没想明白为啥 有的key会聚合到一个文件里。。。。

前面这么多都是为了让自己更好的了解sparksql 你在group by 的时候内部做的一些操作。

比如

这里默认spark.sql.shuffle.partitions=200个task ,网上一般让我们改成executors的2-3倍。

假设我们现在

executor有10台  每个excutor有16线程我们设置core 15

那么 我们的executors=150个  spark.sql.shuffle.partitions设置 300-450好像比较合适。

但是问题来了,按照我们上述实验这样设置有没有问题? 我觉得是有问题的

首先我们明确知道key只有5个 我们采用group by 只分成6份,那么就是预计任务数只有30个

我们还设置200个 不影响计算结果,但是 占用了这么多资源不是浪费么。

有的又说了 在spark-submit的时候 我只设置executor=2 core=2 总共也就只会起4个cores

不管你是200个task 还是36个task 我都是可以的。 但是按照官网的2-3倍好像又有问题。

所以我们提前预知任务的个数,然后采用合适的资源,再根据资源的选择去确定任务个数,这样才是最正确的做法。

此时我们再分析该大佬做的优化。

原始表a大概有10个key 其中每个基本都是10w条数据

原始表b大概有10-20个key,每个key大概在5w左右

其中大概有 7 2 5 这三个key关联到了。

大概就是3*5w*10w条记录也就是 150亿条数据

一般来说。。一条记录按照1kb来说。这里就是

1.5*1000*1000*1000kb=1.5*1000*1000mb=1.5*1000G=1.5T 当然我这个计算是不准确的。。。

但是100G左右是差不多的。

此时

–executor-memory 2g \

–num-executors 50 \

–executor-cores 2 \

那么此时我们就有100个cores了。按道理2-3倍

所以此时task数量差不多是200-300个。

根据key的个数据 3个 我们需要将其打散为100个 所以采用ceil(rand()*100)居然和我一样。。我这里都是自己想的。。。

然后对出现的结果采用repartition(1000)

是因为我们此时 join后的记录大概有150亿,repartition后每个分区就有150w数据方便处理。

话不多说。。直接实战。。。。。

大家随便找个数据多点的表 然后 随便过滤下数据 差不多每个key 1000-5w左右。

本意目的就是 自己和自己关联。 5090*5090  2230*2230 6655*6655

最后差不多是4-5亿多条数据。

最后结果如下 我们来分析下 谁有谁劣。。。

第一种数据直接join  是stage0-1

第二种就是 group by biz_id, ceil(rand()*100) 是stage2-6

首先

ia_fdw_hr_company_contact_info该表在hdfs 上有7个文件

然后spark 读取的时候差不多是按照128M去分片的。所以分成14个片 第14个只有一点点

我们来看stage0 的

可以看到左上角读取了


  • Input Size / Records:

    163.9 MB / 21773127

  • Shuffle Write:

    454.1 KB / 70

21773127 是表总数据量 这个163.9 MB 好像是hdfs的总大小 应该是1639 MB…..

Shuffle Write 是指读取这个文件后我要输出的内容。

最后只输出了70条 大小是454kb…

70条是怎么来的? spark将hdfs的文件分片为14片。每个片里去读取文件。因为我们读取的时候

是根据key ,collect_list(other_column) 所以每个片只读出了5条数据

1001,[a,b,c,d……]

1002,[a,b,c,d……]

1003,[a,b,c,d……]

1004,[a,b,c,d……]

1005,[a,b,c,d……]

所以总共就是14*5=70条记录。同时stage0显示14个task也就是这14个片

stage=1 显示250个task 是因为我们设置了

sparkConf.set("spark.sql.shuffle.partitions","250");

左上角


  • Output:

    650.7 KB / 5

  • Shuffle Read:

    908.2 KB / 140

output就是我们最终数据的结果 因为我们直接join 所以只有5个输出 分别是1001 1002 1003 1004 1005

Read 是指这个stage1读取上个stage的write数, 我们刚刚说是70 怎么变成140了? 因为我们是自己和自己关联,所以spark 直接复用 70*2 看大小 908.2kb

刚好是上一阶段 Shuffle Write: 454.1 KB / 70 的两倍

此时注意下面task节目 我按照shuffle read 排序了,只显示了5个task,因为其余task没有数据,上文说过。

这个28其实也是14的2倍。也就是说 是把每个切片的一条数据拿出来了

切片1  1001 ,1002,1003,1004,1005

切片2  1001,1002,1003,1004,1005

.。。。。

这个28条 就是把切片1-14的 1001都拿出来了 并且搞了两份,然后自己和自己关联。

——————————————————————————————————————————

再看看我们采用了group的 join


  • Input Size / Records:

    1654.8 MB / 21773127

  • Shuffle Write:

    937.2 KB / 6300

input size和之前一样。 就是小数点不一样了。。

shuffle write 937kb 6300条

思考下 读取的时候还是按照14个片分片。但是读取后要group by 也就是5*100=500呀?为啥

这个就是我最开始提出的疑问 有的key是合并了。。。怀疑是本来是500个 然后根据hash分区就分了480个。

例如 本来是

1001-1 1001-2.。。。。1001-100

1002-1 1002-1.。。。。1002-100…. 500个

但是!!1001-1和1002-1的hash指都一样就分到一个分区了。

但实际看其实都不高于500和我们想法一样。

但是注意这个stage2的花费时间很长。。花了1.5min stage0只要了18s

这里可以优化的。下面两个task可以和上面同一时间执行。



版权声明:本文为cclovezbf原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。