【Spark学习笔记】Dataframe操作大全

  • Post author:
  • Post category:其他


1. Dataframe的生成

1.1 通过 toDF()函数创建

Seq+toDF
val someDF = Seq(
  (8, "bat"),
  (64, "mouse"),
  (-27, "horse")
).toDF("number", "word")

注意:如果直接用toDF()而不指定列名字,那么默认列名为”_1″, “_2”, … Seq.toDF()的一个弊端是列类型和nullable标志无法被指定

case Class+rdd+toDF
case class Person(name: String, age: Int)
val people = sc.textFile("examples/src/main/resources/people.txt")//得到rdd
                 .map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))//转为元素为Person的RDD
                 .toDF()//转换为Dataframe

优点:可以指定数据类型

1.2 通过 creatDataFrame()函数创建

val someData = Seq(
  Row(8, "bat"),
  Row(64, "mouse"),
  Row(-27, "horse")
)

val someSchema = List(
  StructField("number", IntegerType, true),
  StructField("word", StringType, true)
)

val someDF = spark.createDataFrame(
  spark.sparkContext.parallelize(someData),
  StructType(someSchema)
)

跟toDF一样,这里创建DataFrame的数据形态也可以是本地Seq或者RDD,多了一个StructType参数指定Schema,要求输入为RDD[Row]。

1.3 通过读取文件创建

1.3.1读取parquet

val df = sqlContext.read.parquet("hdfs:/path/to/file")
val df = sqlContext.read.load("examples/src/main/resources/users.parquet")//Load默认是parquet格式,通过format指定格式
val df = sqlContext.read.format("parquet").load("examples/src/main/resources/users.parquet")

1.3.2 读取json

val df = spark.read.json("examples/src/main/resources/people.json")//也有load版本
// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
val df=spark.read.format("json").load("examples/src/main/resources/people.json")

1.3.3 读取CSV

val df = spark.read
        .format("com.databricks.spark.csv")
        .option("header", "true") //reading the headers
        .option("mode", "DROPMALFORMED")
        .load("csv/file/path"); //.csv("csv/file/path") //spark 2.0 api

2. Dataframe系列操作

2.1 对象的action


  • df.show



    df.show(10)

    显示10条


  • df.collect

    :获取所有数据到数组,返回

    Array

    对象,每一条记录由一个

    GenericRowWithSchema

    对象来表示,可以存储字段名及字段值。


  • df.collectAsList

    :与

    collect

    类似,返回

    List

    对象


  • df.describe(cols:String*)

    :获取指定字段的统计信息,结果仍然为

    DataFrame

    对象,用于统计数值类型字段的统计值,比如

    count, mean, stddev, min, max

    等。

    jdbcDF .describe("c1" , "c2", "c4" ).show()
    

image-20200701202522068.png


  • first, head, take, takeAsList

    :获取若干行记录

    ​ (1)

    first

    获取第一行记录

    (2)

    head

    获取第一行记录,

    head(n: Int)

    获取前n行记录

    (3)

    take(n: Int)

    获取前n行数据

    (4)

    takeAsList(n: Int)

    获取前n行数据,并以

    List

    的形式展现



    Row

    或者

    Array[Row]

    的形式返回一行或多行数据。

    first



    head

    功能相同。


    take



    takeAsList

    方法会将获得到的数据返回到Driver端,所以,使用这两个方法时需要注意数据量,以免Driver发生

    OutOfMemoryError

2.2 对象的条件查询和join等操作


  • df.where("id==1 or c1=='b'")

    :通过对列值进行筛选,用

    and



    or

    连接多个条件,返回

    dataframe


  • df.filter

    :与where类似


  • df.select("col1",”col2“)

    :查询指定字段,返回dataframe

    还有一个重载的

    select

    方法,不是传入

    String

    类型参数,而是传入

    Column

    类型参数。可以实现

    select id, id+1 from test

    这种逻辑。


    jdbcDF.select(jdbcDF( "id" ), jdbcDF( "id") + 1 ).show( false)

    [图片上传失败…(image-21fd3e-1594209882059)]



    selec

    类似功能,但得到

    Column

    类型对象的方法是

    apply

    以及

    col

    方法,一般用

    apply

    方法更简便。


  • selectExpr

    :可以对指定字段进行特殊处理

    可以直接对指定字段调用UDF函数,或者指定别名等。传入

    String

    类型参数,得到DataFrame对象。示例,查询

    id

    字段,

    c3

    字段取别名

    time



    c4

    字段四舍五入:


    jdbcDF .selectExpr("id" , "c3 as time" , "round(c4)" ).show(false)

image-20200701204138189.png


  • order by和sort

    :按指定字段排序,默认为升序。示例1,按指定字段排序。加个-表示降序排序。sort和orderBy使用方法相同

    jdbcDF.orderBy(- jdbcDF("c4")).show(false)
    // 或者
    jdbcDF.orderBy(jdbcDF("c4").desc).show(false)
    

  • sortWithinPartitions

    :和上面的

    sort

    方法功能类似,区别在于

    sortWithinPartitions

    方法返回的是按

    Partition

    排好序的

    DataFrame

    对象。


  • group by

    jdbcDF .groupBy("c1" )
    jdbcDF.groupBy( jdbcDF( "c1"))
    

    group by得到的是GroupedData对象,可接max()、min()、mean()、sum()、count()等聚合操作。也可接agg()、pivot()。

    jdbcDF.agg("id" -> "max", "c4" -> "sum")
    

  • union

    :unionAll方法,类似SQL


  • join

    :Dataframe提供了6种join方法。

    • 笛卡尔积:

      joinDF1.join(joinDF2)

    • joinDF1.join(joinDF2,"id")

      通过id来join

    • joinDF1.join(joinDF2, Seq("id", "name"))

      多个字段join

    • joinDF1.join(joinDF2, Seq("id", "name"), "inner")

      指定join类型 inner, outer, left_outer, right_outer, leftsemi

    • joinDF1.join(joinDF2 , joinDF1("id" ) === joinDF2( "t1_id"))

    • joinDF1.join(joinDF2 , joinDF1("id" ) === joinDF2( "t1_id"), "inner")
  • 获取指定字段统计信息

    stat方法可以用于计算指定字段或指定字段之间的统计信息,比如方差,协方差等。这个方法返回一个DataFramesStatFunctions类型对象。

    下面代码演示根据c4字段,统计该字段值出现频率在30%以上的内容。在jdbcDF中字段c1的内容为”a, b, a, c, d, b”。其中a和b出现的频率为2 / 6,大于0.3


    jdbcDF.stat.freqItems(Seq ("c1") , 0.3).show()

  • 获取两个DataFrame中共有的记录


intersect

方法可以计算出两个DataFrame中相同的记录,



jdbcDF.intersect(jdbcDF.limit(1)).show(false)

  • 获取一个df有另一个没有的记录


    jdbcDF.except(jdbcDF.limit(1)).show(false)

  • 操作字段名


    • withColumnRenamed

      :重命名DataFrame中的指定字段名。如果指定的字段名不存在,不进行任何操作。下面示例中将jdbcDF中的id字段重命名为idx。


    jdbcDF.withColumnRenamed( "id" , "idx" )


    • withColumn

      :往当前DataFrame中新增一列。whtiColumn(colName: String , col: Column)方法根据指定colName往DataFrame中新增一列,如果colName已存在,则会覆盖当前列。以下代码往jdbcDF中新增一个名为id2的列,


    jdbcDF.withColumn("id2", jdbcDF("id")).show( false)

  • 行转列

    有时候需要根据某个字段内容进行分割,然后生成多行,这时可以使用explode方法。下面代码中,根据c3字段中的空格将字段内容进行分割,分割的内容存储在新的字段c3_中,如下所示:


    jdbcDF.explode( "c3" , "c3_" ){time: String => time.split( " " )}

3. Dataframe存储

df.write.save("namesAndFavColors.parquet")//默认parque文件存储
df.write.format("类型")..mode(SaveMode.Overwrite).save("Path")


saveMode:error(default)|append|overwrite|ignore



版权声明:本文为ewen_lee原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。