Lesson 59: Hands-On RDD and DataFrame Conversion in an IDE with Java and Scala (Learning Notes)

  • Post category: java



This lesson covers:

1. The significance of RDD-to-DataFrame conversion
2. Hands-on RDD-to-DataFrame conversion with Java
3. Hands-on RDD-to-DataFrame conversion with Scala

I. The Significance of RDD-to-DataFrame Conversion



A Spark RDD can be converted directly into a DataFrame. The heart of SparkCore is the RDD: all scheduling is done in terms of RDDs, and any operation on an RDD can be converted into an operation on a DataFrame, carried out with SparkSQL. An RDD can be hooked up to a database, a NoSQL store, other file systems, and all kinds of other data sources, and that data can then be converted into a DataFrame.

This greatly simplifies big data development: where you previously wrote Scala/Java, you now only need to write SparkSQL.

At the same time, operations on a DataFrame can be converted back into RDD operations: after running SQL or machine learning over the data as a DataFrame, the result can be converted back into an RDD, which is very convenient for saving and formatting the data.

There are two ways to convert an RDD into a DataFrame:

1. Via reflection: infer the metadata from the RDD's elements. Data in an RDD carries no metadata of its own. For example, a Person holds id/name/age, but a Record in the RDD knows nothing about id/name/age; once it becomes a DataFrame, however, the DataFrame must know this information. So how do we obtain this metadata during the RDD-to-DataFrame conversion? The simplest way is through reflection.



In Scala, this is the case class mapping: you write a case class that describes the metadata of each column of the RDD. In Java, it is done through a JavaBean, the counterpart of Scala's case class mapping. A Java Bean, however, cannot be a nested JavaBean, nor can it contain complex data structures such as List or Map; only simple data types such as String or int can be used. Scala has none of these restrictions.

Prerequisite for using reflection: the metadata is already known up front (static). In some scenarios, however, the metadata only becomes known at runtime (dynamic).

2. The metadata is not known when the DataFrame is created; it can only be constructed dynamically at runtime and then applied to the RDD. This is actually the more common situation: obtaining the Schema dynamically, as in the sketch below.
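The hands-on code in these notes only demonstrates the reflection approach, so here is a minimal sketch of this second, dynamic-Schema approach. It is my own illustration rather than part of the lesson; it assumes the same Spark 1.6 Java API used in the rest of these notes (DataTypes, StructType, RowFactory), and RDD2DataFrameDynamicSchema is a hypothetical class name:

package SparkSQLByJava;
// Sketch (not from the original lesson): build the Schema dynamically
// at runtime and apply it to a JavaRDD<Row>.

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class RDD2DataFrameDynamicSchema {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrameDynamicSchema");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        JavaRDD<String> lines = sc.textFile("D://DT-IMF//testdata//persons.txt");
        // Turn each text line into a generic Row; no JavaBean is needed.
        JavaRDD<Row> rows = lines.map(new Function<String, Row>() {
            @Override
            public Row call(String line) throws Exception {
                String[] splited = line.split(",");
                return RowFactory.create(
                        Integer.valueOf(splited[0].trim()),
                        splited[1],
                        Integer.valueOf(splited[2].trim()));
            }
        });
        // Build the metadata (the Schema) at runtime; in a real job the field
        // names and types might come from a config file or another system.
        StructType schema = DataTypes.createStructType(Arrays.asList(
                DataTypes.createStructField("id", DataTypes.IntegerType, true),
                DataTypes.createStructField("name", DataTypes.StringType, true),
                DataTypes.createStructField("age", DataTypes.IntegerType, true)));
        // Apply the dynamically built Schema to the RDD to obtain a DataFrame.
        DataFrame df = sqlContext.createDataFrame(rows, schema);
        df.registerTempTable("persons");
        sqlContext.sql("select * from persons where age >= 6").show();

        sc.stop();
    }
}

The rest of these notes return to the reflection approach, starting from the Person JavaBean.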





class Person {
    private int id;
    private String name;
    private int age;
}

Right-click and choose Source -> Generate Getters and Setters.

Select age, id, and name, then click OK, and the getter and setter code is generated automatically:



class Person {
    private int id;
    private String name;
    private int age;
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public int getAge() {
        return age;
    }
    public void setAge(int age) {
        this.age = age;
    }
}

Data: create a persons.txt file in the D:\DT-IMF\testdata directory with the following content:

1,Spark,7

2,Hadoop,11

3,Flink,5

Below is the hands-on code:



package SparkSQLByJava;
// Convert an RDD into a DataFrame via reflection

import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

public class RDD2DataFrameByReflection {
    public static void main(String[] args) {
        // Create the SparkConf object
        SparkConf conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrameByReflection");
        // Create the SparkContext object
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Create the SQLContext, the context object used for SQL analysis
        SQLContext sqlContext = new SQLContext(sc);
        // Create the RDD by reading the text file
        JavaRDD<String> lines = sc.textFile("D://DT-IMF//testdata//persons.txt");
        JavaRDD<Person> persons = lines.map(new Function<String, Person>() {
            @Override
            public Person call(String line) throws Exception {
                String[] splited = line.split(",");
                Person p = new Person();
                p.setId(Integer.valueOf(splited[0].trim()));
                p.setName(splited[1]);
                p.setAge(Integer.valueOf(splited[2].trim()));
                return p;
            }
        });
        /*
         * The createDataFrame method comes from sqlContext and takes two arguments. The first
         * is the RDD, here the persons RDD produced by lines.map. The element type of that RDD
         * is Person, i.e. every record is a Person, and a Person does have id, name and age,
         * but the JavaRDD itself knows nothing about them. To create the DataFrame, which must
         * know about id, name and age, we pass a second argument that describes the metadata
         * of each record: the JavaBean class, Person.class.
         * How it actually works: when Person.class is passed in, the DataFrame is created via
         * reflection. Under the hood all the fields of Person are obtained reflectively and,
         * combined with the RDD itself, yield the DataFrame.
         */
        DataFrame df = sqlContext.createDataFrame(persons, Person.class);
        // Register the DataFrame as a temp table.
        df.registerTempTable("persons");
        // A "persons" table now exists in memory, and SQL can be written against this temp table.
        DataFrame bigDatas = sqlContext.sql("select * from persons where age >= 6");
        // Going the other way, the query result can be turned back into an RDD;
        // the return type is JavaRDD<Row>.
        // Note: org.apache.spark.sql.Row must be imported here.
        JavaRDD<Row> bigDataRDD = bigDatas.javaRDD();
        // Map over the RDD again. The elements are rows of data (SQL Rows) and the result is
        // a Person: each Row is turned back into a Person.
        // This returns each concrete element of the RDD.
        JavaRDD<Person> result = bigDataRDD.map(new Function<Row, Person>() {
            @Override
            public Person call(Row row) throws Exception {
                Person p = new Person();
                // p.setId(row.getInt(0));
                // p.setName(row.getString(1));
                // p.setAge(row.getInt(2));
                // When the data was read in, the first column was id, the second name and the
                // third age, and the generated RDD kept that order. After the conversion to a
                // DataFrame, however, the DataFrame's own optimization engine kicks in
                // (data-structure optimization and so on), so once the processed result is
                // turned back into an RDD there is no guarantee that the first column is id,
                // the second name and the third age.
                // The reason is that the DataFrame reordered the columns (with a JavaBean the
                // fields come back in alphabetical order: age, id, name).
                p.setId(row.getInt(1));
                p.setName(row.getString(2));
                p.setAge(row.getInt(0));
                return p;
            }
        });
        List<Person> personList = result.collect();
        for (Person p : personList) {
            System.out.println(p);
        }
    }
}
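Since that positional mapping silently depends on how the DataFrame ordered its columns, a more defensive variant is to resolve columns by field name instead of by position. This is my own sketch, not part of the original lesson; it assumes the Person bean from these notes and Spark 1.6's Row.fieldIndex(String) API, and RowToPersonByName is a hypothetical helper name:

package SparkSQLByJava;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Row;

public class RowToPersonByName {
    // Convert Rows back to Persons by field name rather than by position,
    // so the mapping no longer cares how the DataFrame ordered its columns.
    public static JavaRDD<Person> toPersons(JavaRDD<Row> rows) {
        return rows.map(new Function<Row, Person>() {
            @Override
            public Person call(Row row) throws Exception {
                Person p = new Person();
                // fieldIndex resolves a column's position from its name.
                p.setId(row.getInt(row.fieldIndex("id")));
                p.setName(row.getString(row.fieldIndex("name")));
                p.setAge(row.getInt(row.fieldIndex("age")));
                return p;
            }
        });
    }
}

With this helper, the map block above could be replaced by RowToPersonByName.toPersons(bigDataRDD).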



The Person class used above is a JavaBean that implements Serializable:

package SparkSQLByJava;

import java.io.Serializable;

public class Person implements Serializable {

    private static final long serialVersionUID = 1L;

    public int id;
    public String name;
    public int age;

    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public int getAge() {
        return age;
    }
    public void setAge(int age) {
        this.age = age;
    }
    @Override
    public String toString() {
        return "Person [id=" + id + ", name=" + name + ", age=" + age + "]";
    }
}

Below is the console output from running it in Eclipse:


Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/03/29 01:24:54 INFO SparkContext: Running Spark version 1.6.0
16/03/29 01:24:57 INFO SecurityManager: Changing view acls to: think
16/03/29 01:24:57 INFO SecurityManager: Changing modify acls to: think
16/03/29 01:24:57 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(think); users with modify permissions: Set(think)
16/03/29 01:25:00 INFO Utils: Successfully started service 'sparkDriver' on port 56818.
16/03/29 01:25:01 INFO Slf4jLogger: Slf4jLogger started
16/03/29 01:25:02 INFO Remoting: Starting remoting
16/03/29 01:25:02 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.56.1:56831]
16/03/29 01:25:02 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 56831.
16/03/29 01:25:02 INFO SparkEnv: Registering MapOutputTracker
16/03/29 01:25:03 INFO SparkEnv: Registering BlockManagerMaster
16/03/29 01:25:03 INFO DiskBlockManager: Created local directory at C:\Users\think\AppData\Local\Temp\blockmgr-2307f4ac-3fca-4e65-ad95-99802c35dffc
16/03/29 01:25:03 INFO MemoryStore: MemoryStore started with capacity 1773.8 MB
16/03/29 01:25:03 INFO SparkEnv: Registering OutputCommitCoordinator
16/03/29 01:25:04 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/03/29 01:25:04 INFO SparkUI: Started SparkUI at http://192.168.56.1:4040
16/03/29 01:25:05 INFO Executor: Starting executor ID driver on host localhost
16/03/29 01:25:06 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 56838.
16/03/29 01:25:06 INFO NettyBlockTransferService: Server created on 56838
16/03/29 01:25:06 INFO BlockManagerMaster: Trying to register BlockManager
16/03/29 01:25:06 INFO BlockManagerMasterEndpoint: Registering block manager localhost:56838 with 1773.8 MB RAM, BlockManagerId(driver, localhost, 56838)
16/03/29 01:25:06 INFO BlockManagerMaster: Registered BlockManager
16/03/29 01:25:10 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 127.4 KB, free 127.4 KB)
16/03/29 01:25:10 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 13.9 KB, free 141.3 KB)
16/03/29 01:25:10 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:56838 (size: 13.9 KB, free: 1773.7 MB)
16/03/29 01:25:10 INFO SparkContext: Created broadcast 0 from textFile at RDD2DataFrameByReflection.java:24
16/03/29 01:25:17 WARN : Your hostname, think-PC resolves to a loopback/non-reachable address: fe80:0:0:0:d401:a5b5:2103:6d13%eth8, but we couldn't find any external IP address!
16/03/29 01:25:18 INFO FileInputFormat: Total input paths to process : 1
16/03/29 01:25:18 INFO SparkContext: Starting job: collect at RDD2DataFrameByReflection.java:77
16/03/29 01:25:19 INFO DAGScheduler: Got job 0 (collect at RDD2DataFrameByReflection.java:77) with 1 output partitions
16/03/29 01:25:19 INFO DAGScheduler: Final stage: ResultStage 0 (collect at RDD2DataFrameByReflection.java:77)
16/03/29 01:25:19 INFO DAGScheduler: Parents of final stage: List()
16/03/29 01:25:19 INFO DAGScheduler: Missing parents: List()
16/03/29 01:25:19 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[7] at map at RDD2DataFrameByReflection.java:58), which has no missing parents
16/03/29 01:25:19 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 8.8 KB, free 150.1 KB)
16/03/29 01:25:19 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 4.6 KB, free 154.6 KB)
16/03/29 01:25:19 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:56838 (size: 4.6 KB, free: 1773.7 MB)
16/03/29 01:25:19 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
16/03/29 01:25:19 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[7] at map at RDD2DataFrameByReflection.java:58)
16/03/29 01:25:19 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
16/03/29 01:25:19 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,PROCESS_LOCAL, 2138 bytes)
16/03/29 01:25:19 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
16/03/29 01:25:19 INFO HadoopRDD: Input split: file:/D:/DT-IMF/testdata/persons.txt:0+33
16/03/29 01:25:19 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
16/03/29 01:25:19 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
16/03/29 01:25:19 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
16/03/29 01:25:19 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
16/03/29 01:25:19 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
16/03/29 01:25:20 INFO GeneratePredicate: Code generated in 479.023467 ms
16/03/29 01:25:20 INFO GenerateUnsafeProjection: Code generated in 97.650998 ms
16/03/29 01:25:20 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2443 bytes result sent to driver
16/03/29 01:25:20 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1507 ms on localhost (1/1)
16/03/29 01:25:20 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
16/03/29 01:25:20 INFO DAGScheduler: ResultStage 0 (collect at RDD2DataFrameByReflection.java:77) finished in 1.573 s
16/03/29 01:25:20 INFO DAGScheduler: Job 0 finished: collect at RDD2DataFrameByReflection.java:77, took 1.890426 s
Person [id=1, name=Spark, age=7]
Person [id=2, name=Hadoop, age=11]
16/03/29 01:25:20 INFO SparkContext: Invoking stop() from shutdown hook
16/03/29 01:25:21 INFO SparkUI: Stopped Spark web UI at http://192.168.56.1:4040
16/03/29 01:25:21 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/03/29 01:25:21 INFO MemoryStore: MemoryStore cleared
16/03/29 01:25:21 INFO BlockManager: BlockManager stopped
16/03/29 01:25:21 INFO BlockManagerMaster: BlockManagerMaster stopped
16/03/29 01:25:21 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/03/29 01:25:21 INFO SparkContext: Successfully stopped SparkContext
16/03/29 01:25:21 INFO ShutdownHookManager: Shutdown hook called
16/03/29 01:25:21 INFO ShutdownHookManager: Deleting directory C:\Users\think\AppData\Local\Temp\spark-481db032-91d6-4ced-a94c-b38dd0b9033c
16/03/29 01:25:21 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/03/29 01:25:21 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.

Fixing runtime error 1:

16/03/29 00:33:31 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.IllegalAccessException: Class org.apache.spark.sql.SQLContext$$anonfun$org$apache$spark$sql$SQLContext$$beansToRows$1$$anonfun$apply$1 can not access a member of class SparkSQLByJava.Person with modifiers "public"

This is an access problem: for the reflection to work, the class must be public.

Fixing runtime error 2:

16/03/29 00:53:43 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.io.NotSerializableException: SparkSQLByJava.Person
Serialization stack:
    - object not serializable (class: SparkSQLByJava.Person, value: SparkSQLByJava.Person@281cb092)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 2)

The cause is that Person is not serializable; the declaration must be changed to public class Person implements Serializable.



The content above is my learning notes from Lesson 59 of teacher Wang Jialin's (王家林) DT大数据梦工厂 (DT Big Data Dream Factory) course "IMF传奇行动" (IMF Legendary Action).

Wang Jialin is the China evangelist for Spark, Flink, Docker, and Android technologies; president and chief expert of the Spark Asia-Pacific Research Institute; founder of DT大数据梦工厂; a source-code-level expert in Android software/hardware integration; an English pronunciation magician; and a fitness enthusiast.

WeChat official account: DT_Spark
Email: 18610086859@126.com
Phone: 18610086859
QQ: 1740415547
WeChat: 18610086859
Sina Weibo: ilovepains



Copyright notice: This is an original article by slq1023, licensed under CC 4.0 BY-SA. Please include a link to the original source and this notice when reposting.