RDD编程学习

  • Post author:
  • Post category:其他


RDD创建

1、使用sc.textFile(“文件的路径”)从文件系统中加载,sc是SparkContext

2、通过并行集合创建

val array = Array(1,2,3,4,5)

val rdd = sc.parallelize(array)//sc是SparkContext

RDD操作

转换得到的RDD是惰性操作,也就是说,整个转换( transformation)过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到动作(Action)操作时,才会发生真正的计算,开始从血缘关系源头开始,进行物理的转换操作

常用的转换操作( transformation)

filter(func) 筛选出满足函数func的元素,并返回一个新的数据集

map(func) 将每个元素传递到函数func中,并将结果返回为一个新的数据集 一对一

flatMap(func) 与map()相似,但每个输入元素都可以映射到0或多个输出结果 可以一对多

groupByKey() 应用于(K,V)键值对的数据集时,返回一个新的(K, Iterable)形式的数据集

reduceByKey(func) 应用于(K,V)键值对的数据集时,返回一个新的(K, V)形式的数据集,其中每个值是将每个key传递到函数func中进行聚合后的结果

常用的动作操作(Action)

count() 返回数据集中的元素个数

collect() 以数组的形式返回数据集中的所有元素

first() 返回数据集中的第一个元素

take(n) 以数组的形式返回数据集中的前n个元素

reduce(func) 通过函数func(输入两个参数并返回一个值)聚合数据集中的元素

foreach(func) 将数据集中的每个元素传递到函数func中运行

RDD持久化

RDD采用惰性求值的机制,每次遇到行动操作,都会从头开始执行计算。每次调用行动操作,都会触发一次从头开始的计算。这对于迭代计算而言,代价是很大的

persist()方法可以标记为持久化

persist(MEMORY_ONLY):表示将RDD作为反序列化的对象存储于JVM中,如果内存不足,就要按照LRU原则替换缓存中的内容

persist(MEMORY_AND_DISK)表示将RDD作为反序列化的对象存储在JVM中,如果内存不足,超出的分区将会被存放在硬盘上

一般而言,使用cache()方法时,会调用persist(MEMORY_ONLY)

可以使用unpersist()方法手动地把持久化的RDD从缓存中移除

RDD分区

为什么要分区 : 增加并行度,减少通信开销

RDD分区的一个原则是使得分区的个数尽量等于集群中的CPU核心(core)数目

对于不同的Spark部署模式而言(本地模式、Standalone模式、YARN模式、Mesos模式),都可以通过设置spark.default.parallelism这个参数的值,来配置默认的分区数目

一般而言:

本地模式:默认为本地机器的CPU数目,若设置了local[N],则默认为N

Apache Mesos:默认的分区数为8

Standalone或YARN:在“集群中所有CPU核心数目总和”和“2”二者中取较大值作为默认值

手动创建分区

sc.textFile(“文件的路径”,2) //2就是分区数

sc.parallelize(array,2)  //2就是分区数

repartition(1) //此方法可以重新定义分区

Pair RDD

键值对RDD 主要使用map()函数来实现

常用的对键值对RDD的转换操作

reduceByKey(func)

groupByKey()

keys

values

sortByKey()默认升序 sortByKey(false)降序

mapValues(func)

join 输入(K,V1)和(K,V2)输出 (K,(V1,V2))

combineByKey

共享变量

广播变量 Broadcast Variables

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))

broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value

res0: Array[Int] = Array(1, 2, 3)

累加器Accumulators

scala> val accum = sc.longAccumulator(“My Accumulator”)

accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))



10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value

res2: Long = 10

数据的读写

本地文件系统的数据读写

//读文件

scala> val  textFile = sc. textFile(“file:///usr/local/spark/mycode/wordcount/word.txt”)

//写文件

scala> textFile. saveAsTextFile(“file:///usr/local/spark/mycode/wordcount/writeback”)

读取hdfs上的文件

scala> val  textFile = sc.textFile(“hdfs://localhost:9000/user/hadoop/word.txt”)

scala> textFile.first()

//或者

scala> val textFile = sc.textFile(“/user/hadoop/word.txt”)

scala> val textFile = sc.textFile(“word.txt”)

//写文件

scala> textFile.saveAsTextFile(“writeback”)

读取json文件 并解析

val  jsonStr = sc.textFile(“file:///usr/local/spark/examples/src/main/resources/people.json”)

使用JSON.parseFull(jsonString:String)解析

部署到集群

进入到spark 安装目录

bin/spark-submit \

–class <main-class> \

–master <master-url> \

–deploy-mode <deploy-mode> \

–conf <key>=<value> \

… # other options

<application-jar> \

[application-arguments]

–class 入口类名(例如org.apache.spark.examples.SparkPi)

–master 群集的主URL(例如spark://23.195.26.187:7077)

–deploy-mode 是在工作节点(cluster)上部署驱动程序还是在本地部署外部客户端(client)(默认值: client)

–conf:key = value格式的任意Spark配置属性。对于包含空格的值,在引号中包含“key = value”

application-jar:捆绑jar的路径,包括您的应用程序和所有依赖项。URL必须在群集内部全局可见,例如,所有节点上都存在的hdfs://路径或file://路径。

application-arguments:main 方法的传入参数

———————

原文:https://blog.csdn.net/mingyunxiaohai/article/details/81298116