最近项目中需要做的数据处理相对复杂,自己浅显的scala知识已经不够用了,打算每天来学习一点点。这里感谢yihan大佬在解决问题中给到的巨大帮助!感谢生命中遇到的每个贵人!
创建RDD
Spark shell提供了SparkContext变量
sc
,使用
sc.parallelize()
创建RDD。
scala> val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at
:24
使用scala时,从
SparkSession
中获取
SparkContext
对象,并使用
SparkContext .parallelize()
来创建rdd,这个函数还有另一个signature,它附加了一个整数参数来指定分区的数量。分区是Apache Spark中并行性的基本单位。Apache Spark中的RDD是分区的集合。(Partitions are basic units of parallelism in Apache Spark. RDDs in Apache Spark are a collection of partitions.)
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
object RDDParallelize {
def main(args: Array[String]): Unit = {
val spark:SparkSession = SparkSession.builder().master("local[1]")
.appName("SparkByExamples.com")
.getOrCreate()
val rdd:RDD[Int] = spark.sparkContext.parallelize(List(1,2,3,4,5))
val rddCollect:Array[Int] = rdd.collect()
println("Number of Partitions: "+rdd.getNumPartitions)
println("Action: First element: "+rdd.first())
println("Action: RDD converted to Array[Int] : ")
rddCollect.foreach(println)
}
}
以上代码的输出:
Number of Partitions: 1
Action: First element: 1
Action: RDD converted to Array[Int] :
1
2
3
4
5
创建空RDD
sparkContext.parallelize(Seq.empty[String])
读取文件
通过将目录路径输入到
textFile()
读取所有文本文件并创建单个RDD。如果发现一个Spark进程失败并报错,请确保没有嵌套目录。
val rdd = spark.sparkContext.textFile("C:/tmp/files/*")
rdd.foreach(f=>{
println(f)
})
以上例子读取一个目录中的所有文件,创建一个RDD并打印RDD的内容
输出:
Invalid,I
One,1
Two,2
Three,3
Four,4
如果是在集群上运行,应该首先收集数据,以便在控制台上打印,如下所示。
rdd.collect.foreach(f=>{
println(f)
})
wholeTextFiles()用法
val rddWhole = spark.sparkContext.wholeTextFiles("C:/tmp/files/*")
rddWhole.foreach(f=>{
println(f._1+"=>"+f._2)
})
以上代码中,使用了
wholeTextFiles()
方法,返回的是一个RDD[Tuple2],其中tuple中的第一个值(_1)是文件名,第二个值(_2)是文件内容
其输出如下:
file:/C:/tmp/files/invalid.txt=>Invalid,I
file:/C:/tmp/files/text01.txt=>One,1
file:/C:/tmp/files/text02.txt=>Two,2
file:/C:/tmp/files/text03.txt=>Three,3
file:/C:/tmp/files/text04.txt=>Four,4
读取多个text file在一个RDD中
val rdd3 = spark.sparkContext.textFile("C:/tmp/files/text01.txt,C:/tmp/files/text02.txt")
rdd3.foreach(f=>{
println(f)
})
以上输出:
One,1
Two,2
读取所有text file在一个RDD中
val rdd2 = spark.sparkContext.textFile("C:/tmp/files/text*.txt")
rdd2.foreach(f=>{
println(f)
})
输出如下:
One,1
Two,2
Three,3
Four,4
从多个目录读取文件到一个RDD中
val rdd2 = spark.sparkContext.textFile("C:/tmp/dir1/*,C:/tmp/dir2/*,c:/tmp/files/text01.txt")
rdd2.foreach(f=>{
println(f)
})
在RDD中读取多个CSV文件
val rdd5 = spark.sparkContext.textFile("C:/tmp/files/*")
val rdd6 = rdd5.map(f=>{
f.split(",")
})
rdd6.foreach(f => {
println("Col1:"+f(0)+",Col2:"+f(1))
})
以上,将一个目录中的所有csv文件读入RDD,应用映射转换以逗号分隔记录,映射在转换后返回另一个RDD “rdd6″。最后,迭代rdd6,基于索引读取列。
注意:不能更新RDD,因为它们是不可变的(immutable)。以上代码产生如下输出:
Col1:Invalid,Col2:I
Col1:One,Col2:1
Col1:Two,Col2:2
Col1:Three,Col2:3
Col1:Four,Col2:4
注:以上有两种用法注意区别,一种是f._1,另一种是f(1) 。f.
_
1是从tuple中读取第一个值,f(0) 是从array中读取第一个元素。
跳过CSV文件的header
当CSV文件中有一个带有header行,并且要用Spark RDD读取和处理时,需要跳过这个header,因为在RDD中没有办法指定文件有一个header。
rdd.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter }
参考文献
Create a Spark RDD using Parallelize – Spark by {Examples}
Spark – Read multiple text files into single RDD? – Spark by {Examples}
Spark Load CSV File into RDD – Spark by {Examples}