SPARK全解析
标签(空格分隔): Spark
Spark是什么?
Spark是Apache的一个顶级项目,是一个快速、通用的大规模数据处理引擎。以下是它的几个特点 :
-
Speed
存储在内存中的数据,Spark比Hadoop的MapReduce快100多倍,存储在磁盘中的数据要快10多倍。 -
Easy of Use
开发Spark应用程序可以使用Java、Scala、Python、R等编程语言 -
Generality
Spark提供了SparkSQL、Streaming、MLlib、GraphX,功能强大。一站式解决需求。 -
Runs Everywhere
Spark可以运行在Hadoop的Yarn上、Mesos上、以及它自身的standalone上,处理的文件系统包括HDFS、Cassandra、HBase、S3.
以上部分摘自官网:
http://spark.apache.org/
Spark 源码编译
本文以 spark1.6.1版本为例
-
(1)下载源码包
-
(2)准备环境
Spark1.6.1版本编译需要Maven 3.3.3 or newer and Java 7+ 环境
-
(3)编译
–1 解压spark源码
–2 在执行编译前修改$SPARK_HOME下的make-distribution.sh文件如下
–3 编译apache hadoop,需要配置镜像文件
路径
:
/opt/modules/apache-maven-3.3.3/conf/settings.xml
配置内容
:
如果是cdh版本hadoop,则必须去掉该镜像
–配置域名解析服务器
# vi /etc/resolv.conf
内容:
nameserver 8.8.8.8
nameserver 8.8.4.4
–4 执行编译(根据所使用的Hadoop版本进行编译)
-
(2)准备环境
—-针对APACH HADOOP
./make-distribution.sh --tgz -Phadoop-2.4 -Dhadoop.version=2.5.0 -Phive -Phive-thriftserver -Pyarn
—- 针对CDH HADOOP
./make-distribution.sh --tgz -Phadoop-2.4 -Dhadoop.version=2.5.0-cdh5.3.6 -Phive -Phive-thriftserver -Pyarn
Spark本地模式安装配置及Spark Shell基本使用
1、Spark安装环境准备:
- JAVA
-
HDFS(
HDFS是否脱离了安全模式
) -
SCALA
2、Spark安装
-
将编译好的cdh版本的spark赋予执行权限,解压至指定目录
-
通过notepad++配置$SPARK_HOME目录下conf下的配置文件
①日志配置
更改log4j.properties.template文件名为log4j.properties
②配置spark-env.sh
-
配置完成
3、测试Spark Shell命令行
使用Spark RDD进行简单测试:
-
启动spark交互式命令行:bin/spark-shell 并编程测试wordcount
- HDFS上的数据源
- 定义rdd读取数据源
- 使用rdd.map(line => line.split(“ ”)) 可以将文件按空格进行分割,分割之后会变成数组
- 再在其后面加上.collect之后查看输出
-
这里需使用flatMap代替map对该数组进行一个压平的操作,即: rdd.flatMap(line => line.split(” “)).collect,输出的为压平后的一个个单词
-
再使用map操作将其变为元组对,即:rdd.flatMap(line => line.split(” “)).map(word => (word,1)).collect
输出结果:
-
进行到这一步,再使用reduceByKey()就可完成wordcount了,reduceByKey中的ByKey使数组中的元组对按key进行排序,reduce进行相加。
即:
rdd.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey((a,b) => (a + b))
-
上一步即完成了对数据的处理,再对其赋值之后保存,即可完成wordcount
赋值:
val wordcountRDD = rdd.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey((a,b) => (a + b))
-
保存:
wordcountRDD.saveAsTextFile("/user/vin/wordcount/output")
Spark集群
Spark Cluster 可以运行在Yarn上,由Yarn进行资源管理和任务调度,还可以运行在其自带的资源管理调度框架Standalone上,分为主节点Master(类似于yarn的resourcemanager)和从节点Work(类似于yarn的nodemanager),不同的是一台机器上可以运行多个Work。
Spark架构原理图:
说明:Job:包含了由Spark Action催生的多个Tasks,是一个并行计算
Stage:一个Job分为了多个彼此依赖的Stage,每个Stage又包含了一系列的Task,类似于MapReduce的Map阶段和reduce阶段。
– Spark集群安装部署
– 配置$SPARK_HOME目录下conf下的配置文件
1 配置spark-env.sh
参考官网:
http://spark.apache.org/docs/1.6.1/spark-standalone.html#installing-spark-standalone-to-a-cluster
2 配置slaves
配置运行Work的主机名
3 启动
在sbin目录里使用
start-master.sh
start-slaves.sh
//启动所有的从节点,使用此命令时,运行此命令的机器,必须要配置与其他机器的SSH无密钥登录,否则启动的时候会出现一些问题
4 Spark的的web监控端口为8080,URL为7070,Job监控4040,都是自动增长
5 测试Spark集群
spark-shell是spark的一个application,将其运行在spark standalone上,通过输入: bin/spark-shell –help 查看其运行方法
启动: bin/spark-shell –master spark://vin01:7077
Spark Application开发、运行及监控(IDEA)
-
在IDEA中创建scala Project,并添加spark依赖包
步骤:File -> Project Structure -> Libraries -> +号 -> java -> 选择编译好的spark目录下的lib依赖包
- 导入依赖包之后即可进行程序开发,新建包、在包中创建Scala class之SparkApp
- 编程
-
配置resurces(在新建scala Project时,创建resources)
由于程序中需要读hdfs上的数据文件,所以需要将hadoop的配置文件hdfs-site.xml 与core-site.xml 文件拷贝到scala project的resources中
-
本地运行
使用Idea工具可以直接运行在本地模式,无需插件
运行查看输出:
-
打包在Spark shell上提交运行(bin/spark-submit …)
步骤:1 打包
File -> project structure -> Artifacts -> + -> jar
2 选择类
3 去除依赖包(因为集群上本身有)
4 上述步骤设置了打哪个包,还需要build进行打包
5 将jar包上传至linux下并赋予执行权限:此处为方便上传到Spark主目录下
6 提交任务
7 先测试本地
8 测试集群:此时需要将代码中的master注释掉再重新打包,重新打包直接用rebuild即可
9 启动spark Standalone
查看8080端口是否有资源
提交任务
bin/spark-sumit --master spark://vin01:7077 scalaProject.jar
10 监控
Spark 日志监控(HistoryServer)配置
Spark HistoryServer配置分为两个部分:
第一、设置SparkApplicaiton在运行时,需要记录日志信息
配置:配置$SPARK_HOME目录下conf下spark-defaults.conf文件
第二、启动HistoryServer,通过界面查看
配置Spark主目录下conf下spark-env.sh文件
SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://vin01/user/vin/sparkEventLogs"
配置完成启动服务
它端口号是18080
![]()
Spark RDD
-
RDD是什么
官方解释:
RDD是Spark的基本抽象,是一个弹性分布式数据集,代表着不可变的,分区(partition)的集合,能够进行并行计算。也即是说:- 它是一系列的分片、比如说128M一片,类似于Hadoop的split;
- 在每个分片上都有一个函数去执行/迭代/计算它
- 它也是一系列的依赖,比如RDD1转换为RDD2,RDD2转换为RDD3,那么RDD2依赖于RDD1,RDD3依赖于RDD2。
- 对于一个Key-Value形式的RDD,可以指定一个partitioner,告诉它如何分片,常用的有hash、range
- 可选择指定分区最佳计算位置
-
创建RDD的两种方式
-
方式一:
将集合进行并行化操作
List\Seq\Array
演示:
-
方式二:
外部存储系统
HDFS, HBase, or any data source offering a Hadoop InputFormat.
-
方式一:
-
RDD的三大Operations
-
Transformation
从原有的一个RDD进行操作创建一个新的RDD,通常是一个lazy过程,例如map(func) 、filter(func),直到有Action算子执行的时候 -
Action
返回给驱动program一个值,或者将计算出来的结果集导出到存储系统中,例如count() reduce(func) -
Persist
将数据存储在内存中,或者存储在硬盘中
例如: cache() persist() unpersist()
合理使用persist()和cache()持久化操作能大大提高spark性能,但是其调用是有原则的,必须在transformation或者textFile后面直接调用persist()或cache(),如果先创建的RDD,然后再起一行调用这两个方法,则会报错
-
Transformation
-
RDD的常用Transformation
– map(func) :返回一个新的分布式数据集,由每个原元素经过func函数转换后组成
spark shell本地测试:
val numbers = Array(1, 2, 3, 4, 5)
val numberRDD = sc.parallelize(numbers, 1)
val multipleNumberRDD = numberRDD.map ( num => num * 2 )
multipleNumberRDD.foreach ( num => println(num) )
– filter(func) : 返回一个新的数据集,由经过func函数后返回值为true的原元素组成
val numbers = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val numberRDD = sc.parallelize(numbers, 1)
val evenNumberRDD = numberRDD.filter { num => num % 2 == 0 }
evenNumberRDD.foreach { num => println(num) }
– flatMap(func) : 类似于map,但是每一个输入元素,会被映射为0到多个输出元素(因此,func函数的返回值是一个Seq,而不是单一元素)
val lineArray = Array("hello you", "hello me", "hello world")
val lines = sc.parallelize(lineArray, 1)
val words = lines.flatMap { line => line.split(" ") }
words.foreach { word => println(word) }
– union(otherDataset) : 返回一个新的数据集,由原数据集和参数联合而成
– groupByKey([numTasks]) :在一个由(K,V)对组成的数据集上调用,返回一个(K,Seq[V])对的数据集。注意:默认情况下,使用8个并行任务进行分组,你可以传入numTask可选参数,根据数据量设置不同数目的Task
val scoreList = Array(Tuple2("class1", 80), Tuple2("class2", 75),Tuple2("class1", 90), Tuple2("class2", 60))
val scores = sc.parallelize(scoreList, 1)
val groupedScores = scores.groupByKey()
groupedScores.foreach(score => {
println(score._1);
score._2.foreach { singleScore => println(singleScore) };
println("=============================") })
– reduceByKey(func, [numTasks]) : 在一个(K,V)对的数据集上使用,返回一个(K,V)对的数据集,key相同的值,都被使用指定的reduce函数聚合到一起。和groupbykey类似,任务的个数是可以通过第二个可选参数来配置的。在实际开发中,能使reduceByKey实现的就不用groupByKey
val scoreList = Array(Tuple2("class1", 80), Tuple2("class2", 75),Tuple2("class1", 90), Tuple2("class2", 60))
val scores = sc.parallelize(scoreList, 1)
val totalScores = scores.reduceByKey(_ + _)
totalScores.foreach(classScore => println(classScore._1 + ": " + classScore._2))
– join(otherDataset, [numTasks]) :在类型为(K,V)和(K,W)类型的数据集上调用,返回一个(K,(V,W))对,每个key中的所有元素都在一起的数据集
val studentList = Array(
Tuple2(1