*E-commerce Project in Practice*
The project is written in Scala. The data used in the project:
Download link
The data is e-commerce user-click data with fields separated by "_". A sample of the data:
2019-07-17_95_26070e87-1ad7-49a3-8fb3-cc741facaddf_37_2019-07-17 00:00:02_手机_-1_-1_null_null_null_null_3
2019-07-17_95_26070e87-1ad7-49a3-8fb3-cc741facaddf_48_2019-07-17 00:00:10_null_16_98_null_null_null_null_19
2019-07-17_95_26070e87-1ad7-49a3-8fb3-cc741facaddf_6_2019-07-17 00:00:17_null_19_85_null_null_null_null_7
2019-07-17_38_6502cdc9-cf95-4b08-8854-f03a25baa917_29_2019-07-17 00:00:19_null_12_36_null_null_null_null_5
2019-07-17_38_6502cdc9-cf95-4b08-8854-f03a25baa917_22_2019-07-17 00:00:28_null_-1_-1_null_null_15,1,20,6,4_15,88,75_9
2019-07-17_38_6502cdc9-cf95-4b08-8854-f03a25baa917_11_2019-07-17 00:00:29_苹果_-1_-1_null_null_null_null_7
2019-07-17_38_6502cdc9-cf95-4b08-8854-f03a25baa917_24_2019-07-17 00:00:38_null_-1_-1_15,13,5,11,8_99,2_null_null_10
2019-07-17_38_6502cdc9-cf95-4b08-8854-f03a25baa917_24_2019-07-17 00:00:48_null_19_44_null_null_null_null_4
2019-07-17_38_6502cdc9-cf95-4b08-8854-f03a25baa917_47_2019-07-17 00:00:54_null_14_79_null_null_null_null_2
2019-07-17_38_6502cdc9-cf95-4b08-8854-f03a25baa917_27_2019-07-17 00:00:59_null_3_50_null_null_null_null_26
2019-07-17_38_6502cdc9-cf95-4b08-8854-f03a25baa917_27_2019-07-17 00:01:05_i7_-1_-1_null_null_null_null_17
2019-07-17_38_6502cdc9-cf95-4b08-8854-f03a25baa917_24_2019-07-17 00:01:07_null_5_39_null_null_null_null_10
2019-07-17_38_6502cdc9-cf95-4b08-8854-f03a25baa917_25_2019-07-17 00:01:13_i7_-1_-1_null_null_null_null_24
2019-07-17_38_6502cdc9-cf95-4b08-8854-f03a25baa917_22_2019-07-17 00:01:21_null_19_62_null_null_null_null_20
*Data format (date_userID_sessionID_pageID_timestamp_searchKeyword_clickCategoryID_clickProductID_orderCategoryIDs_orderProductIDs_payCategoryIDs_payProductIDs_cityID)*
Before implementing the requirements, let's first sketch a small framework and abstract out the repeated operations.
Start by importing the required dependencies. Don't worry about whether every one of them is needed right away; they will all be used eventually. Just be careful not to add duplicates, since some were already imported for the earlier wordCount example.
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.17.Final</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>2.4.6</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>2.4.6</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>2.4.6</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>2.4.6</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.10</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.12.8</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>2.12.8</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>2.12.8</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>2.4.6</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>2.3.3</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
The project uses a three-layer architecture (Dao-Service-Controller).
Under the Scala source root, create a package to hold the abstract framework, tentatively named core. Inside it create four types: TApplication, TController, TService, and TDao.
TApplication is the program entry point; TController is the controller, responsible for invoking the computation logic; TService holds the actual computation logic; TDao handles data access. Since these demo classes have no database yet, the Dao layer is mostly idle for now.
TApplication -> abstracts the repeated opening and closing of the Spark connection into a start function, with the actual business logic abstracted as the op parameter.
trait TApplication {
var envData : Any = null;
//创建链接
def start(t:String = "jdbc")(op : =>Unit) : Unit={
if(t=="spark"){
envData = EnvUtil.getEnv()
}
//业务抽象
try{
op
}catch {
case ex: Exception=> println(ex.getMessage)
}
//关闭链接
if(t=="spark"){
EnvUtil.clear()
}
}
}
TController -> contains a single abstract excute function; each Controller extends the trait and overrides this function.
trait TController { def excute():Unit }
TService -> has a single analysis function; override it to implement the actual data-processing logic.
trait TService {
/**
* Data analysis
* @return
*/
def analysis():Any
}
TDao -> readFile is a method every requirement needs, so it lives in the abstract trait; any other data-access logic is implemented by classes that extend TDao.
trait TDao {
def readFile(path:String):RDD[String] = {
EnvUtil.getEnv().textFile(path)
}
}
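EnvUtil is used by TApplication and TDao above, but its source is not listed in this article. Below is a minimal sketch of what it is assumed to look like: it simply caches the SparkContext (and, for the streaming requirements later on, the StreamingContext) in vars. The method names getEnv, clear and getStreamingEnv come from how they are called in this article; the setter setStreamingEnv and the app name are my own assumptions.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.StreamingContext
object EnvUtil {
//cached contexts (assumed implementation, not the original project code)
private var sc: SparkContext = _
private var ssc: StreamingContext = _
//return the shared SparkContext, creating it on first use (the app name here is arbitrary)
def getEnv(): SparkContext = {
if (sc == null) {
sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("core"))
}
sc
}
//stop and forget the SparkContext
def clear(): Unit = {
if (sc != null) {
sc.stop()
sc = null
}
}
//used from requirement 4 on by TDao.readKafka (setter name assumed)
def setStreamingEnv(context: StreamingContext): Unit = ssc = context
def getStreamingEnv(): StreamingContext = ssc
}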
*Requirement 1: Top 10 hot categories, ranked by clicks first, then orders, then payments*
Top 10 hot categories: HotCateGory
*HotCateGoryController*
-> Nothing special here; it is just the entry point that triggers the computation logic, a plain controller.
class HotCateGoryController extends TController{
private val hotCateGoryService = new HotCateGoryService
override def excute(): Unit = {
val result = hotCateGoryService.analysis()
result.foreach(println)
}
}
*HotCateGoryService*
-> The business logic. The idea is to aggregate clicks, orders, and payments separately, producing RDDs keyed by category ID with the click/order/payment count as the value, then union and flatten the three results, sort, and take the top ten. It is basically a slightly more involved wordcount.
class HotCateGoryService extends TService{
/**
* Data analysis
* Data format (date_userID_sessionID_pageID_timestamp_searchKeyword_clickCategoryID_clickProductID_orderCategoryIDs_orderProductIDs_payCategoryIDs_payProductIDs_cityID)
*/
private val hotCateGoryDao = new HotCateGoryDao()
override def analysis(): Array[(String,(Int,Int,Int))] = {
//TODO 读取电商日志
val actionRDD: RDD[String] = hotCateGoryDao.readFile("input/user_visit_action.txt")
//数据缓存,防止重复读取文件资源浪费
actionRDD.cache()
//TODO 对点击统计
val clickRDD = actionRDD.map(action => {
val datas = action.split("_")
//数据组的第6个数据是指品类
(datas(6),1)
}).filter(_._1 != "-1")//过滤掉-1无效点击
val cateGoryIdToClickCountRDD: RDD[(String,Int)] = clickRDD.reduceByKey(_+_)
//TODO 对下单统计
/**
* 订单中可以有多个商品(品类,订单次数)就会变成 ↓
* (品类1,品类2,品类3,1)
*/
val orderRDD = actionRDD.map(action => {
val datas = action.split("_")
//数据组的第8个数据是指订单
(datas(8))
}).filter(_ != "null")//过滤掉null无效订单
//扁平化(品类1,品类2,品类3,1)->(品类1,1)(品类2,1)(品类3,1)
val orderToOneRDD = orderRDD.flatMap{
id => {
val ids = id.split(",")
ids.map(id => (id,1))
}
}
val cateGoryIdToOrderCountRDD: RDD[(String,Int)] = orderToOneRDD.reduceByKey(_ + _)
//TODO 对付款统计
val payRDD = actionRDD.map(action => {
val datas = action.split("_")
//数据组的第10个数据是指付款
(datas(10))
}).filter(_ != "null")//过滤掉null无效支付
val payToOneRDD = payRDD.flatMap{
id => {
val ids = id.split(",")
ids.map(id => (id,1))
}
}
val cateGoryIdToPayCountRDD: RDD[(String,Int)] = payToOneRDD.reduceByKey(_ + _)
//TODO 把上面的三种RDD合并(品类,点击数,订单数,支付数)
/**
* join频繁使用笛卡尔积效率极低
* 换用reduce,reduce函数必须保证合并的类型相同
* 既两个(String,Int)不能合并成(String,(Int,Int,Int))
* 先将类型转换
* (品类,点击次数)->(品类,点击次数,0,0)
* (品类,订单次数)->(品类,0,订单次数,0)
* (品类,支付次数)->(品类,0,0,支付次数)
* 再直接合并
*/
val newClickRDD = cateGoryIdToClickCountRDD.map{
case (id,click)=>{
(id,(click,0,0))
}
}
val newOrderRDD = cateGoryIdToOrderCountRDD.map{
case (id,order)=>{
(id,(0,order,0))
}
}
val newPayRDD = cateGoryIdToPayCountRDD.map{
case (id,pay)=>{
(id,(0,0,pay))
}
}
//先简单的加到一起
val countRDD = newClickRDD.union(newOrderRDD).union(newPayRDD)
val reduceRDD = countRDD.reduceByKey(
(t1,t2) =>{
(t1._1+t2._1,t1._2+t2._2,t1._3+t2._3)
}
)
//TODO 对结果排序
val sortRDD: RDD[(String,(Int,Int,Int))] = reduceRDD.sortBy(_._2, false)
//TODO 取前十
val result = sortRDD.take(10)
result
}
}
*HotCateGoryDao*
-> An empty class that simply extends TDao, which turns a file path into an RDD through the Spark context.
class HotCateGoryDao extends TDao{
}
*HotCateGoryApplication*
-> The program entry point; it calls the Controller.
object HotCateGoryApplication extends App with TApplication{
//TODO 获取热门品类前十,就是个稍微复杂点的wordcount
start("spark"){
val controller = new HotCateGoryController
controller.excute()
}
}
*Results*
Execution results. For the sake of time (the job crawls along on the YARN cluster), it was simply run in the development environment.
*Requirement 2: Page conversion rate*
Example: (home page -> detail page jumps) / (total home page clicks)
Page conversion rate: PageFlow
*PageFlowController*
-> Again, just a controller.
class PageFlowController extends TController{
private val pageFlowService = new PageFlowService
override def excute(): Unit = {
val result = pageFlowService.analysis()
result
}
}
*PageFlowService*
-> The core logic of the conversion-rate calculation: compute the numerator and denominator separately. First count the total clicks per page, which is just a wordcount and gives the denominator. Then count jumps per page-to-page direction: group the click stream by sessionID to get each user's full click flow, sort it by timestamp, and convert the page click flow into page-to-page jumps.
Example: (1,2,3,4) -> ((1,2),1) ((2,3),1) ((3,4),1)
The conversion drops the head of the click flow and then zips the two lists together with an offset:
1 | 2 | 3 | 4 |
---|---|---|---|
2 | 3 | 4 |
Then drop the session ID, which is no longer needed.
Finally, aggregate with (source page-target page) as the key and the click count as the value to obtain the numerator.
Dividing the numerator by the denominator gives the page conversion rate. For the denominator, split the numerator's key on "-" and look up the click count of the corresponding source page. Finally format the result as a percentage with two decimal places.
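As a quick standalone illustration of the zip-with-tail trick (plain Scala with made-up page ids, not part of the project code):
val pageIds = List(1L, 2L, 3L, 4L)
//pair every page with the page that follows it
val flows = pageIds.zip(pageIds.tail).map { case (from, to) => (s"$from-$to", 1) }
//flows == List(("1-2",1), ("2-3",1), ("3-4",1))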
class PageFlowService extends TService{
/**
* Data analysis
* page conversion rate = jumps in a given direction / total clicks on the source page
* @return
*/
private val pageFlowDao = new PageFlowDao
override def analysis(): Any = {
val actionRDD:RDD[UserVisitAction] = pageFlowDao.getUserVisitAction("input/user_visit_action.txt")
//TODO 先算分母页面总点击率,就是个简单的wordcount
val pageToOneRDD =actionRDD.map(
action => {
(action.page_id,1)
}
)
val sumRDD = pageToOneRDD.reduceByKey(_+_)
val pageCountArray = sumRDD.collect()
//TODO 分子计算
/**
* 页面跳转方向计算比较复杂,要计算跳转方向
* 比如3->5和4->5就是不同的页面跳转方向,还要计算时间,不同的时间跳转不算同一个操作
* 不同用户跳转不算同一操作
*/
val sessionRDD = actionRDD.groupBy(_.session_id)
//pageFlowRDD类型:(session,List("来源页面-当前页面",跳转次数))
val pageFlowRDD:RDD[(String,List[(String,Int)])] = sessionRDD.mapValues(
iter => {
//自定义排序,按时间戳排序
val actions = iter.toList.sortWith(
(left, right) => {
left.action_time < right.action_time
}
)
//将排序后的数据进行结构的转换,只需要pageid
val pageIds = actions.map(_.page_id)
/**
* 格式转换(页面1,页面2,页面3)->((来源页面,当前页面),跳转次数)
* (1,2,3,4)->((1,2),1)((2,3),1)((3,4),1)
* 使用zip命令错位相连
* 1,2,3,4.tail=2,3,4
* 1,2,3,4(2,3,4)->(1,2)(2-3)(3-4)
*/
val zipIds = pageIds.zip(pageIds.tail)
zipIds.map {
case (pageid1, pageid2) => {
(pageid1 + "-" + pageid2, 1)
}
}
}
)
//TODO 将分组后的数据进行结构转换,不需要session
//把RDD[List(String,Int)]转换成RDD[(String,Int)]
val value = pageFlowRDD.map(_._2).flatMap(list=>list)
//这里统计到了(“2-3”,1),只需要合并统计即可得到页面跳转转换率
val pageSumRDD = value.reduceByKey(_ + _)
//TODO 计算转换率
/**
* 分子除分母即可,要做一下对应
* (1-2的跳转数)/2的页面点击数
* 页面点击数的格式List(页面ID,点击数),转换成map,根据页面号取value
*/
pageSumRDD.foreach{
//sum就是跳转数
case (pageflow,sum) =>{
val pageid = pageflow.split("-")(0)
//获取对应页面总点击数
val pageValue = pageCountArray.toMap.getOrElse(pageid.toLong,1)
println("页面跳转【"+pageflow+"】转换率为"+(sum.toDouble/pageValue*100).formatted("%.2f")+"%")
}
}
}
}
*PageFlowDao*
-> Worth noting: this time the Dao layer finally earns its keep, turning the raw, hard-to-read records into a case class.
class PageFlowDao extends TDao{
def getUserVisitAction(path:String) = {
val rdd: RDD[String] = readFile(path)
rdd.map(
line=>{
val datas = line.split("_")
UserVisitAction(
datas(0),
datas(1).toLong,
datas(2),
datas(3).toLong,
datas(4),
datas(5),
datas(6).toLong,
datas(7).toLong,
datas(8),
datas(9),
datas(10),
datas(11),
datas(12).toLong
)
}
)
}
}
*UserVisitAction*
-> The bean (case class) used by the Dao.
package object bean {
case class UserVisitAction(
date: String,//date of the user's click action
user_id: Long,//user ID
session_id: String,//session ID
page_id: Long,//ID of the page
action_time: String,//time of the action
search_keyword: String,//search keyword entered by the user
click_category_id: Long,//ID of the clicked product category
click_product_id: Long,//ID of the clicked product
order_category_ids: String,//IDs of all categories in one order
order_product_ids: String,//IDs of all products in one order
pay_category_ids: String,//IDs of all categories in one payment
pay_product_ids: String,//IDs of all products in one payment
city_id: Long//city ID
)
}
*PageFlowApplication*
-> Finally, just call the Controller from PageFlowApplication.
object PageFlowApplication extends App with TApplication{
start("spark"){
val controller = new PageFlowController
controller.excute()
}
}
*Results*
A look at the results; there are too many to show, so only part is captured. Most conversion rates are around 2%.
*Requirement 3: Top 3 hot products per region*
*Data preparation*
-> Create the tables in Hive and load the prepared data into them.
CREATE DATABASE spark_sql;
CREATE TABLE `user_visit_action`(
`date` string,
`user_id` bigint,
`session_id` string,
`page_id` bigint,
`action_time` string,
`search_keyword` string,
`click_category_id` bigint,
`click_product_id` bigint,
`order_category_ids` string,
`order_product_ids` string,
`pay_category_ids` string,
`pay_product_ids` string,
`city_id` bigint)
row format delimited fields terminated by '\t';
load data local inpath '/root/data/user_visit_action.txt' into table user_visit_action;
CREATE TABLE `product_info`(
`product_id` bigint,
`product_name` string,
`extend_info` string)
row format delimited fields terminated by '\t';
load data local inpath '/root/data/product_info.txt' into table product_info;
CREATE TABLE `city_info`(
`city_id` bigint,
`city_name` string,
`area` string)
row format delimited fields terminated by '\t';
load data local inpath '/root/data/city_info.txt' into table city_info;
*Connecting SparkSQL to Hive*
-> First copy hive-site.xml out of the VM and put it in the resources directory.
<configuration>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://192.168.15.101:3306/hive?createDatabaseIfNotExist=true</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>root</value>
</property>
</configuration>
Write a small demo to check that the connection to Hive works.
object SparkSQLHive {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME","root")
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQLLS")
val spark = new sql.SparkSession.Builder()
.enableHiveSupport()
.config(sparkConf)
.getOrCreate()
spark.sql("show databases").show
spark.stop
}
}
# Big pitfall
Because IDEA operates on Hive remotely, Hive's two remote-access services must be running first; otherwise you keep getting the error
Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStor
hive --service metastore &      (default port 9083)
hive --service hiveserver2 &    (default port 10000)
Connection succeeded; the red text is SLF4J logging, not an error.
Now try creating the tables and loading the data from Scala code. First drop the tables created earlier in Hive. Keep the database itself; creating a database is trivial, so it is not demonstrated again.
drop table city_info;
drop table user_visit_action;
drop table product_info;
*Creating Hive tables and loading data with SparkSQL*
-> Then operate on Hive through SparkSQL. The source data files sit in the /input folder under the project root.
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME","root")
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQLLS")
val spark = new sql.SparkSession.Builder()
.enableHiveSupport()
.config(sparkConf)
.getOrCreate()
import spark.implicits._
spark.sql("show databases").show
//TODO 根据当前/input下的数据创建hive表
spark.sql("use spark_sql")
//create the user_visit_action table
spark.sql(
"""
|CREATE TABLE `user_visit_action`(
| `date` string,
| `user_id` bigint,
| `session_id` string,
| `page_id` bigint,
| `action_time` string,
| `search_keyword` string,
| `click_category_id` bigint,
| `click_product_id` bigint,
| `order_category_ids` string,
| `order_product_ids` string,
| `pay_category_ids` string,
| `pay_product_ids` string,
| `city_id` bigint)
|row format delimited fields terminated by '\t'
|""".stripMargin)
//为user_visit_action导入数据,注意要指定库名
spark.sql(
"""
|load data local inpath 'input/user_visit_action.txt' into table spark_sql.user_visit_action
|""".stripMargin)
//创建product_info表
spark.sql(
"""
|CREATE TABLE `product_info`(
| `product_id` bigint,
| `product_name` string,
| `extend_info` string)
|row format delimited fields terminated by '\t'
|""".stripMargin)
//为product_info导入数据
spark.sql(
"""
|load data local inpath 'input/product_info.txt' into table spark_sql.product_info
|""".stripMargin)
//创建city_info表
spark.sql(
"""
|CREATE TABLE `city_info`(
| `city_id` bigint,
| `city_name` string,
| `area` string)
|row format delimited fields terminated by '\t'
|""".stripMargin)
//为city_info导入数据
spark.sql(
"""
|load data local inpath 'input/city_info.txt' into table spark_sql.city_info
|""".stripMargin)
//随便查询一个表测试是否成功,数据过多容易卡只显示前10条
spark.sql(
"""
|select * from city_info
|""".stripMargin).show(10)
spark.stop
}
Check the result: the tables were created successfully.
*SparkSQL computation logic*
-> The required output is "area, product name, click count, (city1: click share, city2: click share, other: click share)".
First join the three tables on their primary/foreign keys to produce the temporary view t1.
Then group by area and product name, count the product's total clicks within each area and the click share contributed by each city in that area, producing the temporary view t2.
Finally run a window query over t2, rank the products within each area by click count, and keep the top three together with their rank.
Then display the result.
object SparkSQLReq {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME","root")
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQLLS")
val spark = new sql.SparkSession.Builder()
.enableHiveSupport()
.config(sparkConf)
.getOrCreate()
import spark.implicits._
//TODO 写sql完成统计,复杂的聚合函数需要自定义
spark.sql("use spark_sql")
/**
* t1表就是简单的把三个表根据城市id和商品id,join起来
* t2是在t1的基础上根据地区和商品进行分组,然后统计总点击量
* t3使用开窗函数over把每个地区的点击量desc排序并且输出一个rank排名值
* 最后select这个t3的结果进行展示,只取前三名
*/
//TODO 先把三个表根据各自的主键外键拼接起来,去除掉click=-1的数据(无效点击)
spark.sql(
"""
|select a.*,c.*,p.product_name
|from user_visit_action a
|join city_info c on c.city_id = a.city_id
|join product_info p on p.product_id = a.click_product_id
|where a.click_product_id > -1
|""".stripMargin).createOrReplaceTempView("t1")
//TODO 把自定义的聚合函数注册一下,根据地区和商品名进行分组,统计每个商品的地区总点击量和地区内城市点击率
//因为这里是要用sql进行聚合,所以必须使用弱类型自定义聚合函数
//强类型聚合函数是使用的DataSet进行聚合,而不是sql
val udaf = new CityRemarkUDAF
spark.udf.register("cityRemark",udaf)
spark.sql(
"""
|select area,product_name,count(*) as click_count,cityRemark(city_name)
|from t1
|group by area,product_name
|""".stripMargin).createOrReplaceTempView("t2")
//TODO 对生成的t2表进行开窗查询,统计每个地区内点击量前三的商品
spark.sql(
"""
|select *,rank() over(partition by area order by click_count desc) as rank
|from t2
|""".stripMargin).createOrReplaceTempView("t3")
//TODO 查询结果展示
spark.sql("select * from t3 where rank <= 3").show
spark.stop()
}
class CityRemarkUDAF extends UserDefinedAggregateFunction{
/**
* 输入城市名,根据城市名统计城市的点击量,除以总点击量获得点击率
* 最后返回一个字符串"城市1:点击率,城市2:点击率,其他:点击率"
*/
//输入类型,城市名String
override def inputSchema: StructType = {
StructType(Array(StructField("cityName",StringType)))
}
//缓冲区数据类型,总点击量和城市点击量,城市有多个所以用map比较方便
//例如地区1内包含城市1,城市2,则生成的map为
//Map(城市1,点击量)
//Map(城市2,点击量)
override def bufferSchema: StructType = {
StructType(Array(
StructField("total",LongType),
StructField("cityMap",MapType(StringType,LongType))
))
}
//返回的数据类型,就返回一个String类型的备注"城市:点击率"
override def dataType: DataType = StringType
override def deterministic: Boolean = true
//缓冲区的初始化,根据前面设计的缓冲区类型来
override def initialize(buffer: MutableAggregationBuffer): Unit = {
//初始总点击量为零
buffer(0) = 0L
//初始map为空
buffer(1) = Map[String ,Long]()
}
//更新缓冲区内容
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
//输入的内容就是city_name,直接getString获取
val cityName = input.getString(0)
//每进来一个数据就让总点击量+1
buffer(0) = buffer.getLong(0) + 1
//获取缓冲区内的Map
val citymap = buffer.getAs[Map[String,Long]](1)
//获取Map内城市,点击量的值,如果没有城市则生成一个默认值为0的map,点击量+1
val newClick = citymap.getOrElse(cityName,0L)+1
//更新Map的值为新点击量
buffer(1) = citymap.updated(cityName,newClick)
}
//合并缓冲区的内容
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
//第一个就是总点击量没什么好说的,直接加就完事
buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
//获取两个缓冲区的Map
val map1 = buffer1.getAs[Map[String,Long]](1)
val map2 = buffer2.getAs[Map[String,Long]](1)
//合并Map的逻辑
buffer1(1) = map1.foldLeft(map2){
//输入值为map1,map2.key,map2.value
case (map,(k,v)) =>{
//更新map1,更新的值为map1的值(默认为0)加上map2.value
map.updated(k,map.getOrElse(k,0L) + v)
}
}
}
//聚合函数返回值的计算逻辑
override def evaluate(buffer: Row): Any = {
//先获取这个商品/地区的总点击量
val total = buffer.getLong(0)
//获取缓冲区内的map,也就是城市点击量
val citymap = buffer.getMap[String, Long](1)
//把map转换成List并排序,排序规则是根据城市点击量降序,并且只取前两个
val cityToCount = citymap.toList.sortWith(
(left, right) => left._2 > right._2
).take(2)
//判断citymap里是否有两个以上的城市
var hasRest = citymap.size>2
//rest用于统计前两名城市点击量的总和
var rest = 0L
//用StringBuffer拼接效率高
val s = new StringBuilder
//对List进行循环遍历
cityToCount.foreach {
case (city, count) => {
//点击率=城市点击量/地区总点击量
val r = (count*100/total)
//结果字符串中拼接"城市名:点击率"
s.append(city + " " + r + "%,")
//统计前两名的点击率总和
rest = rest +r
}
}
if(hasRest){
//如果有三个以上的城市,则合并成"其他:点击率"
//点击率用100-前两名点击率即可
s.toString()+"其他:"+(100-rest)+"%"
}else{
//如果只有两个城市就直接输出
s.toString()
}
}
}
}
*Results*
*Requirement 4: Ad-click blacklist*
The ad-click blacklist consists of two modules: the first generates simulated click data, the second aggregates the data in real time and adds offending users to the blacklist.
*Prerequisites*
*TApplication*
First add Spark Streaming support to the framework built earlier.
trait TApplication {
var envData : Any = null;
//创建链接
def start(t:String = "jdbc")(op : =>Unit) : Unit={
if(t=="spark"){
envData = EnvUtil.getEnv()
}else if (t=="sparkStreaming"){
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("streaming")
val ssc = new StreamingContext(sparkConf,Seconds(3))
//the context must be kept: envData is cast back to a StreamingContext on shutdown,
//and TDao.readKafka fetches it through EnvUtil (setter name assumed, see the EnvUtil sketch earlier)
EnvUtil.setStreamingEnv(ssc)
envData = ssc
}
//业务抽象
try{
op
}catch {
case ex: Exception=> println(ex.getMessage)
}
//关闭链接
if(t=="spark"){
EnvUtil.clear()
}else if(t=="sparkStreaming"){
val ssc = envData.asInstanceOf[StreamingContext]
ssc.start()
ssc.awaitTermination()
}
}
}
In principle the Spark Streaming batch interval should not be hard-coded; it ought to live in the framework as a parameter, but I got lazy here. A possible version is sketched below.
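For reference, a minimal sketch of a parameterized variant, keeping the same envData convention as TApplication (the trait and parameter names here are my own, not from the original project):
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}
trait TStreamingApplication {
var envData: Any = null
//same shape as TApplication.start, but the batch interval is passed in instead of hard-coded
def start(batchDuration: Duration = Seconds(3))(op: => Unit): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("streaming")
val ssc = new StreamingContext(sparkConf, batchDuration)
envData = ssc
try {
op
} catch {
case ex: Exception => println(ex.getMessage)
}
ssc.start()
ssc.awaitTermination()
}
}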
*TDao*
Add Kafka write and read helpers to the TDao trait.
trait TDao {
//导入config.properties文件
val properties = new Properties()
val config: Properties = PropertiesUtil.load("config.properties")
def readFile(path:String):RDD[String] = {
EnvUtil.getEnv().textFile(path)
}
def readKafka(): DStream[String] ={
//TODO 从kafka中读取数据
//kafka的配置信息
val kafkaPara = Map[String,Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG->config.getProperty("kafka.broker.list"),
ConsumerConfig.GROUP_ID_CONFIG->"root",
"key.deserializer"->"org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer"->"org.apache.kafka.common.serialization.StringDeserializer"
)
val kafkaDStream:InputDStream[ConsumerRecord[String,String]] =
KafkaUtils.createDirectStream[String,String](
EnvUtil.getStreamingEnv(),
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String,String](Set("root"),kafkaPara)
)
kafkaDStream.map(record => record.value())
}
//implicit参数可以动态改变
def writeToKafka( implicit datas:Seq[String] ): Unit ={
//TODO 向kafka发送数据
// 获取配置文件config.properties中的Kafka配置参数
val broker: String = config.getProperty("kafka.broker.list")
val topic = "root"
val prop = new Properties()
// 添加配置
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker)
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
// create the Kafka producer from the configuration
val kafkaProducer: KafkaProducer[String, String] = new KafkaProducer[String, String](prop)
while (true) {
// 随机产生实时数据并通过Kafka生产者发送到Kafka集群中
for (line <- datas) {
kafkaProducer.send(new ProducerRecord[String, String](topic, line))
println(line)
}
Thread.sleep(2000)
}
}
}
*Config.properties*
负责给jdbc和kafka提供配置信息。
# JDBC configuration
jdbc.datasource.size=10
jdbc.url=jdbc:mysql://hadoop01:3306/spark2020?useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true
jdbc.user=root
jdbc.password=root
# Kafka configuration
kafka.broker.list=hadoop01:9092,hadoop02:9092,hadoop03:9092
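PropertiesUtil and JdbcUtil are used in TDao and in the services below, but their source is not listed in this article either. A minimal sketch of what they are assumed to look like: PropertiesUtil reads config.properties from the classpath, and JdbcUtil hands out plain DriverManager connections (the real project likely wraps the Druid pool declared in the pom instead, which is what jdbc.datasource.size hints at).
import java.io.InputStreamReader
import java.nio.charset.StandardCharsets
import java.sql.{Connection, DriverManager}
import java.util.Properties
//assumed helper: load a properties file from the classpath
object PropertiesUtil {
def load(name: String): Properties = {
val prop = new Properties()
prop.load(new InputStreamReader(
Thread.currentThread().getContextClassLoader.getResourceAsStream(name),
StandardCharsets.UTF_8))
prop
}
}
//assumed helper: create JDBC connections from config.properties
object JdbcUtil {
private val config: Properties = PropertiesUtil.load("config.properties")
Class.forName("com.mysql.jdbc.Driver")
def getConnection: Connection = {
DriverManager.getConnection(
config.getProperty("jdbc.url"),
config.getProperty("jdbc.user"),
config.getProperty("jdbc.password"))
}
}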
*Data simulation module*
*MockDataController*
A controller that just calls the Service's computation logic.
class MockDataController extends TController{
private val mockDataService = new MockDataService
override def excute(): Unit = {
val result = mockDataService.analysis()
}
}
*MockDataService*
Simply delegates to the Dao: generate the mock data and write it to Kafka.
class MockDataService extends TService{
private val mockDataDao = new MockDataDao();
/**
* Data analysis
* @return
*/
override def analysis() = {
val datas = mockDataDao.mockData()
import mockDataDao._
mockDataDao.writeToKafka(datas)
}
}
*MockDataDao*
Contains the actual data-generation logic; since it is data interaction, it lives in the Dao layer.
It generates a timestamp and then randomly generates the city, the user, and the clicked ad, with weighted probabilities for the cities.
class MockDataDao extends TDao{
case class RanOpt[T](value: T, weight: Int)
object RandomOptions {
def apply[T](opts: RanOpt[T]*): RandomOptions[T] = {
val randomOptions = new RandomOptions[T]()
for (opt <- opts) {
randomOptions.totalWeight += opt.weight
for (i <- 1 to opt.weight) {
randomOptions.optsBuffer += opt.value
}
}
randomOptions
}
}
class RandomOptions[T](opts: RanOpt[T]*) {
var totalWeight = 0
var optsBuffer = new ListBuffer[T]
def getRandomOpt: T = {
val randomNum: Int = new Random().nextInt(totalWeight)
optsBuffer(randomNum)
}
}
//TODO 数据生成
implicit def mockData(): Seq[String]={
val array: ArrayBuffer[String] = ArrayBuffer[String]()
val CityRandomOpt = RandomOptions(RanOpt(CityInfo(1, "北京", "华北"), 30),
RanOpt(CityInfo(2, "上海", "华东"), 30),
RanOpt(CityInfo(3, "广州", "华南"), 10),
RanOpt(CityInfo(4, "深圳", "华南"), 20),
RanOpt(CityInfo(5, "天津", "华北"), 10))
val random = new Random()
// 模拟实时数据:
// timestamp province city userid adid
for (i <- 0 to 50) {
val timestamp: Long = System.currentTimeMillis()
val cityInfo: CityInfo = CityRandomOpt.getRandomOpt
val city: String = cityInfo.city_name
val area: String = cityInfo.area
val adid: Int = 1 + random.nextInt(6)
val userid: Int = 1 + random.nextInt(6)
// 拼接实时数据
array += timestamp + " " + area + " " + city + " " + userid + " " + adid
}
array.toSeq
}
}
*MockDataApplication*
The runnable program that actually generates the data.
object MockDataApplication extends App with TApplication{
start("sparkStreaming"){
val controller = new MockDataController
controller.excute()
}
}
*Results*
Make sure the Kafka cluster is up before running this.
*Computation module*
First create two bean case classes to structure the data being read.
package object bean {
case class Ad_Click_Log(
ts:String,
area:String,
city:String,
userid:String,
adid:String
)
case class CityInfo (city_id:Long,
city_name:String,
area:String)
}
*MySQL table creation*
CREATE DATABASE spark2020;
use spark2020;
CREATE TABLE black_list (userid VARCHAR(2) PRIMARY KEY);
CREATE TABLE user_ad_count (
dt VARCHAR(255),
userid VARCHAR (2),
adid VARCHAR (2),
count BIGINT,
PRIMARY KEY (dt, userid, adid)
);
*BlackListController*
class BlackListController extends TController{
private val blackListService = new BlackListService
override def excute(): Unit = {
val result = blackListService.analysis()
}
}
*BlackListService*
The main class of the blacklist logic. First read all the click data from Kafka and map each record into the bean case class. The blacklist has to be refreshed periodically, and this is done with transform: Spark Streaming pulls data once per batch, and the code placed inside transform is re-evaluated for every batch, so the blacklist query (over JDBC) put there is re-executed each cycle before the records are wrapped and filtered; users already on the blacklist are excluded from the statistics.
When a user's click count for the current day exceeds 100, the user is added to the blacklist.
class BlackListService extends TService{
private val blackListDao = new BlackListDao
/**
* Data analysis
* @return
*/
override def analysis(): Any = {
//TODO 读取所有的点击数据
val ds = blackListDao.readKafka()
//TODO 将数据转换成样例类来使用
val logsDS = ds.map(
data => {
val datas = data.split(" ")
Ad_Click_Log(datas(0),datas(1),datas(2),datas(3),datas(4))
}
)
//TODO 周期性的获取黑名单信息,判断当前用户是否在黑名单中
val value = logsDS.transform(
rdd => {
//TODO 每次封装都可以周期性执行
//query MySQL for the current blacklist
//获取连接对象
val connection = JdbcUtil.getConnection
//发送sql语句
val pstat = connection.prepareStatement(
"""
|select userid from black_list
|""".stripMargin
)
//返回一个结果集
val rs = pstat.executeQuery()
val blackIds = ListBuffer[String]()
while (rs.next()) {
blackIds.append(rs.getString(1))
}
rs.close()
pstat.close()
connection.close()
//TODO 如果用户在黑名单中则不再进行统计
val filterRDD = rdd.filter(
log => {
//判断用户是否在黑名单里
!blackIds.contains(log.userid)
}
)
//TODO 将正常的数据进行点击量统计->wordCount
/**
* 时间戳是毫秒级的,作为key会导致基本上进不了黑名单
* 按需求,应该把时间戳改变成日期,按天统计
*/
val sdf = new SimpleDateFormat("yyyy-MM-dd")
val mapRDD = filterRDD.map(
log => {
val date = new Date(log.ts.toLong)
((sdf.format(date), log.userid, log.adid), 1)
}
)
//直接进行统计
mapRDD.reduceByKey(_ + _)
}
)
//TODO 将符合黑名单的用户拉入黑名单
value.foreachRDD(
rdd => {
rdd.foreach{
case ((day,userid,adid),sum)=>{
//TODO 每个采集周期中的结果
//这一个采集周期也就是几秒钟,应该不断进行聚合
//TODO 更新用户的点击量(先插入,如果存在则更新)
val conn = JdbcUtil.getConnection
val pstat = conn.prepareStatement(
"""
|insert into user_ad_count (dt,userid,adid,count)
|values(?,?,?,?)
|on duplicate key
|update count = count + ?
|""".stripMargin)
pstat.setString(1,day)
pstat.setString(2,userid)
pstat.setString(3,adid)
pstat.setLong(4,sum)
pstat.setLong(5,sum)
pstat.executeUpdate()
//TODO 获取最新的用户统计数据
//TODO 判断是否超过阈值
//TODO 超过就拉入黑名单
/**
* 吧超过阈值的数据添加到黑名单中去,如果表中已经有了这个值,则直接更新
*/
val pstat1 = conn.prepareStatement(
"""
|insert into black_list (userid)
|select userid from user_ad_count
|where dt = ? and userid = ? and adid = ? and count >= 100
|on duplicate key
|update userid = ?
|""".stripMargin)
pstat1.setString(1,day)
pstat1.setString(2,userid)
pstat1.setString(3,adid)
pstat1.setString(4,userid)
pstat1.executeUpdate()
pstat.close()
pstat1.close()
conn.close()
}
}
}
)
}
}
*BlackListDao*
Does nothing of its own; it exists just to extend TDao.
class BlackListDao extends TDao{
}
*BlackListApplication*
The main entry point of the blacklist statistics program.
object BlackListApplication extends App with TApplication{
start("sparkStreaming"){
val controller = new BlackListController
controller.excute()
}
}
*Results*
Every user whose click count for the day exceeded 100 was blacklisted, and blacklisted users are no longer counted.
*Requirement 5: Real-time ad-click statistics*
*MySQL table creation*
use spark2020;
CREATE TABLE `area_city_ad_count` (
`dt` varchar(255) NOT NULL DEFAULT '',
`area` varchar(41) NOT NULL DEFAULT '',
`city` varchar(41) NOT NULL DEFAULT '',
`adid` varchar(21) NOT NULL DEFAULT '',
`count` bigint(20) DEFAULT NULL,
PRIMARY KEY (`dt`,`area`,`city`,`adid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
*DateAreaCityAdCountAnalysisController*
class DateAreaCityAdCountAnalysisController extends TController{
private val dateAreaCityAdCountAnalysisService = new DateAreaCityAdCountAnalysisService
override def excute(): Unit = {
val result = dateAreaCityAdCountAnalysisService.analysis()
}
}
*DateAreaCityAdCountAnalysisService*
This is just a simple wordCount over all the clicks; the aggregated results are written to MySQL with an insert that, when the primary key already exists, adds the new count to the stored one.
class DateAreaCityAdCountAnalysisService extends TService{
private val dateAreaCityAdCountAnalysisDao = new DateAreaCityAdCountAnalysisDao
override def analysis()= {
//读取kafka数据
val messageDS = dateAreaCityAdCountAnalysisDao.readKafka()
//TODO 将数据转换为样例类
val logDS = messageDS.map(
data => {
val datas = data.split(" ")
Ad_Click_Log(datas(0), datas(1), datas(2), datas(3), datas(4))
}
)
//TODO convert the timestamp to a day and re-key each record as ((day, area, city, adid), 1)
val sdf = new SimpleDateFormat("yyyy-MM-dd")
val dayDS = logDS.map(
log => {
val day = sdf.format(new Date(log.ts.toLong))
((day,log.area, log.city, log.adid), 1)
}
)
val resultDS = dayDS.reduceByKey(_ + _)
resultDS.foreachRDD(
rdd => {
rdd.foreachPartition(
datas => {
//TODO 获取数据库的链接
val conn = JdbcUtil.getConnection
val pstat = conn.prepareStatement(
"""
|insert into area_city_ad_count (dt,area,city,adid,count)
|values(?,?,?,?,?)
|on duplicate key
|update count = count + ?
|""".stripMargin)
//TODO 操作数据库
datas.foreach {
case ((dt, area, city, adid), count) => {
println(dt, area, city, adid,count)
pstat.setString(1, dt)
pstat.setString(2, area)
pstat.setString(3, city)
pstat.setString(4, adid)
pstat.setLong(5, count)
pstat.setLong(6, count)
pstat.executeUpdate()
}
}
//TODO 关闭资源
pstat.close()
conn.close()
}
)
}
)
}
}
*DateAreaCityAdCountAnalysisDao*
class DateAreaCityAdCountAnalysisDao extends TDao{
}
*DateAreaCityAdCountAnalysisApplication*
object DateAreaCityAdCountAnalysisApplication extends App with TApplication{
start("sparkStreaming"){
val controller = new DateAreaCityAdCountAnalysisController
controller.excute()
}
}
*Results*
*Requirement 6: Ad clicks over the last hour*
*LasHourAdCountAnalysisController*
class LasHourAdCountAnalysisController extends TController{
private val lasHourAdCountAnalysisService = new LasHourAdCountAnalysisService
override def excute(): Unit = {
lasHourAdCountAnalysisService.analysis()
}
}
*LasHourAdCountAnalysisService*
class LasHourAdCountAnalysisService extends TService{
private val lasHourAdCountAnalysisDao = new LasHourAdCountAnalysisDao
/**
* Data analysis
* @return
*/
override def analysis() = {
//TODO 读kafka数据
val massage = lasHourAdCountAnalysisDao.readKafka()
//TODO 对数据进行格式化
val logDS = massage.map(
data => {
val datas = data.split(" ")
Ad_Click_Log(datas(0), datas(1), datas(2), datas(3), datas(4))
}
)
//TODO 操作时间戳123456/10000->12*10000->120000
//格式转换只保留时间戳、广告、点击数
val toCountDS = logDS.map(
log => {
val ts = log.ts.toLong
((log.adid, ts / 10000 * 10000), 1)
}
)
//开窗统计,每十秒统计一次,窗口内放最近一分钟的数据
val sumDS = toCountDS.reduceByKeyAndWindow((x:Int,y:Int)=>x+y,Seconds(60),Seconds(10))
//格式转换,以广告为key,顺便把时间戳转换成时间格式
val sdf = new SimpleDateFormat("yyyy-MM-dd-HH")
val adToTimeSumDS = sumDS.map {
case ((adid, time), sum) => {
(adid, (sdf.format(time)+":00", sum))
}
}
//根据key聚合
val value = adToTimeSumDS.groupByKey()
val resultDS = value.mapValues(
iter => {
iter.toList.sortWith(
(left, right) => {
left._1 < right._1
}
)
}
)
resultDS.print()
}
}
*LasHourAdCountAnalysisDao*
class LasHourAdCountAnalysisDao extends TDao{
}
*LasHourAdCountAnalysisApplication*
object LasHourAdCountAnalysisApplication extends App with TApplication{
start("sparkStreaming"){
val controller = new LasHourAdCountAnalysisController
controller.excute()
}
}
*Results*
That completes the project. Project source code:
Download