创建方法
一、通过 toDF()函数创建
// sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) // this is used to implicitly convert an RDD or Seq to a DataFrame. cannot be applied to Array import sqlContext.implicits._
1.1 从本地Seq创建
本地序列(seq)转为DataFrame要求数据的内容是指定的非Any数据类型,且各tupple相同位置的数据类型必须一致,否则会因为schema自动变为为Any而报错。
要求Seq的类型为scala.Product的子类型,如Tupple,例Seq[TuppleN[T1,T2,T3…]], Ti不为Any
val someDF = Seq(
(8, "bat"),
(64, "mouse"),
(-27, "horse")
).toDF("number", "word")
调用 someDF.schema.printTreeString()得到someDF的schema:
root |-- number: integer (nullable = false) |-- word: string (nullable = true)
注意:如果直接用toDF()而不指定列名字,那么默认列名为”_1″, “_2”, …
Seq.toDF()的一个弊端是列类型和nullable标志无法被指定
1.2 通过rdd+toDF() 创建
类似Seq的创建,要求rdd为RDD[A<: scala.Product], 且A类型中类型不为Any。
直接调用rdd.toDF(“列1″,”列2”,…)
1.3 通过case class的RDD + toDF() 创建
这种方法的好处在于可以指定数据类型,示例:
// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)
// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt")//得到rdd
.map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))//转为元素为Person的RDD
.toDF()//转换为Dataframe
二、通过 creatDataFrame()函数创建
主要好处在于可定制schema,包括nullable标志
creatDataFrame()共有7种重载方式:
其中
def createDataFrame[A<: scala.Product](rdd : RDD[A]):DataFrame def createDataFrame[A<: scala.Product](data : scala.Seq[A]): DataFrame
跟toDF一样,这里创建DataFrame的数据形态也可以是本地Seq或者RDD
其次
def createDataFrame(rowRDD : RDD[Row], schema : StructType) : DataFrame
多了一个StructType参数指定Schema,要求输入为RDD[Row], 示例:
val someData = Seq(
Row(8, "bat"),
Row(64, "mouse"),
Row(-27, "horse")
)
val someSchema = List(
StructField("number", IntegerType, true),
StructField("word", StringType, true)
)
val someDF = spark.createDataFrame(
spark.sparkContext.parallelize(someData),
StructType(someSchema)
)
另外还有以下几种方法,少用,省略。
private[sql] def createDataFrame(rowRDD : org.apache.spark.rdd.RDD[org.apache.spark.sql.Row], schema : StructType, needsConversion : scala.Boolean) : DataFrame def createDataFrame(rowRDD : JavaRDD[Row], schema : StructType) : DataFrame def createDataFrame(rdd : RDD[_], beanClass : scala.Predef.Class[_]) :DataFrame def createDataFrame(rdd : JavaRDD[_], beanClass : scala.Predef.Class[_]) :DataFrame
关于scala.product
http://daily-scala.blogspot.com/2009/12/product.html
三、通过读取文件创建
使用parquet文件创建
val df = sqlContext.read.parquet("hdfs:/path/to/file")
val df = sqlContext.read.load("examples/src/main/resources/users.parquet")//Load默认是parquet格式,通过format指定格式
val df = sqlContext.read.format("parquet").load("examples/src/main/resources/users.parquet")
使用json文件创建
val df = spark.read.json("examples/src/main/resources/people.json")//也有load版本
// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
使用csv文件,spark2.0+之后的版本可用
//首先初始化一个SparkSession对象
val spark = org.apache.spark.sql.SparkSession.builder
.master("local")
.appName("Spark CSV Reader")
.getOrCreate;
//然后使用SparkSessions对象加载CSV成为DataFrame
val df = spark.read
.format("com.databricks.spark.csv")
.option("header", "true") //reading the headers
.option("mode", "DROPMALFORMED")
.load("csv/file/path"); //.csv("csv/file/path") //spark 2.0 api
df.show()
存储方法
df.write.save("namesAndFavColors.parquet")//默认parque文件存储
df.write.format("类型").save("Path")
储存文件类型转换
val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json")
df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
存储模式选择
不同的存储模式主要是处理已存在数据采用不同方法
df.write.format("类型").save("Path",SaveMode.Overwrite) //可以把SaveMode.Overwrite换成其他的几种形式
| Scala/Java | Any Language | Meaning |
|---|---|---|
(default) |
(default) |
When saving a DataFrame to a data source, if data already exists, an exception is expected to be thrown. |
|
|
When saving a DataFrame to a data source, if data/table already exists, contents of the DataFrame are expected to be appended to existing data. |
|
|
Overwrite mode means that when saving a DataFrame to a data source, if data/table already exists, existing data is expected to be overwritten by the contents of the DataFrame. |
|
|
Ignore mode means that when saving a DataFrame to a data source, if data already exists, the save operation is expected to not save the contents of the DataFrame and to not change the existing data. This is similar to ain SQL. |
存储为持久性的Table
其实前面讲的df.write.save 也可以持久性的写到hdfs等存储介质当中,若果先创建了表格的话,可用如下语句
df.write.format("orc").save("hdfsPath") //
sqlContext.sql("alter table " + table+ " add if not exists partition(pt_d='" + pt_d + "')")//更新hiveMeta信息
//sqlContext.sql("msck repair table"+userClutsersTable)//更新hiveMeta信息
Spark SQL支持对Hive中存储的数据进行读写。操作Hive中的数据时,必须创建HiveContext,而不是SQLContext。HiveContext继承自SQLContext,但是增加了在Hive元数据库中查找表,以及用HiveQL语法编写SQL的功能。除了sql()方法,HiveContext还提供了hql()方法,从而用Hive语法来编译sql。
使用HiveContext,可以执行Hive的大部分功能,包括创建表、往表里导入数据以及用SQL语句查询表中的数据。查询出来的数据是一个Row数组。
当使用 HiveContext时,Datafame可用saveAsTable 存为持久性的table。与registerTempTable方法不同的是,saveAsTable将DataFrame中的内容持久化到表中,并在HiveMetastore中存储元数据。
默认情况下,saveAsTable会创建一张Hive Managed Table,也就是说,数据的位置都是由元数据库中的信息控制的。当Managed Table被删除时,表中的数据也会一并被物理删除。
registerTempTable只是注册一个临时的表,只要Spark Application重启或者停止了,那么表就没了。而saveAsTable创建的是物化的表,无论Spark Application重启或者停止,表都会一直存在。
调用HiveContext.table()方法,还可以直接针对Hive中的表,创建一个DataFrame。
// 接着将dataframe中的数据保存到good_student_infos表中
hiveContext.sql("DROP TABLE IF EXISTS biads.good_student_infos")
goodStudentsDF.saveAsTable("biads.sgood_student_infos")
//可以用table()方法,针对hive表,直接创建DataFrame
DF = hiveContext.table("good_student_infos")
这种方法存到默认位置,位置也可指定,见下:
df.write 为DataFrameWriter类,内有很多配置函数,如format,mode,options等,还有save,saveAsTable,json,parquet,orc等写入方法,存储位置可通过options指定:例:
val options = Map("path" -> "hdfs:/...")
df.write.format("orc").partitionBy("partitiondate").options(options).mode(SaveMode.Append).saveAsTable("tablename")
或
df.write.option("path", "hdfs:/...").format("parquet").saveAsTable("address_local")
DataFrame也有saveAsTable(多重重载),有些提供了很多参数,和上述DataFrameWriter方法类似,例如参数最多的一个实现:
def saveAsTable(tableName : scala.Predef.String, source : scala.Predef.String, mode : org.apache.spark.sql.SaveMode, options : scala.Predef.Map[scala.Predef.String, scala.Predef.String]) : scala.Unit
但是这种使用方法似乎已经被废弃,应采用DataFrameWriter方法:
参考资料
https://blog.csdn.net/martin_liang/article/details/79748503
https://medium.com/@mrpowers/manually-creating-spark-dataframes-b14dae906393
https://spark.apache.org/docs/1.5.1/sql-programming-guide.html
http://spark.apache.org/docs/1.5.1/api/scala/index.html#org.apache.spark.sql.DataFrame