用到的数据文件
链接:https://pan.baidu.com/s/1uCk-IF4wWVfUkuuTAKaD0w
提取码:2hmu
1、需求 & 数据
用户行为数据不断写入Kafka,程序不断从Kafka读取数据,每隔五分钟统计最近
一小时浏览次数最多的热门商品top 5。
输入数据:
UserBehavior
字段名:userId itemId categoryId behavior timestamp
解释: 用户id 商品id 商品类别id 行为 时间戳
值举例: 543462 1715 1464116 pv 1511658000
类型: Long Long Integer String Long
输出数据:
ItemViewCount
字段名: itemId count windowEnd
解释: 商品id 商品pv总数 窗口结束时间戳
值举例:1715 17 1511658000000
类型: Long Long Long
2、实体类
package com.atguigu.hotitems_analysis.beans;
/**
 * Mutable bean holding one user-behavior record.
 *
 * <p>A record consists of a user id, an item id, a category id, a behavior
 * label and a timestamp. Every field is public and additionally exposed
 * through a getter/setter pair; a no-argument constructor is available so
 * instances can be created empty and filled in afterwards.
 */
public class UserBehavior {

    /** Id of the user the record belongs to. */
    public Long userId;
    /** Id of the item the record refers to. */
    public Long itemId;
    /** Id of the category of the item. */
    public Integer categoryId;
    /** Behavior label of the record. */
    public String behavior;
    /** Timestamp of the record. */
    public Long timestamp;

    /** Creates an instance with every field left {@code null}. */
    public UserBehavior() {
    }

    /**
     * Creates a fully populated instance.
     *
     * @param userId     value for {@link #userId}
     * @param itemId     value for {@link #itemId}
     * @param categoryId value for {@link #categoryId}
     * @param behavior   value for {@link #behavior}
     * @param timestamp  value for {@link #timestamp}
     */
    public UserBehavior(Long userId, Long itemId, Integer categoryId, String behavior, Long timestamp) {
        this.userId = userId;
        this.itemId = itemId;
        this.categoryId = categoryId;
        this.behavior = behavior;
        this.timestamp = timestamp;
    }

    /** @return the current user id */
    public Long getUserId() {
        return this.userId;
    }

    /** @param userId the new user id */
    public void setUserId(Long userId) {
        this.userId = userId;
    }

    /** @return the current item id */
    public Long getItemId() {
        return this.itemId;
    }

    /** @param itemId the new item id */
    public void setItemId(Long itemId) {
        this.itemId = itemId;
    }

    /** @return the current category id */
    public Integer getCategoryId() {
        return this.categoryId;
    }

    /** @param categoryId the new category id */
    public void setCategoryId(Integer categoryId) {
        this.categoryId = categoryId;
    }

    /** @return the current behavior label */
    public String getBehavior() {
        return this.behavior;
    }

    /** @param behavior the new behavior label */
    public void setBehavior(String behavior) {
        this.behavior = behavior;
    }

    /** @return the current timestamp */
    public Long getTimestamp() {
        return this.timestamp;
    }

    /** @param timestamp the new timestamp */
    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    /**
     * Renders all five fields as
     * {@code UserBehavior{userId=..., itemId=..., categoryId=..., behavior='...', timestamp=...}}.
     *
     * @return the textual form of this record
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("UserBehavior{");
        text.append("userId=").append(userId);
        text.append(", itemId=").append(itemId);
        text.append(", categoryId=").append(categoryId);
        // The behavior label is the only field wrapped in single quotes.
        text.append(", behavior='").append(behavior).append('\'');
        text.append(", timestamp=").append(timestamp);
        return text.append('}').toString();
    }
}
package com.atguigu.hotitems_analysis.beans;
/**
 * Result class produced after processing.
 *
 * <p>Holds an item id, a window-end value and a count. Every field is public
 * and additionally exposed through a getter/setter pair; a no-argument
 * constructor is available so instances can be created empty and filled in
 * afterwards.
 */
public class ItemViewCount {

    /** Id of the item this result is about. */
    public Long itemId;
    /** Window-end value associated with this result. */
    public Long windowEnd;
    /** Count recorded for the item. */
    public Long count;

    /** Creates an instance with every field left {@code null}. */
    public ItemViewCount() {
    }

    /**
     * Creates a fully populated instance.
     *
     * @param itemId    value for {@link #itemId}
     * @param windowEnd value for {@link #windowEnd}
     * @param count     value for {@link #count}
     */
    public ItemViewCount(Long itemId, Long windowEnd, Long count) {
        this.itemId = itemId;
        this.windowEnd = windowEnd;
        this.count = count;
    }

    /** @return the current item id */
    public Long getItemId() {
        return this.itemId;
    }

    /** @param itemId the new item id */
    public void setItemId(Long itemId) {
        this.itemId = itemId;
    }

    /** @return the current window-end value */
    public Long getWindowEnd() {
        return this.windowEnd;
    }

    /** @param windowEnd the new window-end value */
    public void setWindowEnd(Long windowEnd) {
        this.windowEnd = windowEnd;
    }

    /** @return the current count */
    public Long getCount() {
        return this.count;
    }

    /** @param count the new count */
    public void setCount(Long count) {
        this.count = count;
    }

    /**
     * Renders the three fields as
     * {@code ItemViewCount{itemId=..., windowEnd=..., count=...}}.
     *
     * @return the textual form of this result
     */
    @Override
    public String toString() {
        String fields = String.join(", ",
                "itemId=" + itemId,
                "windowEnd=" + windowEnd,
                "count=" + count);
        return "ItemViewCount{" + fields + "}";
    }
}
3、用户行为数据写入Kafka
package com.atguigu.hotitems_analysis;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;
public class KafkaProducerUtil {
/**
 * Program entry point: hands the fixed topic name {@code "hotitems_test"}
 * to {@link #writeToKafka(String)}.
 *
 * @param args command-line arguments; not read
 * @throws Exception whatever {@code writeToKafka} throws
 */
public static void main(String[] args) throws Exception {
    final String topic = "hotitems_test";
    writeToKafka(topic);
}
public static void writeToKafka(String topic) throws Exception{
Properties ps = new Properties();
ps.setProperty("bootstrap.servers","192.168.149.131:9092");//集群地址
ps.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");//key序列化方式
ps.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//value序列化方式
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(ps);//
BufferedReader bufferedReader = new BufferedReader(new FileReader("G:\\SoftwareInstall\\idea\\p
版权声明:本文为pengzonglu7292原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。