Spark E-commerce Project in Practice (Framework Code, Three-Tier Architecture, Six Requirements: Three Offline, Three Real-Time)







*E-commerce Project in Practice*


The project is written in Scala. The data used in the project:

Download link


The data consists of e-commerce user click logs, with fields separated by "_". A few sample records:

2019-07-17_95_26070e87-1ad7-49a3-8fb3-cc741facaddf_37_2019-07-17 00:00:02_手机_-1_-1_null_null_null_null_3

2019-07-17_95_26070e87-1ad7-49a3-8fb3-cc741facaddf_48_2019-07-17 00:00:10_null_16_98_null_null_null_null_19

2019-07-17_95_26070e87-1ad7-49a3-8fb3-cc741facaddf_6_2019-07-17 00:00:17_null_19_85_null_null_null_null_7

2019-07-17_38_6502cdc9-cf95-4b08-8854-f03a25baa917_29_2019-07-17 00:00:19_null_12_36_null_null_null_null_5

2019-07-17_38_6502cdc9-cf95-4b08-8854-f03a25baa917_22_2019-07-17 00:00:28_null_-1_-1_null_null_15,1,20,6,4_15,88,75_9

2019-07-17_38_6502cdc9-cf95-4b08-8854-f03a25baa917_11_2019-07-17 00:00:29_苹果_-1_-1_null_null_null_null_7

2019-07-17_38_6502cdc9-cf95-4b08-8854-f03a25baa917_24_2019-07-17 00:00:38_null_-1_-1_15,13,5,11,8_99,2_null_null_10

2019-07-17_38_6502cdc9-cf95-4b08-8854-f03a25baa917_24_2019-07-17 00:00:48_null_19_44_null_null_null_null_4

2019-07-17_38_6502cdc9-cf95-4b08-8854-f03a25baa917_47_2019-07-17 00:00:54_null_14_79_null_null_null_null_2

2019-07-17_38_6502cdc9-cf95-4b08-8854-f03a25baa917_27_2019-07-17 00:00:59_null_3_50_null_null_null_null_26

2019-07-17_38_6502cdc9-cf95-4b08-8854-f03a25baa917_27_2019-07-17 00:01:05_i7_-1_-1_null_null_null_null_17

2019-07-17_38_6502cdc9-cf95-4b08-8854-f03a25baa917_24_2019-07-17 00:01:07_null_5_39_null_null_null_null_10

2019-07-17_38_6502cdc9-cf95-4b08-8854-f03a25baa917_25_2019-07-17 00:01:13_i7_-1_-1_null_null_null_null_24

2019-07-17_38_6502cdc9-cf95-4b08-8854-f03a25baa917_22_2019-07-17 00:01:21_null_19_62_null_null_null_null_20



*Data format: date_userID_sessionID_pageID_timestamp_searchKeyword_clickCategoryID_clickProductID_orderCategoryIDs_orderProductIDs_payCategoryIDs_payProductIDs_cityID*
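
As a quick sanity check, splitting the first sample record on "_" in the Scala REPL (just an illustration, not project code):

val line = "2019-07-17_95_26070e87-1ad7-49a3-8fb3-cc741facaddf_37_2019-07-17 00:00:02_手机_-1_-1_null_null_null_null_3"
// splitting on "_" yields the 13 fields listed above
val fields = line.split("_")
println(fields(0))  // date: 2019-07-17
println(fields(6))  // clicked category ID: -1 (this record is a search, not a click)
println(fields(12)) // city ID: 3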

Before implementing the requirements, let's first build a small framework that abstracts out the repetitive operations.

First, import the relevant dependencies. Don't worry about whether each one is needed right away; they will all be used later. Be careful not to add duplicates — some were already imported for the earlier wordCount example.

 <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.17.Final</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>2.4.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>2.4.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>2.4.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>2.4.6</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.1.10</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.12.8</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>2.12.8</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>2.12.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>2.4.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>2.3.3</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>

The project uses a three-tier architecture (Dao-Service-Controller).

Under the Scala source root, create a package to hold the abstract framework, tentatively named core. Inside it, create four traits: TApplication, TController, TService, and TDao.

Application is the program's entry point; Controller is the controller, responsible for invoking the computation logic; Service contains the actual computation logic; Dao handles data access. Since the demo has no database for the offline requirements, the Dao layer is mostly idle.

TApplication -> abstracts the repetitive opening and closing of the Spark connection into a start function, with the actual business logic abstracted as op.

trait TApplication {
  var envData : Any = null;
  //create the connection
  def start(t:String = "jdbc")(op : =>Unit) : Unit={
    if(t=="spark"){
      envData = EnvUtil.getEnv()
    }
    //business logic abstraction
    try{
      op
    }catch {
      case ex: Exception=> println(ex.getMessage)
    }
    //close the connection
    if(t=="spark"){
      EnvUtil.clear()
    }
  }
}
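
The start function above depends on an EnvUtil helper that the post calls repeatedly (EnvUtil.getEnv(), EnvUtil.clear(), and later EnvUtil.getStreamingEnv()) but never shows. A minimal sketch of what such a helper might look like, assuming it caches the contexts in ThreadLocals (only the object name and method names come from the call sites; everything else is an assumption):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.StreamingContext

// Hypothetical helper inferred from the call sites in this post.
object EnvUtil {
  // hold the environments in ThreadLocals so any layer can fetch them without passing them around
  private val scLocal = new ThreadLocal[SparkContext]
  private val sscLocal = new ThreadLocal[StreamingContext]

  def getEnv(): SparkContext = {
    var sc = scLocal.get()
    if (sc == null) {
      // the master/appName here are placeholders, not taken from the original post
      val conf = new SparkConf().setMaster("local[*]").setAppName("spark-ec-project")
      sc = new SparkContext(conf)
      scLocal.set(sc)
    }
    sc
  }

  def clear(): Unit = {
    val sc = scLocal.get()
    if (sc != null) {
      sc.stop()
      scLocal.remove()
    }
  }

  // used by the streaming requirements later in the post
  def setStreamingEnv(ssc: StreamingContext): Unit = sscLocal.set(ssc)
  def getStreamingEnv(): StreamingContext = sscLocal.get()
}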

TController -> contains only one abstract function, excute; each Controller extends the trait and overrides it.

trait TController {  def excute():Unit }

TService -> contains only an analysis function; override it to implement the actual data-processing logic.

trait TService {

  /**
   * Data analysis
   * @return
   */
  def analysis():Any
}

TDao -> readFile is a method every requirement needs, so it lives in the trait; anything requirement-specific is added in the class that extends TDao.

trait TDao {
  def readFile(path:String):RDD[String] = {
    EnvUtil.getEnv().textFile(path)
  }
}





*Requirement 1. Find the most popular categories, ranked by clicks first, then orders, then payments*


Top 10 hot categories: HotCateGory





*HotCateGoryController*


-> Nothing special here; it is just the controller, i.e. the entry point into the computation logic.

class HotCateGoryController extends TController{
  private val hotCateGoryService = new HotCateGoryService
  override def excute(): Unit = {
    val result = hotCateGoryService.analysis()
    result.foreach(println)
  }
}





*HotCateGoryService*


-> The computation logic. The idea is to count clicks, orders, and payments separately, producing RDDs keyed by category ID with the click/order/payment count as the value, then merge and flatten the three datasets. Sort and take the top ten — basically a slightly more involved wordcount.

class HotCateGoryService extends TService{
  /**
   * Data analysis
   * Data format: date_userID_sessionID_pageID_timestamp_searchKeyword_clickCategoryID_clickProductID_orderCategoryIDs_orderProductIDs_payCategoryIDs_payProductIDs_cityID
   */
  private val hotCateGoryDao = new HotCateGoryDao()
  override def analysis(): Array[(String,(Int,Int,Int))] = {

  //TODO read the e-commerce click logs
  val actionRDD: RDD[String] = hotCateGoryDao.readFile("input/user_visit_action.txt")
  //cache the data so the file is not re-read for each of the three counts
  actionRDD.cache()
  //TODO count clicks
  val clickRDD = actionRDD.map(action => {
    val datas = action.split("_")
    //field 6 is the clicked category ID
    (datas(6),1)
  }).filter(_._1 != "-1")//filter out -1, i.e. records that are not clicks
  val cateGoryIdToClickCountRDD: RDD[(String,Int)] = clickRDD.reduceByKey(_+_)
  //TODO count orders
  /**
   * One order can contain multiple categories, so (categories, 1) looks like:
   * (category1,category2,category3, 1)
   */
  val orderRDD = actionRDD.map(action => {
    val datas = action.split("_")
    //field 8 is the order category ID list
    (datas(8))
  }).filter(_ != "null")//filter out null, i.e. records that are not orders
  //flatten (category1,category2,category3, 1) -> (category1,1)(category2,1)(category3,1)
  val orderToOneRDD = orderRDD.flatMap{
    id => {
      val ids = id.split(",")
      ids.map(id => (id,1))
    }
  }
  val cateGoryIdToOrderCountRDD: RDD[(String,Int)] = orderToOneRDD.reduceByKey(_ + _)
  //TODO count payments
  val payRDD = actionRDD.map(action => {
    val datas = action.split("_")
    //field 10 is the payment category ID list
    (datas(10))
  }).filter(_ != "null")//filter out null, i.e. records that are not payments

  val payToOneRDD = payRDD.flatMap{
    id => {
      val ids = id.split(",")
      ids.map(id => (id,1))
    }
  }
  val cateGoryIdToPayCountRDD: RDD[(String,Int)] = payToOneRDD.reduceByKey(_ + _)

  //TODO merge the three RDDs above into (category,(clicks,orders,payments))
  /**
   * Chaining joins here is very inefficient (each join shuffles the data).
   * Use union + reduceByKey instead; the reduce function requires the value types to match,
   * i.e. two (String,Int) records cannot be reduced into a (String,(Int,Int,Int)),
   * so convert the types first:
   * (category,clicks)   -> (category,(clicks,0,0))
   * (category,orders)   -> (category,(0,orders,0))
   * (category,payments) -> (category,(0,0,payments))
   * then simply merge them.
   */

  val newClickRDD = cateGoryIdToClickCountRDD.map{
    case (id,click)=>{
      (id,(click,0,0))
    }
  }
  val newOrderRDD = cateGoryIdToOrderCountRDD.map{
    case (id,order)=>{
      (id,(0,order,0))
    }
  }
  val newPayRDD = cateGoryIdToPayCountRDD.map{
    case (id,pay)=>{
      (id,(0,0,pay))
    }
  }
  //simply union them together first
  val countRDD = newClickRDD.union(newOrderRDD).union(newPayRDD)
  val reduceRDD = countRDD.reduceByKey(
    (t1,t2) =>{
      (t1._1+t2._1,t1._2+t2._2,t1._3+t2._3)
    }
  )
  //TODO sort the results (tuple ordering: clicks first, then orders, then payments)
  val sortRDD: RDD[(String,(Int,Int,Int))] = reduceRDD.sortBy(_._2, false)
  //TODO take the top ten
  val result = sortRDD.take(10)
  result
  }
}





*HotCateGoryDao*


-> An empty class; it only extends TDao so the requirement can read the file from the given path through the Spark context.

class HotCateGoryDao extends TDao{

}





*HotCateGoryApplication*


-> The program entry point; it simply calls the Controller.

object HotCateGoryApplication extends App with TApplication{
//TODO get the top ten hot categories, basically a slightly more involved wordcount
  start("spark"){
    val controller = new HotCateGoryController
    controller.excute()
  }
}





*Results*


The execution results. For time's sake — running this on the YARN cluster would be painfully slow — the job was simply run in the development environment.

[Screenshot: execution results]





*Requirement 2. Compute page conversion (jump) rates*


Example: (home page -> detail page jumps) / (total clicks on the home page)

Page conversion rate: PageFlow





*PageFlowController*


-> Again, just a controller.

class PageFlowController extends TController{
  private val pageFlowService = new PageFlowService
  override def excute(): Unit = {
    val result = pageFlowService.analysis()
    result
  }
}





*PageFlowService*


-> The core logic of the conversion-rate calculation. Compute the numerator and denominator separately. First count the total clicks per page (the denominator) — no difficulty, just a wordcount. Then count jumps per page-to-page direction: group the click stream by session ID, sort each session's actions by timestamp, and convert each page click stream into page-to-page jumps.

Example: (1,2,3,4) -> ((1,2),1) ((2,3),1) ((3,4),1)

The conversion works by dropping the head of the click stream and zipping the two lists together with an offset:

1 2 3 4
2 3 4

Then drop the session ID, which is no longer needed.

Finally, aggregate with (source page - target page) as the key and the jump count as the value, which gives the numerator.

Dividing numerator by denominator gives the conversion rate; for the denominator, split the numerator's key on "-" and look up the click count of the source page. Format the result as a percentage with two decimal places.
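
Before the full service code, a quick illustration of the zip trick on plain Scala collections (not project code, just the idea):

val pageIds = List(1L, 2L, 3L, 4L)
// pair each page with the next one by zipping the list with its own tail
val flows = pageIds.zip(pageIds.tail).map { case (from, to) => (s"$from-$to", 1) }
// flows: List(("1-2",1), ("2-3",1), ("3-4",1))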

class PageFlowService extends TService{
  /**
   * Data analysis
   * page conversion rate = jumps in a given direction / total clicks on the source page
   * @return
   */
  private val pageFlowDao = new PageFlowDao
  override def analysis(): Any = {
    val actionRDD:RDD[UserVisitAction] = pageFlowDao.getUserVisitAction("input/user_visit_action.txt")
    //TODO first compute the denominator: total clicks per page, a simple wordcount
    val pageToOneRDD =actionRDD.map(
      action => {
        (action.page_id,1)
      }
    )
    val sumRDD = pageToOneRDD.reduceByKey(_+_)
    val pageCountArray = sumRDD.collect()

    //TODO compute the numerator
    /**
     * The numerator is more involved: the jump direction matters
     * (e.g. 3->5 and 4->5 are different directions), the ordering in time matters,
     * and jumps from different users/sessions are not the same operation.
     */

    val sessionRDD = actionRDD.groupBy(_.session_id)
    //pageFlowRDD type: (session, List(("source page-target page", jump count)))
    val pageFlowRDD:RDD[(String,List[(String,Int)])] = sessionRDD.mapValues(
      iter => {
        //sort each session's actions by timestamp
        val actions = iter.toList.sortWith(
          (left, right) => {
            left.action_time < right.action_time
          }
        )
        //after sorting, keep only the page IDs
        val pageIds = actions.map(_.page_id)
        /**
         * Convert (page1,page2,page3) -> ((source page,target page), jump count)
         * (1,2,3,4) -> ((1,2),1)((2,3),1)((3,4),1)
         * zip the list with its tail to pair consecutive pages:
         * (1,2,3,4).tail = (2,3,4)
         * (1,2,3,4) zip (2,3,4) -> (1,2)(2,3)(3,4)
         */
        val zipIds = pageIds.zip(pageIds.tail)

         zipIds.map {
         case (pageid1, pageid2) => {
            (pageid1 + "-" + pageid2, 1)
         }
       }

      }
    )

    //TODO restructure the grouped data; the session ID is no longer needed
    //turn RDD[List[(String,Int)]] into RDD[(String,Int)]
    val value = pageFlowRDD.map(_._2).flatMap(list=>list)
    //now we have records like ("2-3",1); just aggregate them to get the jump counts
    val pageSumRDD = value.reduceByKey(_ + _)

    //TODO compute the conversion rate
    /**
     * Just divide numerator by denominator, matching them up correctly:
     * (jumps for "1-2") / (total clicks on page 1)
     * The page click counts are a List of (pageID, count); turn it into a Map and look up by page ID.
     */
    pageSumRDD.foreach{
          //sum is the jump count
      case (pageflow,sum) =>{
        val pageid = pageflow.split("-")(0)
        //look up the total click count of the source page
        val pageValue = pageCountArray.toMap.getOrElse(pageid.toLong,1)
        println("Page flow [" + pageflow + "] conversion rate: " + (sum.toDouble/pageValue*100).formatted("%.2f") + "%")
      }
    }
  }
}





*PageFlowDao*


-> Worth noting: this time the Dao layer finally earns its keep, wrapping the otherwise hard-to-read raw records into a case class.

class PageFlowDao extends TDao{
  def getUserVisitAction(path:String) = {
    val rdd: RDD[String] = readFile(path)
    rdd.map(
      line=>{
        val datas = line.split("_")
        UserVisitAction(
          datas(0),
          datas(1).toLong,
          datas(2),
          datas(3).toLong,
          datas(4),
          datas(5),
          datas(6).toLong,
          datas(7).toLong,
          datas(8),
          datas(9),
          datas(10),
          datas(11),
          datas(12).toLong
        )
      }
    )
  }
}





*UserVisitAction*


-> The bean (case class) used by the Dao.

package object bean {
  case class UserVisitAction(
    date: String,//date of the user's click action
    user_id: Long,//user ID
    session_id: String,//session ID
    page_id: Long,//ID of the page
    action_time: String,//time of the action
    search_keyword: String,//keyword the user searched for
    click_category_id: Long,//ID of the clicked category
    click_product_id: Long,//ID of the clicked product
    order_category_ids: String,//IDs of all categories in the order
    order_product_ids: String,//IDs of all products in the order
    pay_category_ids: String,//IDs of all categories in the payment
    pay_product_ids: String,//IDs of all products in the payment
    city_id: Long//city ID
  )
}





*PageFlowApplication*


-> Finally, call the Controller from PageFlowApplication.

object PageFlowApplication extends App with TApplication{
  start("spark"){
    val controller = new PageFlowController
    controller.excute()
  }
}





*Results*


A look at the results (too many to show in full, so only part is captured); most conversion rates are around 2%.

[Screenshot: page conversion rate output]





*Requirement 3. Top 3 hot products per region*






*Data preparation*


-> Create the tables in Hive and load the prepared data into them.

CREATE DATABASE spark_sql;
CREATE TABLE `user_visit_action`(
  `date` string,
  `user_id` bigint,
  `session_id` string,
  `page_id` bigint,
  `action_time` string,
  `search_keyword` string,
  `click_category_id` bigint,
  `click_product_id` bigint,
  `order_category_ids` string,
  `order_product_ids` string,
  `pay_category_ids` string,
  `pay_product_ids` string,
  `city_id` bigint)
row format delimited fields terminated by '\t';
load data local inpath '/root/data/user_visit_action.txt' into table user_visit_action;

CREATE TABLE `product_info`(
  `product_id` bigint,
  `product_name` string,
  `extend_info` string)
row format delimited fields terminated by '\t';
load data local inpath '/root/data/product_info.txt' into table product_info;

CREATE TABLE `city_info`(
  `city_id` bigint,
  `city_name` string,
  `area` string)
row format delimited fields terminated by '\t';
load data local inpath '/root/data/city_info.txt' into table city_info;

[Screenshot: tables created in Hive]





*SparkSQL connecting to Hive*


-> First copy hive-site.xml out of the VM and put it in the resources directory.

[Screenshot: hive-site.xml in the resources directory]

<configuration>
 <property>
    <name>javax.jdo.option.ConnectionURL</name>
   <value>jdbc:mysql://192.168.15.101:3306/hive?createDatabaseIfNotExist=true</value>
 </property>
 <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
   <value>com.mysql.jdbc.Driver</value>
 </property>
 <property>
    <name>javax.jdo.option.ConnectionUserName</name>
   <value>root</value>
 </property>
 <property>
    <name>javax.jdo.option.ConnectionPassword</name>
   <value>root</value>
 </property>
</configuration>

Write a small connection demo to test whether Hive can be reached.

object SparkSQLHive {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME","root")
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQLLS")
    val spark = new sql.SparkSession.Builder()
      .enableHiveSupport()
      .config(sparkConf)
      .getOrCreate()
    spark.sql("show databases").show
    spark.stop
  }
}


# Pitfall

Because IDEA operates on Hive remotely, Hive's two remote-access services must be running, otherwise you will keep getting the error:

Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStor

hive --service metastore &      (default port: 9083)

hive --service hiveserver2 &    (default port: 10000)

[Screenshot: connection succeeded]

The connection succeeded; the red text is just SLF4J logging, not an error.

Now try creating the tables and loading the data from Scala code. First drop the tables created earlier in Hive; keep the database itself (creating a database is trivial, so it is not demonstrated).

drop table city_info;

drop table user_visit_action;

drop table product_info;





*Creating Hive tables and loading data with SparkSQL*


-> Then use SparkSQL to operate on Hive; the raw data files live in the /input folder under the project root.

def main(args: Array[String]): Unit = {
  System.setProperty("HADOOP_USER_NAME","root")
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQLLS")
  val spark = new sql.SparkSession.Builder()
    .enableHiveSupport()
    .config(sparkConf)
    .getOrCreate()
  import spark.implicits._
  spark.sql("show databases").show
  //TODO 根据当前/input下的数据创建hive表
  spark.sql("use spark_sql")
//create the user_visit_action table
  spark.sql(
    """
      |CREATE TABLE `user_visit_action`(
      |  `date` string,
      |  `user_id` bigint,
      |  `session_id` string,
      |  `page_id` bigint,
      |  `action_time` string,
      |  `search_keyword` string,
      |  `click_category_id` bigint,
      |  `click_product_id` bigint,
      |  `order_category_ids` string,
      |  `order_product_ids` string,
      |  `pay_category_ids` string,
      |  `pay_product_ids` string,
      |  `city_id` bigint)
      |row format delimited fields terminated by '\t'
      |""".stripMargin)
//load data into user_visit_action; note that the database name must be specified
  spark.sql(
    """
      |load data local inpath 'input/user_visit_action.txt' into table spark_sql.user_visit_action
      |""".stripMargin)
//create the product_info table
  spark.sql(
    """
      |CREATE TABLE `product_info`(
      |  `product_id` bigint,
      |  `product_name` string,
      |  `extend_info` string)
      |row format delimited fields terminated by '\t'
      |""".stripMargin)
//load data into product_info
  spark.sql(
    """
      |load data local inpath 'input/product_info.txt' into table spark_sql.product_info
      |""".stripMargin)
//create the city_info table
  spark.sql(
    """
      |CREATE TABLE `city_info`(
      |  `city_id` bigint,
      |  `city_name` string,
      |  `area` string)
      |row format delimited fields terminated by '\t'
      |""".stripMargin)
//load data into city_info
  spark.sql(
    """
      |load data local inpath 'input/city_info.txt' into table spark_sql.city_info
      |""".stripMargin)
//query any table to check that the import worked; show only the first 10 rows to avoid a flood of output
  spark.sql(
    """
      |select * from city_info
      |""".stripMargin).show(10)
  spark.stop
}

Check the result: the tables were created successfully.

[Screenshot: query output showing the tables were created]





*SparkSQL computation logic*


-> The required output is "region, product name, click count, (city1: click share, city2: click share, other: click share)".

First join the three tables on their primary/foreign keys to produce temp view t1.

Then group by region and product name, count the product's total clicks within the region and each city's share of those clicks, and produce temp view t2.

Finally, run a window query over t2, rank the products within each region, and take the top three.

Show the result.

object SparkSQLReq {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME","root")
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQLLS")
    val spark = new sql.SparkSession.Builder()
      .enableHiveSupport()
      .config(sparkConf)
      .getOrCreate()
    import spark.implicits._
    //TODO do the statistics in SQL; the complex aggregation needs a custom UDAF
    spark.sql("use spark_sql")

    /**
     * t1 simply joins the three tables on city ID and product ID
     * t2 groups t1 by area and product and counts total clicks
     * t3 uses the window function over() to sort each area's click counts desc and output a rank value
     * finally select from t3 and show only the top three
     */
    //TODO join the three tables on their keys, filtering out click=-1 records (not real clicks)
    spark.sql(
      """
        |select a.*,c.*,p.product_name
        |from user_visit_action a
        |join city_info c on c.city_id = a.city_id
        |join product_info p on p.product_id = a.click_product_id
        |where a.click_product_id > -1
        |""".stripMargin).createOrReplaceTempView("t1")
    //TODO register the custom aggregate function, group by area and product name,
    //and compute each product's total clicks per area plus each city's share within the area
    //Because the aggregation is done in SQL, an untyped (weakly typed) UDAF is required;
    //strongly typed aggregators work on Datasets, not SQL
    val udaf = new CityRemarkUDAF
    spark.udf.register("cityRemark",udaf)
    spark.sql(
      """
        |select area,product_name,count(*) as click_count,cityRemark(city_name)
        |from  t1
        |group by area,product_name
        |""".stripMargin).createOrReplaceTempView("t2")

    //TODO run a window query over t2 to find the top three products by clicks in each area
    spark.sql(
      """
        |select *,rank() over(partition by area order by click_count desc) as rank
        |from t2
        |""".stripMargin).createOrReplaceTempView("t3")

    //TODO show the query result
    spark.sql("select * from t3 where rank <= 3").show


    spark.stop()
  }
  class CityRemarkUDAF extends UserDefinedAggregateFunction{

    /**
     * Input: a city name. Count each city's clicks, divide by the total clicks to get its share,
     * and finally return a string "city1:share,city2:share,other:share"
     */
      //input type: the city name, a String
    override def inputSchema: StructType = {
      StructType(Array(StructField("cityName",StringType)))
    }
    //buffer type: the total click count plus per-city click counts; a Map is convenient here
    //e.g. if area 1 contains city 1 and city 2, the map holds
    //(city1 -> clicks)
    //(city2 -> clicks)
    override def bufferSchema: StructType = {
      StructType(Array(
        StructField("total",LongType),
        StructField("cityMap",MapType(StringType,LongType))
      ))
    }

    //return type: a String remark of the form "city:share"
    override def dataType: DataType = StringType

    override def deterministic: Boolean = true

    //initialize the buffer according to the schema defined above
    override def initialize(buffer: MutableAggregationBuffer): Unit = {
      //total click count starts at zero
      buffer(0) = 0L
      //the city map starts empty
      buffer(1) = Map[String ,Long]()
    }

    //update the buffer with one input row
    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
      //the input is just city_name, fetched with getString
      val cityName = input.getString(0)
      //each incoming row adds 1 to the total click count
      buffer(0) = buffer.getLong(0) + 1
      //get the Map held in the buffer
      val citymap = buffer.getAs[Map[String,Long]](1)
      //get the city's current count (defaulting to 0 if absent) and add 1
      val newClick = citymap.getOrElse(cityName,0L)+1
      //write the new count back into the Map
      buffer(1) = citymap.updated(cityName,newClick)
    }

    //merge two buffers
    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
      //the first slot is the total click count; just add them
      buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
      //get the Maps of both buffers
      val map1 = buffer1.getAs[Map[String,Long]](1)
      val map2 = buffer2.getAs[Map[String,Long]](1)
      //merge the two Maps
      buffer1(1) = map1.foldLeft(map2){
        //the accumulator starts as map2; (k,v) iterates over map1's entries
        case (map,(k,v)) =>{
          //add map1's count for k onto the accumulated value (defaulting to 0)
          map.updated(k,map.getOrElse(k,0L) + v)
        }
      }
    }

    //compute the final return value of the aggregation
    override def evaluate(buffer: Row): Any = {
      //total click count for this product/area
      val total = buffer.getLong(0)
      //the map in the buffer, i.e. the per-city click counts
      val citymap = buffer.getMap[String, Long](1)
      //turn the map into a List, sort by city click count descending, and keep the top two
      val cityToCount = citymap.toList.sortWith(
        (left, right) => left._2 > right._2
      ).take(2)
      //does the map contain more than two cities?
      var hasRest = citymap.size>2
      //rest accumulates the percentage taken by the top two cities
      var rest = 0L
      //use a StringBuilder for efficient concatenation
      val s = new StringBuilder
      //loop over the top-two list
      cityToCount.foreach {
      case (city, count) => {
        //share = city clicks / total clicks in the area (as an integer percentage)
        val r = (count*100/total)
        //append "cityName share%" to the result string
        s.append(city + " " + r + "%,")
        //accumulate the share of the top two cities
        rest = rest +r
      }
    }
      if(hasRest){
        //with more than two cities, collapse the rest into "other:share"
        //its share is simply 100 minus the top-two shares
        s.toString()+"other:"+(100-rest)+"%"
      }else{
        //with only two cities, return the string as is
        s.toString()
      }
    }
  }
}





*Results*


[Screenshot: top-three products per region]





*Requirement 4. Ad-click blacklist*


The ad-click blacklist consists of two modules: the first generates simulated click data, the second aggregates the data in real time and adds offending users to the blacklist.





*Prerequisites*






*TApplication*


First add Spark Streaming support to the framework built earlier.

trait TApplication {
  var envData : Any = null;
  //create the connection
  def start(t:String = "jdbc")(op : =>Unit) : Unit={
    if(t=="spark"){
      envData = EnvUtil.getEnv()
    }else if (t=="sparkStreaming"){
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("streaming")
      val ssc = new StreamingContext(sparkConf,Seconds(3))
      //keep a reference, otherwise the close branch below casts a null;
      //the ssc also has to be made available to EnvUtil.getStreamingEnv(), which TDao uses
      envData = ssc
    }
    //business logic abstraction
    try{
      op
    }catch {
      case ex: Exception=> println(ex.getMessage)
    }
    //close the connection
    if(t=="spark"){
      EnvUtil.clear()
    }else if(t=="sparkStreaming"){
      val ssc = envData.asInstanceOf[StreamingContext]
      ssc.start()
      ssc.awaitTermination()
    }
  }
}

Strictly speaking, the Spark Streaming batch interval should not be hard-coded; it ought to be a parameter of the framework, but I got lazy.
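
For reference, a minimal sketch of how the interval could be passed in instead of being hard-coded; the extra batchSeconds parameter and the trait name used here are assumptions, not part of the original framework:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// A sketch only: batchSeconds is a hypothetical extra parameter (default 3)
// so existing callers like start("sparkStreaming"){ ... } keep compiling.
trait TApplicationWithInterval {
  var envData: Any = null

  def start(t: String = "jdbc", batchSeconds: Int = 3)(op: => Unit): Unit = {
    if (t == "sparkStreaming") {
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("streaming")
      envData = new StreamingContext(sparkConf, Seconds(batchSeconds))
    }
    // ... the spark branch, the try/catch around op, and the shutdown logic
    // stay exactly as in the TApplication shown above
    op
  }
}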





*TDao*


Add Kafka write and read functions to the TDao trait.

trait TDao {
  //load the config.properties file
  val properties = new Properties()
  val config: Properties = PropertiesUtil.load("config.properties")

  def readFile(path:String):RDD[String] = {
    EnvUtil.getEnv().textFile(path)
  }
  def readKafka(): DStream[String] ={
    //TODO read data from Kafka
    //Kafka configuration
    val kafkaPara = Map[String,Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG->config.getProperty("kafka.broker.list"),
      ConsumerConfig.GROUP_ID_CONFIG->"root",
      "key.deserializer"->"org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer"->"org.apache.kafka.common.serialization.StringDeserializer"
    )

    val kafkaDStream:InputDStream[ConsumerRecord[String,String]] =
      KafkaUtils.createDirectStream[String,String](
        EnvUtil.getStreamingEnv(),
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String,String](Set("root"),kafkaPara)
    )
    kafkaDStream.map(record => record.value())
  }

  //declared implicit so the data sequence can also be supplied from the caller's implicit scope
  def writeToKafka( implicit datas:Seq[String] ): Unit ={
    //TODO send data to Kafka
    // read the Kafka settings from config.properties
    val broker: String = config.getProperty("kafka.broker.list")
    val topic = "root"
    val prop = new Properties()
    // producer configuration
    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker)
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

    // create the Kafka producer from the configuration
    val kafkaProducer: KafkaProducer[String, String] = new KafkaProducer[String, String](prop)

    while (true) {
      // send the generated records to the Kafka cluster via the producer, repeatedly, to simulate a stream
      for (line <- datas) {
        kafkaProducer.send(new ProducerRecord[String, String](topic, line))
        println(line)

      }
      Thread.sleep(2000)
    }
  }
}
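
The PropertiesUtil.load call above is another helper the post never shows. A minimal sketch under the assumption that it simply loads the file from the classpath (only the object name and method come from the call site):

import java.io.InputStreamReader
import java.nio.charset.StandardCharsets
import java.util.Properties

// Hypothetical helper matching the PropertiesUtil.load(...) call in TDao.
object PropertiesUtil {
  def load(propertiesName: String): Properties = {
    val prop = new Properties()
    // read the file from the classpath (e.g. src/main/resources/config.properties)
    prop.load(new InputStreamReader(
      Thread.currentThread().getContextClassLoader.getResourceAsStream(propertiesName),
      StandardCharsets.UTF_8))
    prop
  }
}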





*Config.properties*


Provides the configuration for JDBC and Kafka.

# JDBC configuration
jdbc.datasource.size=10
jdbc.url=jdbc:mysql://hadoop01:3306/spark2020?useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true
jdbc.user=root
jdbc.password=root

# Kafka configuration
kafka.broker.list=hadoop01:9092,hadoop02:9092,hadoop03:9092
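
The streaming services later in the post call JdbcUtil.getConnection, which is also never shown. A minimal sketch assuming it builds a Druid connection pool from the jdbc.* entries above (the druid dependency is already in the pom; the implementation and property names here are guesses, not the author's code):

import java.sql.Connection
import java.util.Properties
import javax.sql.DataSource
import com.alibaba.druid.pool.DruidDataSourceFactory

// Hypothetical helper matching the JdbcUtil.getConnection calls later in the post.
object JdbcUtil {
  private val config: Properties = PropertiesUtil.load("config.properties")

  // build a Druid connection pool from the jdbc.* entries in config.properties
  private val dataSource: DataSource = {
    val props = new Properties()
    props.setProperty("driverClassName", "com.mysql.jdbc.Driver")
    props.setProperty("url", config.getProperty("jdbc.url"))
    props.setProperty("username", config.getProperty("jdbc.user"))
    props.setProperty("password", config.getProperty("jdbc.password"))
    props.setProperty("maxActive", config.getProperty("jdbc.datasource.size"))
    DruidDataSourceFactory.createDataSource(props)
  }

  def getConnection: Connection = dataSource.getConnection
}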





*Data-simulation module*






*MockDataController*


A controller that just invokes the Service logic.

class MockDataController extends TController{
  private val mockDataService = new MockDataService
  override def excute(): Unit = {
    val result = mockDataService.analysis()
  }
}





*MockDataService*


It simply delegates to the Dao: generate the mock data, then write it to Kafka.

class MockDataService extends TService{
  private val mockDataDao = new MockDataDao();
  /**
   * Data analysis
   * @return
   */
  override def analysis() = {
    val datas = mockDataDao.mockData()
    import mockDataDao._
    mockDataDao.writeToKafka(datas)
  }
}





*MockDataDao*


Contains the actual data-generation logic; since it is data interaction, it belongs in the Dao layer.

It generates a timestamp, then randomly picks a city, a user, and the ad that was clicked.

class MockDataDao extends TDao{
  case class RanOpt[T](value: T, weight: Int)
  object RandomOptions {
    def apply[T](opts: RanOpt[T]*): RandomOptions[T] = {
      val randomOptions = new RandomOptions[T]()
      for (opt <- opts) {
        randomOptions.totalWeight += opt.weight
        for (i <- 1 to opt.weight) {
          randomOptions.optsBuffer += opt.value
        }
      }
      randomOptions
    }
  }

  class RandomOptions[T](opts: RanOpt[T]*) {

    var totalWeight = 0
    var optsBuffer = new ListBuffer[T]

    def getRandomOpt: T = {
      val randomNum: Int = new Random().nextInt(totalWeight)
      optsBuffer(randomNum)
    }
  }
  //TODO generate the mock data
  implicit def mockData(): Seq[String]={
    val array: ArrayBuffer[String] = ArrayBuffer[String]()
    val CityRandomOpt = RandomOptions(RanOpt(CityInfo(1, "北京", "华北"), 30),
      RanOpt(CityInfo(2, "上海", "华东"), 30),
      RanOpt(CityInfo(3, "广州", "华南"), 10),
      RanOpt(CityInfo(4, "深圳", "华南"), 20),
      RanOpt(CityInfo(5, "天津", "华北"), 10))

    val random = new Random()
    // simulated real-time data, one record per line:
    // timestamp area city userid adid
    for (i <- 0 to 50) {

      val timestamp: Long = System.currentTimeMillis()
      val cityInfo: CityInfo = CityRandomOpt.getRandomOpt
      val city: String = cityInfo.city_name
      val area: String = cityInfo.area
      val adid: Int = 1 + random.nextInt(6)
      val userid: Int = 1 + random.nextInt(6)

      // assemble the record
      array += timestamp + " " + area + " " + city + " " + userid + " " + adid
    }
    array.toSeq
  }
}





*MockDataApplication*


The runnable program used to generate the data.

object MockDataApplication extends App with TApplication{
  start("sparkStreaming"){
    val controller = new MockDataController
    controller.excute()
  }
}





*Results*


Make sure the Kafka cluster is up before running this.

[Screenshot: mock data being sent to Kafka]





*Computation module*


First create two utility bean case classes to structure the data being read.

package object bean {
  case class Ad_Click_Log(
                         ts:String,
                         area:String,
                         city:String,
                         userid:String,
                         adid:String
                         )
case class CityInfo (city_id:Long,
                     city_name:String,
                     area:String)
}





*MySQL table creation*


CREATE DATABASE spark2020;
use spark2020;
CREATE TABLE black_list (userid VARCHAR(2) PRIMARY KEY);
CREATE TABLE user_ad_count (
	dt VARCHAR(255),
	userid VARCHAR (2),
	adid VARCHAR (2),
	count BIGINT,
	PRIMARY KEY (dt, userid, adid)
);





*BlackListController*


class BlackListController extends TController{
  private val blackListService = new BlackListService
  override def excute(): Unit = {
    val result = blackListService.analysis()
  }
}





*BlackListService*


The main logic for building the blacklist. It first reads all click data from Kafka and maps each record into the bean. The blacklist has to be refreshed periodically, which is done with transform: Spark Streaming reads data batch by batch, each batch is wrapped into beans, and at that point the current blacklist is fetched over JDBC. Users already on the blacklist are excluded from the statistics.

When a user's click count for the day exceeds 100, the user is added to the blacklist.

class BlackListService extends TService{

  private val blackListDao = new BlackListDao
  /**
   * Data analysis
   * @return
   */
  override def analysis(): Any = {
    //TODO read all click data
    val ds = blackListDao.readKafka()
    //TODO convert the raw records into the case class
    val logsDS = ds.map(
      data => {
        val datas = data.split(" ")
        Ad_Click_Log(datas(0),datas(1),datas(2),datas(3),datas(4))
      }
    )
    //TODO periodically fetch the blacklist and check whether the current user is on it
    val value = logsDS.transform(
      rdd => {
        //TODO everything inside transform runs once per batch
        //read the blacklist from MySQL
        //get a connection
        val connection = JdbcUtil.getConnection
        //prepare the SQL statement
        val pstat = connection.prepareStatement(
          """
            |select userid from black_list
            |""".stripMargin
        )
        //execute the query and collect the result set
        val rs = pstat.executeQuery()
        val blackIds = ListBuffer[String]()
        while (rs.next()) {
          blackIds.append(rs.getString(1))
        }
        rs.close()
        pstat.close()
        connection.close()
        //TODO users already on the blacklist are no longer counted
        val filterRDD = rdd.filter(
          log => {
            //keep only users who are not on the blacklist
            !blackIds.contains(log.userid)
          }
        )
        //TODO count the remaining clicks -> a wordCount
        /**
         * The timestamp is millisecond-precision; using it directly as a key means
         * hardly anyone would ever reach the threshold.
         * Per the requirement, convert the timestamp to a date and count per day.
         */
        val sdf = new SimpleDateFormat("yyyy-MM-dd")
        val mapRDD = filterRDD.map(
          log => {
            val date = new Date(log.ts.toLong)
            ((sdf.format(date), log.userid, log.adid), 1)
          }
        )
        //aggregate directly
        mapRDD.reduceByKey(_ + _)
      }
    )
    //TODO add users who exceed the threshold to the blacklist
    value.foreachRDD(
      rdd => {
            rdd.foreach{
              case ((day,userid,adid),sum)=>{
                //TODO this is the result of a single batch (only a few seconds of data),
                //so it has to be accumulated continuously in MySQL
                //TODO update the user's click count (insert, or add to the count if the key exists)
                val conn = JdbcUtil.getConnection
                val pstat = conn.prepareStatement(
                  """
                    |insert into user_ad_count (dt,userid,adid,count)
                    |values(?,?,?,?)
                    |on duplicate key
                    |update count = count + ?
                    |""".stripMargin)

                pstat.setString(1,day)
                pstat.setString(2,userid)
                pstat.setString(3,adid)
                pstat.setLong(4,sum)
                pstat.setLong(5,sum)
                pstat.executeUpdate()

                //TODO read the user's latest accumulated count
                //TODO check whether it exceeds the threshold
                //TODO if it does, add the user to the blacklist
                /**
                 * Insert users over the threshold into the blacklist; if the row already exists, just update it.
                 */
                val pstat1 = conn.prepareStatement(
                  """
                    |insert into black_list (userid)
                    |select userid from user_ad_count
                    |where dt = ? and userid = ? and adid = ? and count >= 100
                    |on duplicate key
                    |update userid = ?
                    |""".stripMargin)

                pstat1.setString(1,day)
                pstat1.setString(2,userid)
                pstat1.setString(3,adid)
                pstat1.setString(4,userid)
                pstat1.executeUpdate()

                pstat.close()
                pstat1.close()
                conn.close()
              }
            }
      }
     )
    }
}





*BlackListDao*


Does nothing extra; it exists only to extend TDao.

class BlackListDao extends TDao{
}





*BlackListApplication*


The entry point of the blacklist program.

object BlackListApplication extends App with TApplication{
  start("sparkStreaming"){
    val controller = new BlackListController
    controller.excute()
  }
}





*Results*


Every user whose same-day click count exceeded 100 was blacklisted, and blacklisted users are no longer counted.

[Screenshot: blacklist and user_ad_count tables]





*Requirement 5. Real-time ad click statistics*






*MySQL table creation*


use spark2020;
CREATE TABLE `area_city_ad_count` (
  `dt` varchar(255) NOT NULL DEFAULT '',
  `area` varchar(41) NOT NULL DEFAULT '',
  `city` varchar(41) NOT NULL DEFAULT '',
  `adid` varchar(21) NOT NULL DEFAULT '',
  `count` bigint(20) DEFAULT NULL,
  PRIMARY KEY (`dt`,`area`,`city`,`adid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;





*DateAreaCityAdCountAnalysisController*


class DateAreaCityAdCountAnalysisController extends TController{
  private val dateAreaCityAdCountAnalysisService = new DateAreaCityAdCountAnalysisService
  override def excute(): Unit = {
    val result = dateAreaCityAdCountAnalysisService.analysis()
  }
}





*DateAreaCityAdCountAnalysisService*


This is just a wordCount over all the clicks; the aggregated results are written to MySQL via insert, and if the primary key already exists the new count is added to the stored count (an upsert).

class DateAreaCityAdCountAnalysisService extends TService{
  private val dateAreaCityAdCountAnalysisDao = new DateAreaCityAdCountAnalysisDao
  override def analysis()= {
    //read data from Kafka
    val messageDS = dateAreaCityAdCountAnalysisDao.readKafka()
    //TODO convert the raw records into the case class
    val logDS = messageDS.map(
      data => {
        val datas = data.split(" ")
        Ad_Click_Log(datas(0), datas(1), datas(2), datas(3), datas(4))
      }
    )
    //TODO convert the timestamp to a date and key by (day, area, city, ad)
    val sdf = new SimpleDateFormat("yyyy-MM-dd")
    val dayDS = logDS.map(
      log => {
        val day = sdf.format(new Date(log.ts.toLong))
        ((day,log.area, log.city, log.adid), 1)
      }
    )
    val resultDS = dayDS.reduceByKey(_ + _)
    resultDS.foreachRDD(
      rdd => {
        rdd.foreachPartition(
          datas => {
            //TODO get a database connection
            val conn = JdbcUtil.getConnection
            val pstat = conn.prepareStatement(
              """
                |insert into area_city_ad_count (dt,area,city,adid,count)
                |values(?,?,?,?,?)
                |on duplicate key
                |update count = count + ?
                |""".stripMargin)
            //TODO write to the database
            datas.foreach {
            case ((dt, area, city, adid), count) => {
              println(dt, area, city, adid,count)
              pstat.setString(1, dt)
              pstat.setString(2, area)
              pstat.setString(3, city)
              pstat.setString(4, adid)
              pstat.setLong(5, count)
              pstat.setLong(6, count)
              pstat.executeUpdate()
            }
          }
            //TODO release the resources
            pstat.close()
            conn.close()
          }
        )
      }
    )
  }
}





*DateAreaCityAdCountAnalysisDao*


class DateAreaCityAdCountAnalysisDao extends TDao{
}





*DateAreaCityAdCountAnalysisApplication*


object DateAreaCityAdCountAnalysisApplication extends App with TApplication{
  start("sparkStreaming"){
    val controller = new DateAreaCityAdCountAnalysisController
    controller.excute()
  }
}





*Results*


[Screenshot: area_city_ad_count table]





*Requirement 6. Click counts for the last hour*






*LasHourAdCountAnalysisController*


class LasHourAdCountAnalysisController extends TController{
  private val lasHourAdCountAnalysisService = new LasHourAdCountAnalysisService
  override def excute(): Unit = {
    lasHourAdCountAnalysisService.analysis()
  }
}





*LasHourAdCountAnalysisService*


class LasHourAdCountAnalysisService extends TService{
  private val lasHourAdCountAnalysisDao = new LasHourAdCountAnalysisDao
  /**
   * Data analysis
   * @return
   */
  override def analysis() = {
    //TODO read data from Kafka
    val massage = lasHourAdCountAnalysisDao.readKafka()
    //TODO convert the raw records into the case class
    val logDS = massage.map(
      data => {
        val datas = data.split(" ")
        Ad_Click_Log(datas(0), datas(1), datas(2), datas(3), datas(4))
      }
    )
    //TODO floor the timestamp to 10-second buckets: 123456/10000 -> 12, 12*10000 -> 120000
    //keep only (ad, bucketed timestamp) as the key and 1 as the count
    val toCountDS = logDS.map(
      log => {
        val ts = log.ts.toLong
        ((log.adid, ts / 10000 * 10000), 1)
      }
    )
    //window aggregation: slide every ten seconds over a window holding the last minute of data
    val sumDS = toCountDS.reduceByKeyAndWindow((x:Int,y:Int)=>x+y,Seconds(60),Seconds(10))
    //restructure with the ad as the key, formatting the timestamp along the way
    val sdf = new SimpleDateFormat("yyyy-MM-dd-HH")
    val adToTimeSumDS = sumDS.map {
      case ((adid, time), sum) => {
        (adid, (sdf.format(time)+":00", sum))
      }
    }
    //group by key
    val value = adToTimeSumDS.groupByKey()
    val resultDS = value.mapValues(
      iter => {
        iter.toList.sortWith(
          (left, right) => {
            left._1 < right._1
          }
        )
      }
    )
    resultDS.print()
  }
}





*LasHourAdCountAnalysisDao*


class LasHourAdCountAnalysisDao extends TDao{
}





*LasHourAdCountAnalysisApplication*


object LasHourAdCountAnalysisApplication extends App with TApplication{
  start("sparkStreaming"){
    val controller = new LasHourAdCountAnalysisController
    controller.excute()
  }
}





*Results*


[Screenshot: last-hour click count output]

That completes the project. Project source code:

Download



Copyright notice: This is an original article by weixin_46466052, released under the CC 4.0 BY-SA license. Please include a link to the original source and this notice when reposting.