 
   
创建方法
一、通过 toDF()函数创建
// sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) // this is used to implicitly convert an RDD or Seq to a DataFrame. cannot be applied to Array import sqlContext.implicits._
1.1 从本地Seq创建
本地序列(seq)转为DataFrame要求数据的内容是指定的非Any数据类型,且各tupple相同位置的数据类型必须一致,否则会因为schema自动变为为Any而报错。
要求Seq的类型为scala.Product的子类型,如Tupple,例Seq[TuppleN[T1,T2,T3…]], Ti不为Any
val someDF = Seq(
  (8, "bat"),
  (64, "mouse"),
  (-27, "horse")
).toDF("number", "word")
调用 someDF.schema.printTreeString()得到someDF的schema:
root |-- number: integer (nullable = false) |-- word: string (nullable = true)
注意:如果直接用toDF()而不指定列名字,那么默认列名为”_1″, “_2”, …
Seq.toDF()的一个弊端是列类型和nullable标志无法被指定
1.2 通过rdd+toDF() 创建
类似Seq的创建,要求rdd为RDD[A<: scala.Product], 且A类型中类型不为Any。
直接调用rdd.toDF(“列1″,”列2”,…)
1.3 通过case class的RDD + toDF() 创建
这种方法的好处在于可以指定数据类型,示例:
// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)
// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt")//得到rdd
                 .map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))//转为元素为Person的RDD
                 .toDF()//转换为Dataframe
二、通过 creatDataFrame()函数创建
主要好处在于可定制schema,包括nullable标志
creatDataFrame()共有7种重载方式:
其中
def createDataFrame[A<: scala.Product](rdd : RDD[A]):DataFrame def createDataFrame[A<: scala.Product](data : scala.Seq[A]): DataFrame
跟toDF一样,这里创建DataFrame的数据形态也可以是本地Seq或者RDD
其次
def createDataFrame(rowRDD : RDD[Row], schema : StructType) : DataFrame
多了一个StructType参数指定Schema,要求输入为RDD[Row], 示例:
val someData = Seq(
  Row(8, "bat"),
  Row(64, "mouse"),
  Row(-27, "horse")
)
val someSchema = List(
  StructField("number", IntegerType, true),
  StructField("word", StringType, true)
)
val someDF = spark.createDataFrame(
  spark.sparkContext.parallelize(someData),
  StructType(someSchema)
)
另外还有以下几种方法,少用,省略。
private[sql] def createDataFrame(rowRDD : org.apache.spark.rdd.RDD[org.apache.spark.sql.Row], schema : StructType, needsConversion : scala.Boolean) : DataFrame def createDataFrame(rowRDD : JavaRDD[Row], schema : StructType) : DataFrame def createDataFrame(rdd : RDD[_], beanClass : scala.Predef.Class[_]) :DataFrame def createDataFrame(rdd : JavaRDD[_], beanClass : scala.Predef.Class[_]) :DataFrame
关于scala.product
    
     http://daily-scala.blogspot.com/2009/12/product.html
    
   
三、通过读取文件创建
使用parquet文件创建
val df = sqlContext.read.parquet("hdfs:/path/to/file")
val df = sqlContext.read.load("examples/src/main/resources/users.parquet")//Load默认是parquet格式,通过format指定格式
val df = sqlContext.read.format("parquet").load("examples/src/main/resources/users.parquet")
使用json文件创建
val df = spark.read.json("examples/src/main/resources/people.json")//也有load版本
// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
使用csv文件,spark2.0+之后的版本可用
//首先初始化一个SparkSession对象
val spark = org.apache.spark.sql.SparkSession.builder
        .master("local")
        .appName("Spark CSV Reader")
        .getOrCreate;
 
//然后使用SparkSessions对象加载CSV成为DataFrame
val df = spark.read
        .format("com.databricks.spark.csv")
        .option("header", "true") //reading the headers
        .option("mode", "DROPMALFORMED")
        .load("csv/file/path"); //.csv("csv/file/path") //spark 2.0 api
 
df.show()
    
    存储方法
   
df.write.save("namesAndFavColors.parquet")//默认parque文件存储
df.write.format("类型").save("Path")
储存文件类型转换
val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json")
df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
存储模式选择
不同的存储模式主要是处理已存在数据采用不同方法
df.write.format("类型").save("Path",SaveMode.Overwrite)  //可以把SaveMode.Overwrite换成其他的几种形式
| Scala/Java | Any Language | Meaning | 
|---|---|---|
| (default) | (default) | When saving a DataFrame to a data source, if data already exists, an exception is expected to be thrown. | 
|  |  | When saving a DataFrame to a data source, if data/table already exists, contents of the DataFrame are expected to be appended to existing data. | 
|  |  | Overwrite mode means that when saving a DataFrame to a data source, if data/table already exists, existing data is expected to be overwritten by the contents of the DataFrame. | 
|  |  | Ignore mode means that when saving a DataFrame to a data source, if data already exists, the save operation is expected to not save the contents of the DataFrame and to not change the existing data. This is similar to a in SQL. | 
存储为持久性的Table
其实前面讲的df.write.save 也可以持久性的写到hdfs等存储介质当中,若果先创建了表格的话,可用如下语句
df.write.format("orc").save("hdfsPath") //
sqlContext.sql("alter table " + table+ " add if not exists partition(pt_d='" + pt_d + "')")//更新hiveMeta信息
//sqlContext.sql("msck repair table"+userClutsersTable)//更新hiveMeta信息
    Spark SQL支持对Hive中存储的数据进行读写。操作Hive中的数据时,必须创建HiveContext,而不是SQLContext。HiveContext继承自SQLContext,但是增加了在Hive元数据库中查找表,以及用HiveQL语法编写SQL的功能。除了sql()方法,HiveContext还提供了hql()方法,从而用Hive语法来编译sql。
    
    使用HiveContext,可以执行Hive的大部分功能,包括创建表、往表里导入数据以及用SQL语句查询表中的数据。查询出来的数据是一个Row数组。
   
当使用 HiveContext时,Datafame可用saveAsTable 存为持久性的table。与registerTempTable方法不同的是,saveAsTable将DataFrame中的内容持久化到表中,并在HiveMetastore中存储元数据。
    默认情况下,saveAsTable会创建一张Hive Managed Table,也就是说,数据的位置都是由元数据库中的信息控制的。当Managed Table被删除时,表中的数据也会一并被物理删除。
    
    registerTempTable只是注册一个临时的表,只要Spark Application重启或者停止了,那么表就没了。而saveAsTable创建的是物化的表,无论Spark Application重启或者停止,表都会一直存在。
    
    调用HiveContext.table()方法,还可以直接针对Hive中的表,创建一个DataFrame。
   
// 接着将dataframe中的数据保存到good_student_infos表中
hiveContext.sql("DROP TABLE IF EXISTS biads.good_student_infos")
goodStudentsDF.saveAsTable("biads.sgood_student_infos")
//可以用table()方法,针对hive表,直接创建DataFrame
 DF = hiveContext.table("good_student_infos")
这种方法存到默认位置,位置也可指定,见下:
df.write 为DataFrameWriter类,内有很多配置函数,如format,mode,options等,还有save,saveAsTable,json,parquet,orc等写入方法,存储位置可通过options指定:例:
val options = Map("path" -> "hdfs:/...")
df.write.format("orc").partitionBy("partitiondate").options(options).mode(SaveMode.Append).saveAsTable("tablename")
或
df.write.option("path", "hdfs:/...").format("parquet").saveAsTable("address_local") 
DataFrame也有saveAsTable(多重重载),有些提供了很多参数,和上述DataFrameWriter方法类似,例如参数最多的一个实现:
def saveAsTable(tableName : scala.Predef.String, source : scala.Predef.String, mode : org.apache.spark.sql.SaveMode, options : scala.Predef.Map[scala.Predef.String, scala.Predef.String]) : scala.Unit
但是这种使用方法似乎已经被废弃,应采用DataFrameWriter方法:
     
   
参考资料
    
     https://blog.csdn.net/martin_liang/article/details/79748503
    
   
    
     https://medium.com/@mrpowers/manually-creating-spark-dataframes-b14dae906393
    
   
    
     https://spark.apache.org/docs/1.5.1/sql-programming-guide.html
    
   
    
     http://spark.apache.org/docs/1.5.1/api/scala/index.html#org.apache.spark.sql.DataFrame
    
   
 
