Lesson 59: Hands-on RDD and DataFrame Conversion with Java and Scala in the IDE (Study Notes)
This lesson covers:
1. The significance of converting between RDDs and DataFrames
2. Hands-on RDD-to-DataFrame conversion with Java
3. Hands-on RDD-to-DataFrame conversion with Scala
I. The significance of converting between RDDs and DataFrames
In Spark, an RDD can be converted directly into a DataFrame. The core abstraction of Spark Core is the RDD, and all scheduling is performed on RDDs, yet any operation on an RDD can instead be expressed as a DataFrame operation through Spark SQL. An RDD can pull in data from databases, NoSQL stores, other file systems, and many other sources; once that data is turned into a DataFrame, big data development is greatly simplified: instead of writing Scala or Java code, you only need to write Spark SQL.
Conversely, DataFrame results can be converted back into RDDs. After running SQL queries, machine learning, or other processing on a DataFrame, you can turn the result back into an RDD, which is very convenient for saving and reformatting the data.
There are two ways to turn an RDD into a DataFrame:
1. Infer the metadata of the RDD's elements through reflection. The data inside an RDD carries no metadata of its own. For example, a Person record contains id, name, and age, but an RDD record knows nothing about those fields, whereas a DataFrame must know them. How do we obtain this metadata when converting between an RDD and a DataFrame? The simplest way is reflection.
In Scala this is done by mapping a case class: you write a case class that describes the metadata of each column of the RDD. In Java it is done through a JavaBean.
Scala: case class mapping.
Java: JavaBean (nested JavaBeans are not supported, nor are complex structures such as List or Map; only simple types such as String and int can be used. Scala has none of these restrictions.)
The prerequisite for using reflection is that the metadata is already known up front (static). In some scenarios, however, the metadata only becomes known at runtime (dynamic).
2. When the metadata is not known in advance at the time the DataFrame is created, it must be built dynamically at runtime and then applied to the RDD. This is the more common case, i.e. obtaining the schema dynamically.
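For comparison, here is a minimal sketch of this second approach in Java, assuming the standard Spark 1.x programmatic-schema API (StructType, StructField, RowFactory) and the same persons.txt file used later in this lesson; the class name is illustrative, and the hands-on code below uses the reflection approach instead.

package SparkSQLByJava;
// Sketch of approach 2: build the schema dynamically at runtime instead of inferring it by reflection.
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
public class RDD2DataFrameByProgrammaticSchema {
	public static void main(String[] args) {
		SparkConf conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrameByProgrammaticSchema");
		JavaSparkContext sc = new JavaSparkContext(conf);
		SQLContext sqlContext = new SQLContext(sc);
		// Read the same persons.txt and turn each line into a generic Row.
		JavaRDD<Row> rows = sc.textFile("D://DT-IMF//testdata//persons.txt").map(new Function<String, Row>() {
			@Override
			public Row call(String line) throws Exception {
				String[] splited = line.split(",");
				return RowFactory.create(Integer.valueOf(splited[0].trim()), splited[1], Integer.valueOf(splited[2].trim()));
			}
		});
		// Build the schema at runtime; in a real dynamic case the field list would come from configuration or the data itself.
		List<StructField> fields = Arrays.asList(
				DataTypes.createStructField("id", DataTypes.IntegerType, true),
				DataTypes.createStructField("name", DataTypes.StringType, true),
				DataTypes.createStructField("age", DataTypes.IntegerType, true));
		StructType schema = DataTypes.createStructType(fields);
		// Apply the dynamically built schema to the RDD of Rows.
		DataFrame df = sqlContext.createDataFrame(rows, schema);
		df.registerTempTable("persons");
		sqlContext.sql("select * from persons where age >= 6").show();
		sc.stop();
	}
}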
class Person {
	private int id;
	private String name;
	private int age;
}
Right-click the class, then choose Source -> Generate Getters and Setters.
Select age, id, and name, then click OK. The getter and setter code is generated automatically:
class Person {
	private int id;
	private String name;
	private int age;
	public int getId() {
		return id;
	}
	public void setId(int id) {
		this.id = id;
	}
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	public int getAge() {
		return age;
	}
	public void setAge(int age) {
		this.age = age;
	}
}
Test data: create a file named persons.txt in the D:\DT-IMF\testdata directory with the following content:
1,Spark,7
2,Hadoop,11
3,Flink,5
Here is the hands-on code:
package SparkSQLByJava;
// Convert an RDD into a DataFrame by reflection.
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
public class RDD2DataFrameByReflection {
	public static void main(String[] args) {
		// Create the SparkConf object.
		SparkConf conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrameByReflection");
		// Create the SparkContext object.
		JavaSparkContext sc = new JavaSparkContext(conf);
		// Create the SQLContext, the entry point for SQL analysis.
		SQLContext sqlContext = new SQLContext(sc);
		// Create an RDD by reading the text file.
		JavaRDD<String> lines = sc.textFile("D://DT-IMF//testdata//persons.txt");
		JavaRDD<Person> persons = lines.map(new Function<String, Person>() {
			@Override
			public Person call(String line) throws Exception {
				String[] splited = line.split(",");
				Person p = new Person();
				p.setId(Integer.valueOf(splited[0].trim()));
				p.setName(splited[1]);
				p.setAge(Integer.valueOf(splited[2].trim()));
				return p;
			}
		});
		/*
		 * The createDataFrame method comes from sqlContext and takes two parameters.
		 * The first is the RDD, here the persons RDD produced by lines.map. Its element
		 * type is Person, i.e. every record is a Person, and a Person does have id, name,
		 * and age, but the JavaRDD itself knows nothing about these fields. To create a
		 * DataFrame, however, the DataFrame must know about id, name, and age. How does it
		 * find out? The second parameter describes the metadata of each record in the RDD:
		 * the JavaBean class, i.e. Person.class. Under the hood, when Person.class is passed
		 * in, Spark uses reflection to obtain all of Person's fields and, combined with the
		 * RDD itself, builds the DataFrame.
		 */
		DataFrame df = sqlContext.createDataFrame(persons, Person.class);
		// Register the DataFrame as a temporary table.
		df.registerTempTable("persons");
		// An in-memory table named persons is now available, and SQL can be written against it.
		DataFrame bigDatas = sqlContext.sql("select * from persons where age >= 6");
		// The query result can in turn be converted back into an RDD; this returns a JavaRDD<Row>.
		// Note: org.apache.spark.sql.Row must be imported here.
		JavaRDD<Row> bigDataRDD = bigDatas.javaRDD();
		// Map over the RDD again. Each element is a row of data (a SQL Row), and each Row is
		// restored to a Person object.
		JavaRDD<Person> result = bigDataRDD.map(new Function<Row, Person>() {
			@Override
			public Person call(Row row) throws Exception {
				Person p = new Person();
				// p.setId(row.getInt(0));
				// p.setName(row.getString(1));
				// p.setAge(row.getInt(2));
				// When the data was read in, the column order was id, name, age, and the original
				// RDD kept that order. Once it becomes a DataFrame, the DataFrame's optimization
				// engine restructures the data before processing, so after converting back to an
				// RDD there is no guarantee the columns are still id, name, age. The DataFrame has
				// reordered the columns (here: age, id, name).
				p.setId(row.getInt(1));
				p.setName(row.getString(2));
				p.setAge(row.getInt(0));
				return p;
			}
		});
		List<Person> personList = result.collect();
		for (Person p : personList) {
			System.out.println(p);
		}
	}
}
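Because the column order after optimization is not guaranteed, a more robust variant (my own suggestion, not part of the lesson, assuming Row.getAs(String fieldName), which is available from Spark 1.4 onward) is to look the fields up by name. It is a drop-in replacement for the second map call above:

// Restore Person objects by field name instead of by position, so the mapping
// no longer depends on the column order produced by the DataFrame's optimizer.
JavaRDD<Person> result = bigDataRDD.map(new Function<Row, Person>() {
	@Override
	public Person call(Row row) throws Exception {
		Person p = new Person();
		p.setId(row.<Integer>getAs("id"));
		p.setName(row.<String>getAs("name"));
		p.setAge(row.<Integer>getAs("age"));
		return p;
	}
});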
package SparkSQLByJava;
import java.io.Serializable;
public class Person implements Serializable {
	private static final long serialVersionUID = 1L;
	public int id;
	public String name;
	public int age;
	public int getId() {
		return id;
	}
	public void setId(int id) {
		this.id = id;
	}
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	public int getAge() {
		return age;
	}
	public void setAge(int age) {
		this.age = age;
	}
	@Override
	public String toString() {
		return "Person [id=" + id + ", name=" + name + ", age=" + age + "]";
	}
}
Below is the console output from running the program in Eclipse:
Using Spark’s default log4j profile: org/apache/spark/log4j-defaults.properties
16/03/29 01:24:54 INFO SparkContext: Running Spark version 1.6.0
16/03/29 01:24:57 INFO SecurityManager: Changing view acls to: think
16/03/29 01:24:57 INFO SecurityManager: Changing modify acls to: think
16/03/29 01:24:57 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(think); users with modify permissions: Set(think)
16/03/29 01:25:00 INFO Utils: Successfully started service ‘sparkDriver’ on port 56818.
16/03/29 01:25:01 INFO Slf4jLogger: Slf4jLogger started
16/03/29 01:25:02 INFO Remoting: Starting remoting
16/03/29 01:25:02 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.56.1:56831]
16/03/29 01:25:02 INFO Utils: Successfully started service ‘sparkDriverActorSystem’ on port 56831.
16/03/29 01:25:02 INFO SparkEnv: Registering MapOutputTracker
16/03/29 01:25:03 INFO SparkEnv: Registering BlockManagerMaster
16/03/29 01:25:03 INFO DiskBlockManager: Created local directory at C:\Users\think\AppData\Local\Temp\blockmgr-2307f4ac-3fca-4e65-ad95-99802c35dffc
16/03/29 01:25:03 INFO MemoryStore: MemoryStore started with capacity 1773.8 MB
16/03/29 01:25:03 INFO SparkEnv: Registering OutputCommitCoordinator
16/03/29 01:25:04 INFO Utils: Successfully started service ‘SparkUI’ on port 4040.
16/03/29 01:25:04 INFO SparkUI: Started SparkUI at http://192.168.56.1:4040
16/03/29 01:25:05 INFO Executor: Starting executor ID driver on host localhost
16/03/29 01:25:06 INFO Utils: Successfully started service ‘org.apache.spark.network.netty.NettyBlockTransferService’ on port 56838.
16/03/29 01:25:06 INFO NettyBlockTransferService: Server created on 56838
16/03/29 01:25:06 INFO BlockManagerMaster: Trying to register BlockManager
16/03/29 01:25:06 INFO BlockManagerMasterEndpoint: Registering block manager localhost:56838 with 1773.8 MB RAM, BlockManagerId(driver, localhost, 56838)
16/03/29 01:25:06 INFO BlockManagerMaster: Registered BlockManager
16/03/29 01:25:10 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 127.4 KB, free 127.4 KB)
16/03/29 01:25:10 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 13.9 KB, free 141.3 KB)
16/03/29 01:25:10 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:56838 (size: 13.9 KB, free: 1773.7 MB)
16/03/29 01:25:10 INFO SparkContext: Created broadcast 0 from textFile at RDD2DataFrameByReflection.java:24
16/03/29 01:25:17 WARN : Your hostname, think-PC resolves to a loopback/non-reachable address: fe80:0:0:0:d401:a5b5:2103:6d13%eth8, but we couldn’t find any external IP address!
16/03/29 01:25:18 INFO FileInputFormat: Total input paths to process : 1
16/03/29 01:25:18 INFO SparkContext: Starting job: collect at RDD2DataFrameByReflection.java:77
16/03/29 01:25:19 INFO DAGScheduler: Got job 0 (collect at RDD2DataFrameByReflection.java:77) with 1 output partitions
16/03/29 01:25:19 INFO DAGScheduler: Final stage: ResultStage 0 (collect at RDD2DataFrameByReflection.java:77)
16/03/29 01:25:19 INFO DAGScheduler: Parents of final stage: List()
16/03/29 01:25:19 INFO DAGScheduler: Missing parents: List()
16/03/29 01:25:19 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[7] at map at RDD2DataFrameByReflection.java:58), which has no missing parents
16/03/29 01:25:19 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 8.8 KB, free 150.1 KB)
16/03/29 01:25:19 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 4.6 KB, free 154.6 KB)
16/03/29 01:25:19 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:56838 (size: 4.6 KB, free: 1773.7 MB)
16/03/29 01:25:19 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
16/03/29 01:25:19 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[7] at map at RDD2DataFrameByReflection.java:58)
16/03/29 01:25:19 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
16/03/29 01:25:19 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,PROCESS_LOCAL, 2138 bytes)
16/03/29 01:25:19 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
16/03/29 01:25:19 INFO HadoopRDD: Input split: file:/D:/DT-IMF/testdata/persons.txt:0+33
16/03/29 01:25:19 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
16/03/29 01:25:19 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
16/03/29 01:25:19 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
16/03/29 01:25:19 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
16/03/29 01:25:19 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
16/03/29 01:25:20 INFO GeneratePredicate: Code generated in 479.023467 ms
16/03/29 01:25:20 INFO GenerateUnsafeProjection: Code generated in 97.650998 ms
16/03/29 01:25:20 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2443 bytes result sent to driver
16/03/29 01:25:20 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1507 ms on localhost (1/1)
16/03/29 01:25:20 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
16/03/29 01:25:20 INFO DAGScheduler: ResultStage 0 (collect at RDD2DataFrameByReflection.java:77) finished in 1.573 s
16/03/29 01:25:20 INFO DAGScheduler: Job 0 finished: collect at RDD2DataFrameByReflection.java:77, took 1.890426 s
Person [id=1, name=Spark, age=7]
Person [id=2, name=Hadoop, age=11]
16/03/29 01:25:20 INFO SparkContext: Invoking stop() from shutdown hook
16/03/29 01:25:21 INFO SparkUI: Stopped Spark web UI at http://192.168.56.1:4040
16/03/29 01:25:21 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/03/29 01:25:21 INFO MemoryStore: MemoryStore cleared
16/03/29 01:25:21 INFO BlockManager: BlockManager stopped
16/03/29 01:25:21 INFO BlockManagerMaster: BlockManagerMaster stopped
16/03/29 01:25:21 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/03/29 01:25:21 INFO SparkContext: Successfully stopped SparkContext
16/03/29 01:25:21 INFO ShutdownHookManager: Shutdown hook called
16/03/29 01:25:21 INFO ShutdownHookManager: Deleting directory C:\Users\think\AppData\Local\Temp\spark-481db032-91d6-4ced-a94c-b38dd0b9033c
16/03/29 01:25:21 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/03/29 01:25:21 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
Runtime error fix 1:
16/03/29 00:33:31 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.IllegalAccessException
: Class org.apache.spark.sql.SQLContext$$anonfun$org$apache$spark$sql$SQLContext$$beansToRows$1$$anonfun$apply$1 can not access a member of class SparkSQLByJava.Person with modifiers “public”
This is an access problem: reflection requires the class to be declared public.
Runtime error fix 2:
16/03/29 00:53:43 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.io.NotSerializableException
: SparkSQLByJava.Person
Serialization stack:
– object not serializable (class: SparkSQLByJava.Person, value: SparkSQLByJava.Person@281cb092)
– element of array (index: 0)
– array (class [Ljava.lang.Object;, size 2)
The cause is that the Person class is not serializable; change its declaration to public class Person implements Serializable.
The above are my study notes from Lesson 59 of Wang Jialin's DT Big Data Dream Factory "IMF Legendary Action" course.
Wang Jialin is a China evangelist for Spark, Flink, Docker, and Android technologies, president and chief expert of the Spark Asia-Pacific Research Institute, founder of DT Big Data Dream Factory, a source-level expert on Android software/hardware integration, an English pronunciation magician, and a fitness enthusiast.
WeChat public account: DT_Spark
Email: 18610086859@126.com
Phone: 18610086859
QQ: 1740415547
WeChat: 18610086859
Sina Weibo: ilovepains