Flink入门第十二课:DataStream api/Flink sql实现每隔5分钟统计最近一小时热门商品小案例

  • Post author:
  • Post category:其他


用到的数据文件
链接:https://pan.baidu.com/s/1uCk-IF4wWVfUkuuTAKaD0w 
提取码:2hmu

1、需求 & 数据

用户行为数据不断写入kafka,程序不断从kafka读取数据,每隔五分钟统计最近
一小时浏览次数最多的热门商品top 5。

输入数据:
UserBehavior
    字段名:userId  itemId  categoryId  behavior timestamp
    解释:  用户名  商品id   商品类别id   行为      时间戳
    值举例: 543462  1715     1464116    pv       1511658000
    类型:    Long   Long      Integer    String   Long

输出数据:
ItemViewCount
    字段名  itemId   count_pv     windowEnd
    解释:  商品id    商品pv总数   窗口结束时间戳
    值举例:1715      17           1511658000000
    类型:   Long      Long         Long

2、实体类

package com.atguigu.hotitems_analysis.beans;

/**
 * Raw user-behavior event as read from Kafka, e.g. a page-view ("pv") of an
 * item at a given epoch-second timestamp.
 *
 * <p>Fields are intentionally {@code public} with a no-arg constructor and
 * getters/setters so the class qualifies as a Flink POJO type.
 * {@code equals}/{@code hashCode} are defined over all fields so instances
 * can be compared and used as map/set keys in tests and operators.
 */
public class UserBehavior {
    public Long userId;       // user identifier
    public Long itemId;       // item identifier
    public Integer categoryId; // item category identifier
    public String behavior;   // behavior type, e.g. "pv"
    public Long timestamp;    // event time in epoch seconds

    /** No-arg constructor required for Flink POJO serialization. */
    public UserBehavior() {
    }

    public UserBehavior(Long userId, Long itemId, Integer categoryId, String behavior, Long timestamp) {
        this.userId = userId;
        this.itemId = itemId;
        this.categoryId = categoryId;
        this.behavior = behavior;
        this.timestamp = timestamp;
    }

    public Long getUserId() {
        return userId;
    }

    public void setUserId(Long userId) {
        this.userId = userId;
    }

    public Long getItemId() {
        return itemId;
    }

    public void setItemId(Long itemId) {
        this.itemId = itemId;
    }

    public Integer getCategoryId() {
        return categoryId;
    }

    public void setCategoryId(Integer categoryId) {
        this.categoryId = categoryId;
    }

    public String getBehavior() {
        return behavior;
    }

    public void setBehavior(String behavior) {
        this.behavior = behavior;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    /** Field-wise equality; null-safe via {@link java.util.Objects#equals}. */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof UserBehavior)) {
            return false;
        }
        UserBehavior that = (UserBehavior) o;
        return java.util.Objects.equals(userId, that.userId)
                && java.util.Objects.equals(itemId, that.itemId)
                && java.util.Objects.equals(categoryId, that.categoryId)
                && java.util.Objects.equals(behavior, that.behavior)
                && java.util.Objects.equals(timestamp, that.timestamp);
    }

    /** Consistent with {@link #equals(Object)}: hashes all fields. */
    @Override
    public int hashCode() {
        return java.util.Objects.hash(userId, itemId, categoryId, behavior, timestamp);
    }

    @Override
    public String toString() {
        return "UserBehavior{" +
                "userId=" + userId +
                ", itemId=" + itemId +
                ", categoryId=" + categoryId +
                ", behavior='" + behavior + '\'' +
                ", timestamp=" + timestamp +
                '}';
    }

}
package com.atguigu.hotitems_analysis.beans;

/**
 * Aggregated result record: page-view count of one item within one window,
 * identified by the window's end timestamp (epoch millis).
 *
 * <p>Public fields plus a no-arg constructor and accessors keep this a valid
 * Flink POJO type.
 */
public class ItemViewCount {
    public Long itemId;    // item identifier
    public Long windowEnd; // end timestamp of the aggregation window (ms)
    public Long count;     // number of "pv" events for the item in the window

    /** No-arg constructor required for Flink POJO serialization. */
    public ItemViewCount() {
    }

    public ItemViewCount(Long itemId, Long windowEnd, Long count) {
        this.itemId = itemId;
        this.windowEnd = windowEnd;
        this.count = count;
    }

    public Long getItemId() {
        return itemId;
    }

    public Long getWindowEnd() {
        return windowEnd;
    }

    public Long getCount() {
        return count;
    }

    public void setItemId(Long itemId) {
        this.itemId = itemId;
    }

    public void setWindowEnd(Long windowEnd) {
        this.windowEnd = windowEnd;
    }

    public void setCount(Long count) {
        this.count = count;
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder("ItemViewCount{");
        sb.append("itemId=").append(itemId);
        sb.append(", windowEnd=").append(windowEnd);
        sb.append(", count=").append(count);
        sb.append('}');
        return sb.toString();
    }
}

3、用户行为数据写入Kafka

package com.atguigu.hotitems_analysis;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;

public class KafkaProducerUtil {
    /**
     * Entry point: replays the sample user-behavior file into the
     * "hotitems_test" Kafka topic via {@code writeToKafka(String)}.
     * Throws whatever I/O or Kafka exception the producer raises.
     */
    public static void main(String[] args) throws Exception{
        writeToKafka("hotitems_test");
    }

    public static void writeToKafka(String topic) throws Exception{
        Properties ps = new Properties();
            ps.setProperty("bootstrap.servers","192.168.149.131:9092");//集群地址
            ps.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");//key序列化方式
            ps.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//value序列化方式

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(ps);//
        BufferedReader bufferedReader = new BufferedReader(new FileReader("G:\\SoftwareInstall\\idea\\p



版权声明:本文为pengzonglu7292原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。