1 给定数据如下:
班级ID 姓名 年龄 性别 科目 成绩
12 张三 25 男 chinese 50
12 张三 25 男 math 60
12 张三 25 男 english 70
12 李四 20 男 chinese 50
12 李四 20 男 math 50
12 李四 20 男 english 50
12 王芳 19 女 chinese 70
12 王芳 19 女 math 70
12 王芳 19 女 english 70
13 张大三 25 男 chinese 60
13 张大三 25 男 math 60
13 张大三 25 男 english 70
13 李大四 20 男 chinese 50
13 李大四 20 男 math 60
13 李大四 20 男 english 50
13 王小芳 19 女 chinese 70
13 王小芳 19 女 math 80
13 王小芳 19 女 english 70
2 需求及解答详见代码
package scala.day02
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* Author:SZX
* Date:2020/10/19 20:34
* Version:1.0
* Description:
*/
object exercise2 {
case class Person(classID: Int, name: String, age: Int, sex: String, keMu: String, score: Int)
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("SparkWordCount").setMaster("local")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val rdd1: RDD[String] = sc.textFile("E:\\test\\student.txt")
val rdd: RDD[(String, String, String, String, String, String)] = rdd1.map(x => {
val line = x.split(" ");
(line(0), line(1), line(2), line(3), line(4), line(5))
})
// 1 一共有多少个小于20岁的人参加考试?
println(rdd.filter(_._3.toInt < 20).groupBy(_._2).count())
//2 一共有多个男生参加考试?
println(rdd.filter(_._4.equals("男")).groupBy(_._2).count())
//3 12班有多少人参加考试?
println(rdd.filter(_._1.toInt == 12).groupBy(_._2).count())
println(rdd.filter(_._1.toInt == 13).groupBy(_._2).count())
//4语文科目的平均成绩是多少?
val num: Long = rdd.filter(_._5.equals("chinese")).count()
val total: Int = rdd.filter(_._5.equals("chinese")).map(_._6.toInt).reduce(_ + _)
val average: Long = total / num
println(average)
//5 每个人个人平均成绩是多少?
val num2: Long = rdd.groupBy(_._5).count()
val average2: RDD[(String, Long)] = rdd.groupBy(_._2).mapValues(x => x.map(s => s._6.toInt).reduce(_ + _) / num2)
average2.foreach(println)
// 6 12班平均成绩是多少?
val num3: Long = rdd.filter(_._1.toInt == 12).groupBy(_._2).count()
val nums: Long = rdd.groupBy(_._5).count()
val average3: RDD[(String, Long)] = rdd.filter(_._1.toInt == 12).groupBy(_._1).mapValues(_.map(_._6.toInt).reduce(_ + _) / (num3 * nums))
average3.foreach(println)
//7 12班男生平均总成绩是多少?
val test: RDD[(String, Iterable[(String, String, String, String, String, String)])] = rdd.filter(_._1.toInt == 12).filter(_._4.equals("男")).groupBy(_._1)
val test2: RDD[(String, String, String, String, String, String)] = rdd.filter(_._1.toInt == 12).filter(_._4.equals("男"))
test2.foreach(println)
test.foreach(println)
//8 12班语文成绩最低分是多少?
val low: Array[String] = rdd.filter(_._1.equals("12")).filter(_._5.equals("chinese")).map(_._6).sortBy(_.toInt).take(1)
low.foreach(println)
//9 总成绩大于150分的12班的女生有几个?
val girlnum: Long = rdd.filter(_._1.toInt == 12).filter(_._4.equals("女")).groupBy(_._2).mapValues(_.map(_._6.toInt).reduce(_ + _)).map(_._2 > 150).count()
println(girlnum)
//10 总成绩大于150分,且数学大于等于70,且年龄大于等于19岁的学生的平均成绩是多少
val rdd2: RDD[(String, Int)] = rdd.filter(_._5.equals("math")).filter(_._6.toInt >= 70).filter(_._3.toInt >= 19).map(x => (x._2, 1))
val rdd3: RDD[(String, Int)] = rdd.map(x => {
(x._2, x._6.toInt)
})
val rdd5: RDD[(String, (Int, Int))] = rdd3.join(rdd2)
rdd5.foreach(println)
// (王小芳,(70,1))
// (王小芳,(80,1))
// (王小芳,(70,1))
// (王芳,(70,1))
// (王芳,(70,1))
// (王芳,(70,1))
val rdd6 = rdd5.reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)).filter(_._2._1 > 150).map(x => {
val a = (x._2._1 / x._2._2).toDouble
val d = a.formatted("%.2f")
(x._1, d)
})
rdd6.foreach(println)
}
}
版权声明:本文为m0_37650057原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。