redis消息队列适合轻量级高并发的情况,比如秒杀,及时数据分析等。
首先springboot配置文件配置如下:
spring:
  redis:
    database: 1
    host: 192.168.94.151
    port: 6379
    password: xuhaixing
    jedis:
      pool:
            max-idle: 8
            min-idle: 0
            max-active: 8
            max-wait: -1
    timeout: 5000
server:
  port: 8081注册StringRedisTemplate
/**
 * 注册redisTemplate,作为消息队列的发布者
 */
@Configuration
public class PublisherConfig {
    @Bean
    public StringRedisTemplate getRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
        return new StringRedisTemplate(redisConnectionFactory);
    }
}
消息生产者,注入redisTemplate,用convertAndSend发送消息
@Service
public class PublisherService {
    @Autowired
    private StringRedisTemplate redisTemplate;
    public String sendMessage(String name) {
        try {
            redisTemplate.convertAndSend("TOPIC_USERNAME", name);
            return "消息发送成功了";
        } catch (Exception e) {
            e.printStackTrace();
            return "消息发送失败了";
        }
    }
}
在controller中注入service,请求时发送消息
@RestController
@RequestMapping("publisher")
public class PublisherController {
    @Autowired
    private PublisherService publisherService;
    @RequestMapping("{name}")
    public String sendMessage(@PathVariable("name") String name) {
        return publisherService.sendMessage(name);
    }
}消费者:创建一个接收消息的类,继承MessageListener,也可以不继承
 继承:
@Component
public class Receiver implements MessageListener {
    private static Logger logger = LoggerFactory.getLogger(Receiver.class);
    @Autowired
    private StringRedisTemplate redisTemplate;
    @Override
    public void onMessage(Message message, byte[] pattern) {
        RedisSerializer<String> valueSerializer = redisTemplate.getStringSerializer();
        String deserialize = valueSerializer.deserialize(message.getBody());
        logger.info("收到的mq消息" + deserialize);
    }
}不继承:
@Component
public class Receiver {
    private static Logger logger = LoggerFactory.getLogger(Receiver.class);
    public void receiveMessage(String message) {
        logger.info("收到的mq消息" + message);
    }
}消息订阅者配置类:
@Configuration
@AutoConfigureAfter({Receiver.class})
public class SubscriberConfig {
    /**
     * 消息监听适配器,注入接受消息方法,输入方法名字 反射方法
     *
     * @param receiver
     * @return
     */
    @Bean
    public MessageListenerAdapter getMessageListenerAdapter(Receiver receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage"); //当没有继承MessageListener时需要写方法名字
    }
    /**
     * 创建消息监听容器
     *
     * @param redisConnectionFactory
     * @param messageListenerAdapter
     * @return
     */
    @Bean
    public RedisMessageListenerContainer getRedisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory, MessageListenerAdapter messageListenerAdapter) {
        RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
        redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
        redisMessageListenerContainer.addMessageListener(messageListenerAdapter, new PatternTopic("TOPIC_USERNAME"));
        return redisMessageListenerContainer;
    }
}消费者和生产者配置已经完成了,启动springboot程序,用postman请求controller方法就可以了。
实时内容请关注微信公众号,公众号与博客同时更新
     
   
 
版权声明:本文为u012326462原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
