【RDD】创建RDD及读取文件

  • Post author:
  • Post category:其他


最近项目中需要做的数据处理相对复杂,自己浅显的scala知识已经不够用了,打算每天来学习一点点。这里感谢yihan大佬在解决问题中给到的巨大帮助!感谢生命中遇到的每个贵人!




创建RDD





Spark shell提供了SparkContext变量

sc

,使用

sc.parallelize()

创建RDD。

scala> val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at
:24



使用scala时,从

SparkSession

中获取

SparkContext

对象,并使用

SparkContext .parallelize()

来创建rdd,这个函数还有另一个signature,它附加了一个整数参数来指定分区的数量。分区是Apache Spark中并行性的基本单位。Apache Spark中的RDD是分区的集合。(Partitions are basic units of parallelism in Apache Spark. RDDs in Apache Spark are a collection of partitions.)

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object RDDParallelize {

  def main(args: Array[String]): Unit = {
      val spark:SparkSession = SparkSession.builder().master("local[1]")
          .appName("SparkByExamples.com")
          .getOrCreate()
      val rdd:RDD[Int] = spark.sparkContext.parallelize(List(1,2,3,4,5))
      val rddCollect:Array[Int] = rdd.collect()
      println("Number of Partitions: "+rdd.getNumPartitions)
      println("Action: First element: "+rdd.first())
      println("Action: RDD converted to Array[Int] : ")
      rddCollect.foreach(println)
  }
}



以上代码的输出:

Number of Partitions: 1

Action: First element: 1

Action: RDD converted to Array[Int] :

1

2

3

4

5




创建空RDD


sparkContext.parallelize(Seq.empty[String])




读取文件





通过将目录路径输入到

textFile()

读取所有文本文件并创建单个RDD。如果发现一个Spark进程失败并报错,请确保没有嵌套目录。

val rdd = spark.sparkContext.textFile("C:/tmp/files/*")
rdd.foreach(f=>{
  println(f)
})



以上例子读取一个目录中的所有文件,创建一个RDD并打印RDD的内容



输出:

Invalid,I

One,1

Two,2

Three,3

Four,4



如果是在集群上运行,应该首先收集数据,以便在控制台上打印,如下所示。

rdd.collect.foreach(f=>{
println(f)
})




wholeTextFiles()用法


val rddWhole = spark.sparkContext.wholeTextFiles("C:/tmp/files/*")
  rddWhole.foreach(f=>{
    println(f._1+"=>"+f._2)
  })



以上代码中,使用了

wholeTextFiles()

方法,返回的是一个RDD[Tuple2],其中tuple中的第一个值(_1)是文件名,第二个值(_2)是文件内容



其输出如下:

file:/C:/tmp/files/invalid.txt=>Invalid,I

file:/C:/tmp/files/text01.txt=>One,1

file:/C:/tmp/files/text02.txt=>Two,2

file:/C:/tmp/files/text03.txt=>Three,3

file:/C:/tmp/files/text04.txt=>Four,4




读取多个text file在一个RDD中


val rdd3 = spark.sparkContext.textFile("C:/tmp/files/text01.txt,C:/tmp/files/text02.txt")
  rdd3.foreach(f=>{
    println(f)
  })



以上输出:

One,1

Two,2




读取所有text file在一个RDD中


val rdd2 = spark.sparkContext.textFile("C:/tmp/files/text*.txt")
rdd2.foreach(f=>{
  println(f)
})



输出如下:

One,1

Two,2

Three,3

Four,4




从多个目录读取文件到一个RDD中


val rdd2 = spark.sparkContext.textFile("C:/tmp/dir1/*,C:/tmp/dir2/*,c:/tmp/files/text01.txt")
rdd2.foreach(f=>{
  println(f)
})




在RDD中读取多个CSV文件


val rdd5 = spark.sparkContext.textFile("C:/tmp/files/*")
val rdd6 = rdd5.map(f=>{
  f.split(",")
})

rdd6.foreach(f => {
  println("Col1:"+f(0)+",Col2:"+f(1))
})



以上,将一个目录中的所有csv文件读入RDD,应用映射转换以逗号分隔记录,映射在转换后返回另一个RDD “rdd6″。最后,迭代rdd6,基于索引读取列。



注意:不能更新RDD,因为它们是不可变的(immutable)。以上代码产生如下输出:

Col1:Invalid,Col2:I

Col1:One,Col2:1

Col1:Two,Col2:2

Col1:Three,Col2:3

Col1:Four,Col2:4





注:以上有两种用法注意区别,一种是f._1,另一种是f(1) 。f.

_

1是从tuple中读取第一个值,f(0) 是从array中读取第一个元素。






跳过CSV文件的header



当CSV文件中有一个带有header行,并且要用Spark RDD读取和处理时,需要跳过这个header,因为在RDD中没有办法指定文件有一个header。

rdd.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter }




参考文献






Create a Spark RDD using Parallelize – Spark by {Examples}





Spark – Read multiple text files into single RDD? – Spark by {Examples}



Spark Load CSV File into RDD – Spark by {Examples}



版权声明:本文为yuxeaotao原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。