Using Alibaba Cloud Kafka


1. Add the spring-kafka dependency

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.5.0.RELEASE</version>
        </dependency>
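
Note: spring-kafka 2.5.0.RELEASE is the line shipped with Spring Boot 2.3.x; if the project inherits from the Spring Boot parent POM, the version tag can usually be omitted and managed by Boot instead.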

2. Message-sending utility: KafkaSender.java

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;

/**
 * @ClassName KafkaSender
 * @Description: Utility component for sending messages to Kafka
 * @Author yangpeng
 * @Date 2020/6/6
 **/
@Component
public class KafkaSender {

    @Resource
    private KafkaTemplate<String,String> kafkaTemplate;

    /**
     * Send a message to Kafka.
     * @param topic   the target topic
     * @param message the message body
     */
    public void sendMsg(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}
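
For reference, a minimal sketch of how business code might call this sender; the service class and method names below are hypothetical, not from the original project (the topic name act-task comes from the configuration in step 6):

import javax.annotation.Resource;
import org.springframework.stereotype.Service;

@Service
public class ViewLogService {

    @Resource
    private KafkaSender kafkaSender;

    /** Publish a view-log event, already serialized as JSON, to the act-task topic. */
    public void publishViewLog(String viewLogJson) {
        kafkaSender.sendMsg("act-task", viewLogJson);
    }
}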

3. Start and stop consumption on a schedule:

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;

/**
 * Scheduled consumption of Kafka view-log records
 * @ClassName KafkaConfigurationTask
 * @Description: Starts and pauses the Kafka listener container on a schedule
 * @Author yangpeng
 * @Date 2020/6/15
 **/
@Component
@Slf4j
public class KafkaConfigurationTask {
    @Resource
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    /**
     * Start consuming at 1:00 AM every day.
     * @author yangpeng
     * @date 2020/6/15
     */
    @Scheduled(cron = "0 0 1 * * ?")
    public void startListener() {
        log.info("Scheduled task: starting message consumption");
        // "act-task" matches the id of the @KafkaListener in ActLogListener
        MessageListenerContainer container = kafkaListenerEndpointRegistry.getListenerContainer("act-task");
        if (!container.isRunning()) {
            container.start();
        }
        // resume in case the container was paused (rather than stopped) at 6:00 AM
        container.resume();
    }

    /**
     * Pause consumption at 6:00 AM every day.
     * @author yangpeng
     * @date 2020/6/15
     */
    @Scheduled(cron = "0 0 6 * * ?")
    public void shutdownListener() {
        log.info("Scheduled task: pausing message consumption");
        MessageListenerContainer container = kafkaListenerEndpointRegistry.getListenerContainer("act-task");
        // pause() stops record delivery but keeps the container and its partition assignments alive
        container.pause();
    }
}
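
These @Scheduled methods only fire if scheduling is enabled somewhere in the application. If the project does not already do this, a minimal sketch (the class name here is hypothetical):

import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;

@Configuration
@EnableScheduling
public class SchedulingConfig {
    // empty: the annotation alone turns on @Scheduled processing
}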

4. Thread pool configuration: ThrealPoolUtil.java


import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.*;

/**
 * @ClassName ThrealPoolUtil
 * @Description: Thread pool used to persist view-log records
 * @Author yangpeng
 * @Date 2020/6/9
 **/
@Configuration
@Slf4j
public class ThrealPoolUtil {

    @Value("${pool.save.thread.pool.core}")
    private int core;

    @Value("${pool.save.thread.pool.max}")
    private int max;

    @Value("${pool.save.thread.pool.queue}")
    private int queue;

    @Bean(value = "threadPoolViewLog")
    public ExecutorService threadPoolViewLog() {
        // "%d" gives each thread a unique index in its name; without it every thread would share the same name
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("pool-save-view-log-%d").build();
        return new ThreadPoolExecutor(core, max,
                1L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(queue),
                threadFactory, new ThreadPoolExecutor.AbortPolicy());
    }



}
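
A short sketch of how a caller might hand work to this pool and cope with AbortPolicy rejections; the surrounding class is hypothetical, while the bean name threadPoolViewLog comes from the configuration above:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import javax.annotation.Resource;
import org.springframework.stereotype.Component;

@Component
public class ViewLogWorker {

    // injects the ExecutorService bean declared as "threadPoolViewLog"
    @Resource(name = "threadPoolViewLog")
    private ExecutorService threadPoolViewLog;

    public void submit(Runnable task) {
        try {
            threadPoolViewLog.execute(task);
        } catch (RejectedExecutionException e) {
            // AbortPolicy throws once the queue (5000) and the max threads (8) are exhausted
        }
    }
}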

5. Kafka listener: once consumption is started, the methods annotated with @KafkaListener below are executed


import com.cqliving.act.ss.dal.entity.ViewLogDO;
import com.cqliving.act.ss.dal.mapper.ViewLogDAO;
import cqliving.framework.cloud.core.utils.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;

/**
 * @ClassName ActLogListener
 * @Description: Batch Kafka listener that persists view-log records
 * @Author yangpeng
 * @Date 2020/6/6
 **/
@Component
@Slf4j
public class ActLogListener {

    @Autowired
    private ViewLogDAO viewLogDAO;
    @Autowired
    private JSONUtil jsonUtil;
    @Autowired
    private KafkaSender kafkaSender;
    @Value("${spring.kafka.concurrency}")
    private int concurrency;
    @Value("${spring.kafka.topic.backup}")
    private String backupTopic;
    @Value("${spring.kafka.topic}")
    private String topic;

    @Autowired
    private ThrealPoolUtil threalPoolUtil;

    @KafkaListener(id = "act-task", topics = {"${spring.kafka.topic}"}, groupId = "${spring.kafka.groupId}", containerFactory = "batchFactory")
    public void listen(List<ConsumerRecord<?, ?>> recordList) {
        // hand the batch to the view-log thread pool so the poll loop is not blocked
        threalPoolUtil.threadPoolViewLog().execute(() -> {
            if (topic.equals(backupTopic)) {
                // this instance consumes the backup topic: insert records one at a time
                saveViewLog(recordList);
            } else {
                // normal topic: batch-insert the whole poll
                saveViewLogs(recordList);
            }
        });
    }

    // invoked from listen(), which already dispatches to the threadPoolViewLog executor;
    // an @Async annotation here would never take effect because self-invocation bypasses the Spring proxy
    protected void saveViewLogs(List<ConsumerRecord<?, ?>> recordList) {
        List<ViewLogDO> logDOS = new ArrayList<>();
        try {
            for (ConsumerRecord record : recordList) {
                String msg = record.value().toString();
                ViewLogDO viewLogDO = jsonUtil.toObj(msg, ViewLogDO.class);
                logDOS.add(viewLogDO);
            }
            viewLogDAO.batchInsert(logDOS);
        } catch (Exception e) {
            // if the batch insert fails, forward each record to the backup topic
            // where it will be retried one record at a time
            logDOS.forEach(viewLogDO -> kafkaSender.sendMsg(backupTopic, jsonUtil.toStr(viewLogDO)));
        }
    }

    // backup-topic path: insert one record per poll so a single bad record cannot fail a batch
    protected void saveViewLog(List<ConsumerRecord<?, ?>> recordList) {
        ViewLogDO viewLogDO = new ViewLogDO();
        try {
            for (ConsumerRecord record : recordList) {
                String msg = record.value().toString();
                viewLogDO = jsonUtil.toObj(msg, ViewLogDO.class);
                viewLogDAO.insert(viewLogDO);
            }
        } catch (Exception e) {
            log.error("Failed to consume view-log record: " + e.getMessage());
            log.error(recordList.get(0).value().toString());
        }
    }


    /**
     * Kafka listener container factory.
     *
     * @param configurer Boot's configurer, which applies the spring.kafka.* properties
     * @return the batch listener container factory
     */
    @Bean("batchFactory")
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        // apply Boot's defaults (including the consumer factory) first, so the manual settings below are not overwritten
        configurer.configure(factory, consumerFactory);
        // number of concurrent consumers
        factory.setConcurrency(concurrency);
        // enable batch consumption: the listener receives a List of records per poll
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(7000);
        // do not start automatically; the scheduled task starts the container at 1:00 AM
        factory.setAutoStartup(false);
        return factory;
    }



}
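
Putting the pieces together: when a batch insert fails, every record in the batch is forwarded to the backup topic (act-task-back-up). A second deployment of this same listener, configured with spring.kafka.topic=act-task-back-up and spring.kafka.consumer.max-poll-records=1, then takes the saveViewLog branch and inserts records one at a time, so a single bad record no longer poisons an entire batch.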

6. Configuration file

# Kafka broker address
spring.kafka.bootstrap-servers=192.168.201.136:9092
# Kafka topic
spring.kafka.topic=act-task
# Consumer group id
spring.kafka.groupId=act-task
# Backup topic that records are forwarded to when consumption fails
spring.kafka.topic.backup=act-task-back-up
# Concurrency; size it against the partition count and consumer groups (it matters little here:
# there is a single consumer group and every consumer belongs to it)
spring.kafka.concurrency=5
redis.view-count-limit=1
#=============== producer =======================
# Set a value greater than 0 and the client will resend records that failed to send
spring.kafka.producer.retries=
# Producer batch size in bytes (16 KB)
spring.kafka.producer.batch-size=16384
# Total producer buffer memory in bytes (32 MB)
spring.kafka.producer.buffer-memory=33554432
# Key and value serializers (String, UTF-8)
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer =========================
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.bootstrap-servers=192.168.201.136:9092
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Consume at most 50 records per poll; when consuming the forwarded/backup topic
# (spring.kafka.topic=act-task-back-up), set this to 1
spring.kafka.consumer.max-poll-records=50
#=============== view-log consumer thread pool =========================
# Core pool size
pool.save.thread.pool.core=5
# Maximum pool size
pool.save.thread.pool.max=8
# Work queue capacity: at most 5000 queued tasks; beyond that AbortPolicy throws
pool.save.thread.pool.queue=5000
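
Reading the three pool settings together: new tasks first fill the 5 core threads; once those are busy, tasks wait in the 5,000-slot queue; only when the queue is full does the pool grow toward the maximum of 8 threads; and with all 8 threads busy and a full queue, AbortPolicy rejects further submissions with a RejectedExecutionException. This is standard java.util.concurrent.ThreadPoolExecutor behavior, worth keeping in mind when tuning these values against the 50-record batches arriving from Kafka.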



Copyright notice: this is an original article by k21325, released under the CC 4.0 BY-SA license. Please include a link to the original source and this notice when reposting.