大数据之Spark(四):Spark SQL

  • Post author:
  • Post category:其他


一、SparkSQL的发展


1.1 概述

SparkSQL是Spark⽣态体系中的构建在SparkCore基础之上的⼀个基于SQL的计算模块。 SparkSQL的前身不叫SparkSQL,⽽叫Shark,最开始的时候底层代码优化,sql的解析、执⾏引擎等等完全基于 Hive,总是Shark的执⾏速度要⽐hive⾼出⼀个数量级,但是hive的发展制约了Shark,所以在15年中旬的时候, shark负责⼈,将shark项⽬结束掉,重新独⽴出来的⼀个项⽬,就是sparksql,不再依赖hive,做了独⽴的发展, 逐渐的形成两条互相独⽴的业务:SparkSQL和Hive-On-Spark。在SparkSQL发展过程中,同时也吸收了Shark的一些特点:基于内存的列存储,动态字节码优化技术。


1.2 特点

官网的原话:**Spark SQL is Apache Spark’s module for working with structured data. **

即Spark SQL是Apache Spark处理结构化数据的模块。

  • 集成

无缝地将SQL查询与Spark程序混合。

Spark SQL允许您使用SQL或熟悉的DataFrame API在Spark程序中查询结构化数据。适用于Java、Scala、Python和R语言。

  • 统一的数据访问

以相同的方式连接到任何数据源。

DataFrames和SQL提供了一种访问各种数据源的通用方法,包括Hive、Avro、Parquet、ORC、JSON和JDBC。您甚至可以通过这些源连接数据。

  • 蜂巢集成

在现有仓库上运行SQL或HiveQL查询。

Spark SQL支持HiveQL语法以及Hive SerDes和udf,允许您访问现有的Hive仓库。

  • 标准的连接

通过JDBC或ODBC连接。

服务器模式为业务智能工具提供了行业标准JDBC和ODBC连接。


1.3 总结

SparkSQL就是Spark生态体系中用于处理结构化数据的⼀个模块。结构化数据是什么?存储在关系型数据库中的数据,就是结构化数据;半结构化数据是什么?类似xml、json等的格式的数据被称之为半结构化数据;非结构化数据是什么?音频、视频、图片等为非结构化数据。 换句话说,SparkSQL处理的就是⼆维表数据。

二、SparkSQL的编程入口和模型

2.1 SparkSQL编程入口

SparkSQL中的编程模型,不再是SparkContext,但是创建需要依赖SparkContext。SparkSQL中的编程模型,在spark2.0以前的版本为SQLContext和HiveContext,HiveContext是SQLContext的一个子类,提供Hive中特有的一些功能,比如row_number开窗函数等等,这是SQLContext所不具备的,Spark2.0之后将这两个模型进行了合并——SparkSession。SparkSession的构建需要依赖SparkConf或者SparkContext。使用工厂构建器 (Builder方式)模式创建SparkSession。

import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
  .builder()
  .appName("Java Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate();

2.2 编程模型简介

主要通过两种方式操作SparkSQL,一种就是SQL,另一种为DataFrame和Dataset。


  • SQL

SQL不用多说,就和Hive操作⼀样,但需要清楚一点,SQL操作的是表,所以如果用SQL进行操作,就需要将SparkSQL对应的编程模型转化成为一张表。


  • DataFrame和Dataset

DataFrame和Dataset是SparkSQL中的编程模型。DataFrame和Dataset我们都可以理解为是一张mysql中的二维表,表有什么?表头,表名,字段,字段类型。RDD其实说⽩了也是一张二维表,但是这张二维表相比较于DataFrame和Dataset却少了很多东⻄,⽐如表头,表名,字段,字段类型,只有数据。 Dataset是在spark1.6.2开始出现出现的api,DataFrame是1.3的时候出现的,早期的时候DataFrame叫 SchemaRDD,SchemaRDD和SparkCore中的RDD相比较,就多了Schema,所谓约束信息,元数据信息。 ⼀般的,将RDD称之为Spark体系中的第⼀代编程模型;DataFrame⽐RDD多了⼀个Schema元数据信息, 被称之为Spark体系中的第二代编程模型;Dataset吸收了RDD的优点(强类型推断和强⼤的函数式编程)和 DataFrame中的优化(SQL优化引擎,内存列存储),成为Spark的最新⼀代的编程模型。

2.3 RDD V.S. DataFrame V.S. Dataset

创建DataFrame

1.spark.read.format(“xxx”)

# 这种方式对应与文件/外部输入 例如csv对应了CSVFileFormat

2.spark.createDataFrame(RDD[A])

[A<:Product] : 元组和样例类都是Product子类型

3.import spark.implicits._    // 导入sparkSession中的隐式转换操作

Seq((“a”,1),(“b”,2),(“c”,3)).toDF(“k”,”v”).show()


Dataset的创建注意事项:

# 不建议使用普通类
class User(val id:Int,val name:String)

val list3 = List(new User(100,"xx"))

spark.createDataset(list3).show  // list3中的类型应该存在Encoder

# 对于普通类型,要么转成样例类,否则需要手动提供Encoder隐式值

implicit val e:Encoder[User] = Encoders.javaSerialization(classOf[User])

class User(val id:Int,val name:String) extends Serializable

2.4 RDD、DataFrame、Dataset之间的转换

  • RDD=>DataFrame             rdd.toDF
  • RDD=>Dataset                   rdd.toDS
  • DataFrame=>RDD             df.rdd
  • DataFrame=>Dataset        df.as[User]
  • Dataset=>RDD                  ds.rdd
  • Dataset=>DataFrame        ds.toDF


2.5 DataFrame & Dataset API


网址:

http


://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.sql.Dataset


分类:

  • Action算子:collect show
  • Typed transformations 强类型转换 返回Dataset[U]
  • Untyped transformations  返回DataFrame \Column
  • 其它

三、SparkSQL的数据加载和落地

3.1 数据的加载

SparkSQL中加载外部的数据,使用统一的API入口

spark.read.format(数据⽂件格式).load(path)

这个方式有更加清晰的简写方式,比如要加载json格式的文件

spark.read.json(path)


注:默认加载的文件格式为parquet

def main(args: Array[String]): Unit = {
 val spark = SparkSession.builder()
 .master("local[*]")
 .appName("SparkSQLLoadAndSave")
 .getOrCreate()

 //加载数据 默认的加载⽂件的格式为parquet
 var pdf = spark.read.format("json").load("file:///E:/data/spark/sql/people.json")
 //简写⽅式
 pdf = spark.read.json("file:///E:/data/spark/sql/people.json")

 //parquet
 pdf = spark.read.parquet("file:///E:/data/spark/sql/users.parquet")

 //text 加载普通的⽂本⽂件,只能解析成⼀列
 pdf = spark.read.text("file:///E:/data/spark/sql/dailykey.txt")

 //csv 普通的⽂本⽂件,列之间以","作为分隔符
 pdf = spark.read.csv("file:///E:/data/spark/sql/province.csv")
 .toDF("pid", "province", "code", "cid")// 根据需要重新命名列名 数据类型均为字符串

 //orc 是rc⽂件格式的升级版本
 pdf = spark.read.orc("file:///E:/data/spark/sql/student.orc")

 //jdbc
 val url = "jdbc:mysql://localhost:3306/test"
 val table = "wordcount"
 val properties = new Properties()
 properties.put("user", "bigdata")
 properties.put("password", "sorry")
 pdf = spark.read.jdbc(url, table, properties)
 pdf.printSchema()
 pdf.show()

spark.stop()
}

3.2 数据的落地

SparkSQL对数据的落地保存使用的api为:spark.write.save(),需要指定数据的落地格式。

和read的默认格式⼀样,save的默认格式也是parquet,需要在write和save之间指定具体的格式

format(format) ,同样也有简写方式:spark.write.json/parquet等

def main(args: Array[String]): Unit = {
 val spark = SparkSession.builder()
 .master("local[*]")
 .appName("SparkSQLLoadAndSave")
 .getOrCreate()
 val df = spark.read.orc("file:///E:/data/spark/sql/student.orc")
 /*
 数据的落地
 默认的存储格式为parquet,同时基于snappy压缩⽅式存储
 # 落地的保存⽅式SaveMode
 ErrorIfExists:⽬录存在报错,默认的格式
 Append:在原有的基础之上追加
 Ignore:忽略,如果⽬录存在则忽略,不存在则创建
 Overwrite:覆盖(删除并重建)
 */
 
df.write.format("json").mode(SaveMode.Overwrite).save("file:///E:/data/spark/sql/stu")
 val url = "jdbc:mysql://localhost:3306/test"
 val table = "student"
 val properties = new Properties()
 properties.put("user", "bigdata")
 properties.put("password", "sorry")
 df.write.mode(SaveMode.Append).jdbc(url, table, properties)
 spark.stop()
 }

四、SparkSQL与Hive的整合

SparkSQL和Hive的整合,是⼀种比较常见的关联处理方式,SparkSQL加载Hive中的数据进行业务处理,同时将计 算结果落地回Hive中。 ⾸先将服务器中Hadoop安装路径下的hdfs-site.xml、core-site.xml以及hive中hive-site.xml三个⽂件拿出来,放到⼯程中的resource⽂件夹下。


重点:注意修改每个文件中的路径,一定要改成虚拟机的服务ip地址或者映射名称

import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* 使⽤SparkSQL来操作Hive
*/
object _04SparkSQLOnHiveOps {
 def main(args: Array[String]): Unit = {
 System.setProperty("hadoop.home.dir","E:\\hadoop-common-2.2.0-bin-master")
 val spark = SparkSession.builder()
 .appName("SparkSQLOnHive")
 .master("local")
 .enableHiveSupport() // 开启hive机制
 .getOrCreate()
 /**
     * 
     如果我们开起了Hive的机制后,并且导⼊了相关的xml⽂件,可能会有⼀个⼩⼩的问题
     之前读取的本地磁盘⽂件,就突然找不到了,为什么?
     因为我们使⽤了hive,那么它默认会搜索Hdfs上的数据⽂件,所以本地磁盘会找不到⽂件,这个问题很 
     好解决,在路径前⾯加上file:///D:\BaiduNetdiskDownload\sql-data
     如果无效,那么请把连接Hive的配置⽂件删掉,因为现在不需要⽤它
     */

 val df = spark.read.text("E:\\test.txt")
 df.createTempView("stu")
 spark.sql(s"""
 |select
 | tmp.word,
 | count(tmp.word) as counts
 | from(
 | select
 | explode(split(value," ")) word
 | from stu
 | ) tmp
 | group by tmp.word
 | order by counts desc
 |""".stripMargin).show()

// 获取hive中的表或者数据
// spark.sql("select * from test.fact_access_log")
// val url = "jdbc:mysql://localhost:3306/spark"
// val tab = "stu"
// val prop = new Properties()
// prop.setProperty("user","root")
// prop.setProperty("password","123456")

// 读取MySQL数据
// val df: DataFrame = spark.read.jdbc(url, tab, prop)

// 接下来将数据存⼊hive
/**
* hive建表语句
* CREATE TABLE `stu` (
* `id` bigint,
* `name` string,
* `age` bigint
* )
*/
// df.write.insertInto("test.stu")
 }
}


注:执行如上代码时,首先要在Hive中先建表

五、SparkSQL中的函数操作

5.1 函数的定义

SQL中函数,其实就是各大编程语言中的函数,或者方法,就是对某⼀特定功能的封装,通过它可以完成较为复杂的统计。这里函数的学习,就基于Hive中的函数来学习。

5.2 函数的分类


1)功能上划分


数值

  • round(x,[d]):对x保留d位小数,同时会对结果四舍五入
  • floor(x):获取不大于x的最大整数
  • ceil(x):获取不小于x的最小整数
  • rand():
  • 获取0到1之间的随机数   /  获取表中随机的两条记录
hive> select * , rand() rand from teacher order by rand limit 2;
or
hive> select * from teacher order by rand() limit 2;


数学

  • abs(x):取绝对值


条件

  1. if(expr1, expr2, expr3):如果expr1为true,返回expr2,反之返回expr3
  2. case when 多条件表达式


日期

  1. current_date(),获取当前的日期,日期格式为标准格式:yyyy-MM-dd
  2. current_timestamp():获取当前日期的时间戳,格式:yyyy-MM-dd HH:mm:ss.SSS
  3. add_months(start_date, num_months):返回start_date之后num_months月的日期
  4. date_add(start_date, num_days):返回start_date之后num_days天的日期
  5. date_sub(start_date, num_days):返回start_date之前num_days天的日期
  6. next_day(start_date, day_of_week),返回start_date之后最接近的day_of_week对应的日期
  7. dayofmonth(date) 返回date对应月份中的第几天
  8. weekofyear(date) 返回date对应年份中的第几周
  9. minute hour day month year 获取日期中对应的年月日时分
  10. date_format(date,format),返回指定格式化时间
  11. datediff(date1, date2),返回date1和date2之间的差值(单位是天),换句话说就是date1-date2
  12. from_unixtime(unix_time, format)将unix_time转化为格式化时间
  13. to_date(datetime)返回datetime中的日期部分


字符串



注意:数据库中的字符串索引从1开始,而不是0

  1. length(str) 返回字符串str的长度
  2. instr(str, substr),作⽤等同于str.indexOf(substr)
  3. substr substring(str, pos[, len]):从str的pos位置开始,截取子字符串,截取len的长度,如果不传len,截取余下所有
  4. substring_index(str, delim, count):将字符串str使⽤delim进⾏分割,返回强count个使⽤delim拼接的子字符串
  5. concat(str1, str2)拼接字串
  6. concat_ws(separator, str1, str2):使⽤指定分隔符来拼接字符串


统计函数

  1. index(arr, n),就是arr(n)获取索弓|n对应的元素
  2. sum、count、max、avg、 min等


特殊


  • array:返回数组

  • collect_set:返回⼀个元素不重复的set集合

  • collect_list:返回⼀个元素可重复的list集合

  • split(str, regex):使⽤regex分隔符将str进⾏切割,返回⼀个字符串数组

  • explode(array):将⼀个数组,转化为多⾏

  • cast(type1 as type2):将数据类型type1的数据转化为数据类型type2


Demo:使用SQL方式统计WordCount

select
 tmp.word,
 count(1) counts
from (
 select
 explode(split(line, "\\s+")) word
 from test_wc
) tmp
group by tmp.word
order by counts desc, tmp.word;


2)实现方式上划分

  • UDF(User Defined function)用户自定义函数

一路输入,一路输出,比如year,date_add, instr

  • UDAF(User Defined aggregation function)⽤户⾃定义聚合函数

多路输入,⼀路输出,常见的聚合函数,count、sum、collect_list

  • UDTF(User Defined table function)⽤户⾃定义表函数

一路输入,多路输出,explode

  • 开窗函数

row_number() ——>分组topN的求解

select
 tmp.*
from (
 select
 name,
 age,
 married,
 height,
 row_number() over(partition by married order by height) rank
 from teacher
) tmp
where tmp.rank < 3;

5.3 自定义函数


5.3.1 概述

当系统提供的这些函数,满足不了实际需要时,就需要进行自定义相关的函数,一般自定义的函数分为两种, UDF和UDAF。


5.3.2 UDF

一路输⼊,一路输出,完成就是基于scala函数。

通过模拟获取字符串长度的udf来学习自定义udf操作。

object _01SparkSQLUDFOps {
 def main(args: Array[String]): Unit = {
 val spark = SparkSession.builder()
 .master("local[*]")
 .appName("SparkSQLUDF")
 .getOrCreate()

 import spark.implicits._
 val rdd = spark.sparkContext.parallelize(List(
 "songhaining",
 "yukailu",
 "liuxiangyuan",
 "maningna"
 ))

//使⽤sparksession进⾏udf和udaf的注册
// spark.udf.register[Int, String]("myLen", (str:String) => myStrLength(str))
// spark.udf.register[Int, String]("myLen", str => myStrLength(str))
 spark.udf.register[Int, String]("myLen", myStrLength)
 
 /* private val myLen: UserDefinedFunction = udf((s: String) => {
 s.length
})
 df.select(myLen($"name")).show()s
 */
 val df = rdd.toDF("name")
 df.createOrReplaceTempView("test")

 //求取每个字符串的⻓度
 val sql =
 """
 |select
 | name,
 | length(name) nameLen,
 | myLen(name) myNameLen
 |from test
 """.stripMargin
 spark.sql(sql).show()
 spark.stop()
 }

 //⾃定义udf
 def myStrLength(str:String):Int = str.length
}


5.3.3 UDAF

多路输入,一路输出,类似combineByKey

通过模拟avg函数,来学习如何自定义UDAF操作。

object _02SparkSQLUDAFOps {
 def main(args: Array[String]): Unit = {
 val spark = SparkSession.builder()
 .master("local[*]")
 .appName("SparkSQLUDAF")
 .getOrCreate()

 import spark.implicits._
 val rdd = spark.sparkContext.parallelize(List(
 Student(1, "宋海宁", 168.5, 105.5, 18),
 Student(2, "麻宁娜", 165.0, 101.0, 19),
 Student(3, "刘⾹媛", 170.5, 108.5, 17),
 Student(4, "蔚凯璐", 172.5, 115, 16)
 ))
 spark.udf.register("myAvg", new MyUDAFAVG)
 val df = rdd.toDS()
 df.createOrReplaceTempView("student")
val sql =
 """
 |select
 | round(avg(height), 1) avg_height,
 | avg(weight) avg_weight,
 | avg(age) avg_age,
 | myAvg(weight) myAvg_wight
 |from student
 """.stripMargin
 spark.sql(sql).show()
 spark.stop()
 }
}
case class Student(id:Int, name:String, height:Double, weight:Double, age:Int)


自定义UDAF

class MyUDAFAVG extends UserDefinedAggregateFunction {
 /*
 指定⽤户⾃定义udaf输⼊参数的元数据
 datediff(date1, date2)
 */
 override def inputSchema: StructType = {
 StructType(List(
 StructField("weight", DataTypes.DoubleType, false)
 ))
 }
 //udaf返回值的数据类型
 override def dataType: DataType = DataTypes.DoubleType
 //udaf⾃定义函数求解过程中的临时变量的数据类型
 override def bufferSchema: StructType = {
 StructType(List(
 StructField("sum", DataTypes.DoubleType, false),
 StructField("count", DataTypes.IntegerType, false)
 ))
 }
 //聚合函数是否幂等,相同输⼊是否总是得到相同输出
 override def deterministic: Boolean = true
 /*
 分区内的初始化操作
 其实就是给sum和count赋初始值
 */
override def initialize(buffer: MutableAggregationBuffer): Unit = {
 buffer.update(0, 0.0)
 buffer.update(1, 0)
 }
 /**
 * 分区内的更新操作
 * @param buffer 临时变量
 * @param input ⾃定义函数调⽤时传⼊的值
 */
 override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
 buffer.update(0, buffer.getDouble(0) + input.getDouble(0))
 buffer.update(1, buffer.getInt(1) + 1)
 }
 //分区间的合并操作
 override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
 buffer1.update(0, buffer1.getDouble(0) + buffer2.getDouble(0))
 buffer1.update(1, buffer1.getInt(1) + buffer2.getInt(1))
 }
 //udaf聚合结果的返回值
 override def evaluate(buffer: Row): Double = {
 buffer.getDouble(0) / buffer.getInt(1)
 }
}

5.4 多维立方体分析函数


grouping sets 、rollup 、cube 是用来处理多维分析的函数:


  • grouping sets

    :对分组集中指定的组表达式的每个⼦集执⾏group by, group by A,B grouping sets(A,B)就等价于 group by A union group by B,其中A和B也可以是⼀个集合,比如group by A,B,C grouping sets((A,B),(A,C))。

  • rollup

    :在指定表达式的每个层次级别创建分组集。group by A,B,C with rollup ⾸先会对(A、B、C)进行group by,然后对(A、B)进⾏group by,然后是(A)进⾏group by,最后对全表进行group by操作。

  • cube

    : 为指定表达式集的每个可能组合创建分组集。group by A,B,C with cube 首先会对(A、B、C)进行group by,然后依次是(A、B),(A、C),(A),(B、C),(B),(C),最后对全表进行group by操作。

六、SparkSQL之SQL调优

6.1 缓存数据至内存

Spark SQL可以通过调用sqlContext.cacheTable(“tableName”) 或者dataFrame.cache(),将表⽤⼀种柱状格式(an inmemory columnar format)缓存至内存中。然后Spark SQL在执行查询任务时,只需扫描必需的列,从而以减少扫描数据量、提高性能。通过缓存数据,Spark SQL还可以自动调节压缩,从而达到最小化内存使用率和降低GC压力的目的。调用sqlContext.uncacheTable(“tableName”)可将缓存的数据移出内存。

可通过两种方式配置缓存数据功能:

  • 使用SQLContext的setConf方法
  • 执行SQL命令SET key=value

Property Name

Default

Meaning
spark.sql.inMemoryColumnarStorage.compressed true 如果假如设置为true,SparkSql会根据统计信息⾃动的为每个列 选择压缩⽅式进⾏压缩。
spark.sql.inMemoryColumnarStorage.compressed 10000 控制列缓存的批量大小。批次⼤有助于改善内存使⽤和压缩,但 是缓存数据会有OOM的⻛险。

6.2 参数调优


可以通过配置下表中的参数调节Spark SQL的性能。

Property Name Default Meaning
spark.sql.files.maxPartitionBytes 134217728 获取数据到分区中的最大字节数。
spark.sql.files.openCostlnBytes 4194304 (4MB) 该参数默认4M,表示小于4M的小文件会合并到一个分区中,
spark.sql.broadcastTimeout 300 广播等待超时时间,单位秒。
spark.sql.autoBroadcastJoinThreshold 10485760(10 MB) 最大广播表的大小。设置为-1可以禁止该功能。当前统计信息仅支持Hive Metastore表。
spark.sql.shuffle.partitions 200 设置huffle分区数,默认200。

6.3 SQL数据倾斜优化

以group By出现数据倾斜为例进行解决。采用的案例就是wordcount。两阶段聚合进行解决:局部聚合+全局聚合。

object _03SparkSQLDataskewOps {
 def main(args: Array[String]): Unit = {
 Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
 Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
 Logger.getLogger("org.spark_project").setLevel(Level.WARN)
 val spark = SparkSession.builder()
 .master("local[*]")
 .appName("SparkSQLDataskew")
 .getOrCreate()
 val list = List(
 "zhang zhang wen wen wen wen yue yue",
 "gong yi can can can can can can can can can can",
 "chang bao peng can can can can can can"
 )
 import spark.implicits._
 val rdd = spark.sparkContext.parallelize(list)
 val df = rdd.toDF("line")
 df.createOrReplaceTempView("test")
 println("原始表中的数据---------------------")
 df.show()
 println("step 1-----------进⾏数据拆分-------------")
 var sql =
 """
 |select
 | split(line, '\\s+')
 |from test
 """.stripMargin
 spark.sql(sql).show()
 println("step 2-----------进⾏列转化为多⾏数据-------------")
 sql =
 """
 |select
 | explode(split(line, '\\s+')) word
 |from test
 """.stripMargin
 spark.sql(sql).show()
 println("step 3-----------进⾏添加前缀打散数据-------------")
 sql =
 """
 |select
 | concat_ws("_", cast(floor(rand() * 2) as string), t1.word) prefix_word
 |from (
 | select
 | explode(split(line, '\\s+')) word
| from test
 |) t1
 """.stripMargin
 spark.sql(sql).show()
 println("step 4-----------进⾏有前缀的局部聚合-------------")
 sql =
 """
 |select
 | concat_ws("_", cast(floor(rand() * 2) as string), t1.word)
prefix_word,
 | count(1) countz
 |from (
 | select
 | explode(split(line, '\\s+')) word
 | from test
 |) t1
 |group by prefix_word
 """.stripMargin
 spark.sql(sql).show()
 println("step 5-----------进⾏去前缀操作-------------")
 sql =
 """
 |select
 | t2.prefix_word,
 | substr(t2.prefix_word, instr(t2.prefix_word, "_") + 1) up_word,
 | t2.countz
 |from (
 | select
 | concat_ws("_", cast(floor(rand() * 2) as string), t1.word)
prefix_word,
 | count(1) countz
 | from (
 | select
 | explode(split(line, '\\s+')) word
 | from test
 | ) t1
 | group by prefix_word
 |) t2
 """.stripMargin
 spark.sql(sql).show()
 println("step 6-----------进⾏全局聚合-------------")
 sql =
 """
 |select
 | substr(t2.prefix_word, instr(t2.prefix_word, "_") + 1) up_word,
 | sum(t2.countz) counts
 |from (
 | select
| concat_ws("_", cast(floor(rand() * 2) as string), t1.word)
prefix_word,
 | count(1) countz
 | from (
 | select
 | explode(split(line, '\\s+')) word
 | from test
 | ) t1
 | group by prefix_word
 |) t2
 |group by up_word
 """.stripMargin
 spark.sql(sql).show()
 spark.stop()
 }
 //⾃定义添加前缀的函数
/* def addPrefix(str:String):String = {
 val random = new Random()
 random.nextInt(2) + "_" + str
 }*/
}

由于处理过程中,使用了两层group By,所以经常将使用sql的处理称之为双重group by。

参考资料:https://tech.meituan.com/2016/05/12/spark-tuning-pro.html

七、SparkSQL运行架构及原理

Spark SQL对SQL语句的处理与关系型数据库类似,即词法/语法解析、绑定、优化、执行。Spark SQL会先将SQL语句解析成⼀棵树,然后使用规则(Rule)对Tree进⾏绑定、优化等处理过程。Spark SQL由Core、Catalyst、 Hive、Hive-ThriftServer四部分构成:

  • Core: 负责处理数据的输⼊和输出,如获取数据,查询结果输出成DataFrame等
  • Catalyst: 负责处理整个查询过程,包括解析、绑定、优化等
  • Hive: 负责对Hive数据进⾏处理
  • Hive-ThriftServer: 主要⽤于对hive的访问

7.1 SparkSQL运行架构

7.2 SparkSQL运行原理


1.使用SessionCatalog保存元数据

  • 在解析SQL语句之前,会创建SparkSession,或者如果是2.0之前的版本初始化SQLContext,SparkSession只是封 装了SparkContext和SQLContext的创建⽽已。会把元数据保存在SessionCatalog中,涉及到表名,字段名称和字 段类型。创建临时表或者视图,其实就会往SessionCatalog注册


2.解析SQL,使用ANTLR生成未绑定的逻辑计划

  • 当调用SparkSession的sql或者SQLContext的sql方法,我们以2.0为准,就会使用SparkSqlParser进⾏解析SQL. 使 ⽤的ANTLR进⾏词法解析和语法解析。它分为2个步骤来生成Unresolved LogicalPlan:
  1. 词法分析:Lexical Analysis,负责将token分组成符号类
  2. 构建⼀个分析树或者语法树AST


3 使用分析器Analyzer绑定逻辑计划

  • 在该阶段,Analyzer会使用Analyzer Rules,并结合SessionCatalog,对未绑定的逻辑计划进行解析,生成已绑定的逻辑计划。


4.使⽤优化器Optimizer优化逻辑计划

  • 优化器也是会定义一套Rules,利用这些Rule对逻辑计划和Exepression进行迭代处理,从而使得树的节点进行和并和优化


5.使用SparkPlanner生成物理计划

  • SparkSpanner使用Planning Strategies,对优化后的逻辑计划进行转换,生成可以执行的物理计划SparkPlan.


6.使用QueryExecution执行物理计划

  • 此时调⽤SparkPlan的execute⽅法,底层其实已经再触发JOB了,然后返回RDD



版权声明:本文为weixin_44291548原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。