springboot-redis-messageListener

  • Post author:
  • Post category:其他



目录


1.介绍


2.发布订阅功能


2.1 发布实现


2.2 订阅实现


2.2.1 订阅监听方式1-RedisMessageListener


2.2.2 订阅监听方式2-MessageListenerAdapter


3.事件失效监听


1.介绍

主要实现两个功能

1.过期事件监听 KeyspaceEventMessageListener

2.发布订阅 RedisMeaasgeListener DispatchMessageListener MessagelistenerAdapter

2.发布订阅功能

发布订阅功能,主要是通过频道channel来作为衔接的桥梁

甲发消息到指定的频道 (发布)

乙通过监听该频道来获取甲发送的消息(可以有多个乙方) (订阅)

2.1 发布实现

/**
 * @Author ZCX
 * @create 2023/4/7 12:42
 */
@SpringBootTest
public class PubController {

    @Autowired
    RedisTemplate redisTemplate;
    
    @Test
    public void publish() {
        // 发布消息
        redisTemplate.convertAndSend("dog", "wangwang");
        redisTemplate.convertAndSend("cat", "miaomiao");
    }
}

2.2 订阅实现

2.2.1 订阅监听方式1-RedisMessageListener

@Configuration
public class RedisConfig {

    /**
     * 自定义序列化方式
     */
    @Bean
    public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        // 使用Jackson2JsonRedisSerialize 替换默认序列化
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);

        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        // 将类名称序列化到json串中,去掉会导致得出来的是LinkedHashMap对象,直接转换实体对象会失败
        objectMapper.activateDefaultTyping(
                LaissezFaireSubTypeValidator.instance ,
                ObjectMapper.DefaultTyping.NON_FINAL,
                JsonTypeInfo.As.WRAPPER_ARRAY);
        // 解决jackson2无法反序列化LocalDateTime的问题,因为LocalDateTime没有constructor
        objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        objectMapper.registerModule(new JavaTimeModule());

        jackson2JsonRedisSerializer.setObjectMapper(objectMapper);

        // 初始化string的序列化方式
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        // key采用String的序列化方式
        redisTemplate.setKeySerializer(stringRedisSerializer);
        // hash的key也采用String的序列化方式
        redisTemplate.setHashKeySerializer(stringRedisSerializer);
        // value序列化方式采用jackson
        redisTemplate.setValueSerializer(stringRedisSerializer);
        // hash的value序列化方式采用jackson
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);

        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }

    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory factory, RedisMessageListener listener) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(factory);
        // 订阅频道dog 和 cat ,添加多个频道
        container.addMessageListener(listener, new ChannelTopic("dog"));
        container.addMessageListener(listener, new ChannelTopic("cat"));
        return container;
    }
}
/**
 * @Author ZCX
 * @create 2023/4/7 11:13
 */
@Component
public class RedisMessageListener implements MessageListener {

    @Autowired
    private RedisTemplate redisTemplate;

    @Override
    public void onMessage(Message message, byte[] pattern) {
        byte[] messageBody = message.getBody();
        // 获取value定义的序列化方式反序列化
        Object message= redisTemplate.getValueSerializer().deserialize(messageBody);
        byte[] channelByte = message.getChannel();
        // 使用string的序列化方式反序列化
        Object channel = redisTemplate.getStringSerializer().deserialize(channelByte);
        String patternStr = new String(pattern);
        System.out.println(patternStr);
        System.out.println("---channel---: " + channel);
        System.out.println("---message---: " + message);
    }
}

2.2.2 订阅监听方式2-MessageListenerAdapter

/**
 * @Author ZCX
 * @create 2023/4/7 11:17
 */
@Configuration
public class RedisConfig {

    /**
     * 自定义序列化方式
     */
    @Bean
    public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        // 使用Jackson2JsonRedisSerialize 替换默认序列化
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);

        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        // 将类名称序列化到json串中,去掉会导致得出来的是LinkedHashMap对象,直接转换实体对象会失败
        objectMapper.activateDefaultTyping(
                LaissezFaireSubTypeValidator.instance ,
                ObjectMapper.DefaultTyping.NON_FINAL,
                JsonTypeInfo.As.WRAPPER_ARRAY);
        // 解决jackson2无法反序列化LocalDateTime的问题,因为LocalDateTime没有constructor
        objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        objectMapper.registerModule(new JavaTimeModule());

        jackson2JsonRedisSerializer.setObjectMapper(objectMapper);

        // 初始化string的序列化方式
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        // key采用String的序列化方式
        redisTemplate.setKeySerializer(stringRedisSerializer);
        // hash的key也采用String的序列化方式
        redisTemplate.setHashKeySerializer(stringRedisSerializer);
        // value序列化方式采用jackson
        redisTemplate.setValueSerializer(stringRedisSerializer);
        // hash的value序列化方式采用jackson
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);

        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }

    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory factory, MessageListenerAdapter adapter
    ) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(factory);
        // 订阅多个频道
        container.addMessageListener(adapter, new ChannelTopic("dog"));
        container.addMessageListener(adapter, new ChannelTopic("cat"));
        return container;
    }

    /**
     * MessageListenerAdapter内部使用反射机制处理传入的类及指定的方法,默认方法名称为handleMessage
     */
    @Bean
    public MessageListenerAdapter listenerAdapter(RedisMessageReceiver receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }
}
@Component
public class RedisMessageReceiver {
    public void receiveMessage(String message,String channel){
        System.out.println("---channel---: " + channel);
        System.out.println("---message---: " + message);
    }
}

3.事件失效监听

/**
 * @Author ZCX
 * @create 2023/4/7 11:17
 */
@Configuration
public class RedisConfig {


    /**
     * 自定义序列化方式
     */
    @Bean
    public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        // 使用Jackson2JsonRedisSerialize 替换默认序列化
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);

        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        // 将类名称序列化到json串中,去掉会导致得出来的是LinkedHashMap对象,直接转换实体对象会失败
        objectMapper.activateDefaultTyping(
                LaissezFaireSubTypeValidator.instance ,
                ObjectMapper.DefaultTyping.NON_FINAL,
                JsonTypeInfo.As.WRAPPER_ARRAY);
        // 解决jackson2无法反序列化LocalDateTime的问题,因为LocalDateTime没有constructor
        objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        objectMapper.registerModule(new JavaTimeModule());

        jackson2JsonRedisSerializer.setObjectMapper(objectMapper);

        // 初始化string的序列化方式
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        // key采用String的序列化方式
        redisTemplate.setKeySerializer(stringRedisSerializer);
        // hash的key也采用String的序列化方式
        redisTemplate.setHashKeySerializer(stringRedisSerializer);
        // value序列化方式采用jackson
        redisTemplate.setValueSerializer(stringRedisSerializer);
        // hash的value序列化方式采用jackson
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);

        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }
    /**
     * redis服务端需要配置 notify-keyspace-events 参数 ,至少包含k或者e
     * K 键空间通知,所有通知以 __keyspace@<db>__ 为前缀
     * E 键事件通知,所有通知以 __keyevent@<db>__ 为前缀
     * g DEL 、 EXPIRE 、 RENAME 等类型无关的通用命令的通知
     * $ 字符串命令的通知
     * l 列表命令的通知
     * s 集合命令的通知
     * h 哈希命令的通知
     * z 有序集合命令的通知
     * x 过期事件:每当有过期键被删除时发送
     * e 驱逐(evict)事件:每当有键因为 maxmemory 政策而被删除时发送
     * A 参数 g$lshzxe 的别名
     *
     * @后边可以指定db库,*代表所有库,0代表0库
     * __keyevent@0__:expired 0库过期的数据
     * __keyspace@0__:mykey   0库mykey这个键的所有操作
     * __keyevent@0__:del     0库所有del这个命令
     */
    @Bean
    public KeyExpirationListener registerListener(RedisConnectionFactory factory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(factory);
        KeyExpirationListener listener = new KeyExpirationListener(container);
        // 不指定默认为 Topic KEYEVENT_EXPIRED_TOPIC = new PatternTopic("__keyevent@*__:expired");
        container.addMessageListener(listener,new PatternTopic("__keyevent@*__:expired"));
        return listener;
    }
}
/**
 * 1.区分监听的key名称
 * 2.监听的key只能查到key名称,查询不到对应的值
 * 3.在key过期前主动删除该key是不会触发过期监听事件的
 * 4.分布式场景下,该监听器会监听多次,因此需要使用锁(防止同一个key被监听执行多次)
 */
@Component
public class KeyExpirationListener extends KeyExpirationEventMessageListener {

    @Autowired
    private StringRedisTemplate redisTemplate;
    
    public KeyExpirationListener(RedisMessageListenerContainer container){
        super(container);
    }

    @Override
    public void onMessage(Message message, @Nullable byte[] pattern) {
        String expiredKey = message.toString();
        // 匹配指定格式的key
        if (expiredKey.startsWith("product:id")) {
            // 临时key,此key可以在业务处理完,然后延迟一定时间删除,或者不处理
            String tempKey = expireKey + "lock";
            // 临时key不存在才设置值,key超时时间为10秒(此处相当于分布式锁的应用)
            Boolean exist = redisTemplate.opsForValue().setIfAbsent(tempKey , "1", 10, TimeUnit.SECONDS);
            if (Boolean.TRUE.equals(exist)) {
                System.out.println("业务处理...");
                // 比如截取里面的id,然后关联数据库进行处理
            } else {
                System.out.println("其他业务处理...");
            }
        } 
    }
}



版权声明:本文为weixin_39523456原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。