目录
2.2.1 订阅监听方式1-RedisMessageListener
2.2.2 订阅监听方式2-MessageListenerAdapter
1.介绍
主要实现两个功能
1.过期事件监听 KeyspaceEventMessageListener
2.发布订阅 RedisMeaasgeListener DispatchMessageListener MessagelistenerAdapter
2.发布订阅功能
发布订阅功能,主要是通过频道channel来作为衔接的桥梁
甲发消息到指定的频道 (发布)
乙通过监听该频道来获取甲发送的消息(可以有多个乙方) (订阅)
2.1 发布实现
/**
* @Author ZCX
* @create 2023/4/7 12:42
*/
@SpringBootTest
public class PubController {
@Autowired
RedisTemplate redisTemplate;
@Test
public void publish() {
// 发布消息
redisTemplate.convertAndSend("dog", "wangwang");
redisTemplate.convertAndSend("cat", "miaomiao");
}
}
2.2 订阅实现
2.2.1 订阅监听方式1-RedisMessageListener
@Configuration
public class RedisConfig {
/**
* 自定义序列化方式
*/
@Bean
public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
// 使用Jackson2JsonRedisSerialize 替换默认序列化
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
// 将类名称序列化到json串中,去掉会导致得出来的是LinkedHashMap对象,直接转换实体对象会失败
objectMapper.activateDefaultTyping(
LaissezFaireSubTypeValidator.instance ,
ObjectMapper.DefaultTyping.NON_FINAL,
JsonTypeInfo.As.WRAPPER_ARRAY);
// 解决jackson2无法反序列化LocalDateTime的问题,因为LocalDateTime没有constructor
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
objectMapper.registerModule(new JavaTimeModule());
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
// 初始化string的序列化方式
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
// key采用String的序列化方式
redisTemplate.setKeySerializer(stringRedisSerializer);
// hash的key也采用String的序列化方式
redisTemplate.setHashKeySerializer(stringRedisSerializer);
// value序列化方式采用jackson
redisTemplate.setValueSerializer(stringRedisSerializer);
// hash的value序列化方式采用jackson
redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory factory, RedisMessageListener listener) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
// 订阅频道dog 和 cat ,添加多个频道
container.addMessageListener(listener, new ChannelTopic("dog"));
container.addMessageListener(listener, new ChannelTopic("cat"));
return container;
}
}
/**
* @Author ZCX
* @create 2023/4/7 11:13
*/
@Component
public class RedisMessageListener implements MessageListener {
@Autowired
private RedisTemplate redisTemplate;
@Override
public void onMessage(Message message, byte[] pattern) {
byte[] messageBody = message.getBody();
// 获取value定义的序列化方式反序列化
Object message= redisTemplate.getValueSerializer().deserialize(messageBody);
byte[] channelByte = message.getChannel();
// 使用string的序列化方式反序列化
Object channel = redisTemplate.getStringSerializer().deserialize(channelByte);
String patternStr = new String(pattern);
System.out.println(patternStr);
System.out.println("---channel---: " + channel);
System.out.println("---message---: " + message);
}
}
2.2.2 订阅监听方式2-MessageListenerAdapter
/**
* @Author ZCX
* @create 2023/4/7 11:17
*/
@Configuration
public class RedisConfig {
/**
* 自定义序列化方式
*/
@Bean
public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
// 使用Jackson2JsonRedisSerialize 替换默认序列化
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
// 将类名称序列化到json串中,去掉会导致得出来的是LinkedHashMap对象,直接转换实体对象会失败
objectMapper.activateDefaultTyping(
LaissezFaireSubTypeValidator.instance ,
ObjectMapper.DefaultTyping.NON_FINAL,
JsonTypeInfo.As.WRAPPER_ARRAY);
// 解决jackson2无法反序列化LocalDateTime的问题,因为LocalDateTime没有constructor
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
objectMapper.registerModule(new JavaTimeModule());
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
// 初始化string的序列化方式
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
// key采用String的序列化方式
redisTemplate.setKeySerializer(stringRedisSerializer);
// hash的key也采用String的序列化方式
redisTemplate.setHashKeySerializer(stringRedisSerializer);
// value序列化方式采用jackson
redisTemplate.setValueSerializer(stringRedisSerializer);
// hash的value序列化方式采用jackson
redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory factory, MessageListenerAdapter adapter
) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
// 订阅多个频道
container.addMessageListener(adapter, new ChannelTopic("dog"));
container.addMessageListener(adapter, new ChannelTopic("cat"));
return container;
}
/**
* MessageListenerAdapter内部使用反射机制处理传入的类及指定的方法,默认方法名称为handleMessage
*/
@Bean
public MessageListenerAdapter listenerAdapter(RedisMessageReceiver receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
}
@Component
public class RedisMessageReceiver {
public void receiveMessage(String message,String channel){
System.out.println("---channel---: " + channel);
System.out.println("---message---: " + message);
}
}
3.事件失效监听
/**
* @Author ZCX
* @create 2023/4/7 11:17
*/
@Configuration
public class RedisConfig {
/**
* 自定义序列化方式
*/
@Bean
public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
// 使用Jackson2JsonRedisSerialize 替换默认序列化
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
// 将类名称序列化到json串中,去掉会导致得出来的是LinkedHashMap对象,直接转换实体对象会失败
objectMapper.activateDefaultTyping(
LaissezFaireSubTypeValidator.instance ,
ObjectMapper.DefaultTyping.NON_FINAL,
JsonTypeInfo.As.WRAPPER_ARRAY);
// 解决jackson2无法反序列化LocalDateTime的问题,因为LocalDateTime没有constructor
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
objectMapper.registerModule(new JavaTimeModule());
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
// 初始化string的序列化方式
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
// key采用String的序列化方式
redisTemplate.setKeySerializer(stringRedisSerializer);
// hash的key也采用String的序列化方式
redisTemplate.setHashKeySerializer(stringRedisSerializer);
// value序列化方式采用jackson
redisTemplate.setValueSerializer(stringRedisSerializer);
// hash的value序列化方式采用jackson
redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
/**
* redis服务端需要配置 notify-keyspace-events 参数 ,至少包含k或者e
* K 键空间通知,所有通知以 __keyspace@<db>__ 为前缀
* E 键事件通知,所有通知以 __keyevent@<db>__ 为前缀
* g DEL 、 EXPIRE 、 RENAME 等类型无关的通用命令的通知
* $ 字符串命令的通知
* l 列表命令的通知
* s 集合命令的通知
* h 哈希命令的通知
* z 有序集合命令的通知
* x 过期事件:每当有过期键被删除时发送
* e 驱逐(evict)事件:每当有键因为 maxmemory 政策而被删除时发送
* A 参数 g$lshzxe 的别名
*
* @后边可以指定db库,*代表所有库,0代表0库
* __keyevent@0__:expired 0库过期的数据
* __keyspace@0__:mykey 0库mykey这个键的所有操作
* __keyevent@0__:del 0库所有del这个命令
*/
@Bean
public KeyExpirationListener registerListener(RedisConnectionFactory factory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
KeyExpirationListener listener = new KeyExpirationListener(container);
// 不指定默认为 Topic KEYEVENT_EXPIRED_TOPIC = new PatternTopic("__keyevent@*__:expired");
container.addMessageListener(listener,new PatternTopic("__keyevent@*__:expired"));
return listener;
}
}
/**
* 1.区分监听的key名称
* 2.监听的key只能查到key名称,查询不到对应的值
* 3.在key过期前主动删除该key是不会触发过期监听事件的
* 4.分布式场景下,该监听器会监听多次,因此需要使用锁(防止同一个key被监听执行多次)
*/
@Component
public class KeyExpirationListener extends KeyExpirationEventMessageListener {
@Autowired
private StringRedisTemplate redisTemplate;
public KeyExpirationListener(RedisMessageListenerContainer container){
super(container);
}
@Override
public void onMessage(Message message, @Nullable byte[] pattern) {
String expiredKey = message.toString();
// 匹配指定格式的key
if (expiredKey.startsWith("product:id")) {
// 临时key,此key可以在业务处理完,然后延迟一定时间删除,或者不处理
String tempKey = expireKey + "lock";
// 临时key不存在才设置值,key超时时间为10秒(此处相当于分布式锁的应用)
Boolean exist = redisTemplate.opsForValue().setIfAbsent(tempKey , "1", 10, TimeUnit.SECONDS);
if (Boolean.TRUE.equals(exist)) {
System.out.println("业务处理...");
// 比如截取里面的id,然后关联数据库进行处理
} else {
System.out.println("其他业务处理...");
}
}
}
}
版权声明:本文为weixin_39523456原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。