redis.conf
############################# EVENT NOTIFICATION ##############################
# Redis能够将在keyspace中发生的事件通知给 发布/订阅 客户端
# Redis can notify Pub/Sub clients about events happening in the key space.
# This feature is documented at http://redis.io/topics/notifications
# 例如:如果开启了keyspace事件通知,一个客户端在数据库0对一个叫'foo'的key执行了删除操作,
# 那么redis将会通过 发布订阅 机制发布2条消息
# PUBLISH __keyspace@0__:foo del
# PUBLISH __keyevent@0__:del foo
# For instance if keyspace events notification is enabled, and a client
# performs a DEL operation on key "foo" stored in the Database 0, two
# messages will be published via Pub/Sub:
#
# PUBLISH __keyspace@0__:foo del
# PUBLISH __keyevent@0__:del foo
# 也可以指定一组 类名 来选择 Redis 会通知的一类事件。
# 每类事件 都通过一个字符定义
# It is possible to select the events that Redis will notify among a set
# of classes. Every class is identified by a single character:
# keySpace事件 以 __keyspace@<数据库序号>__ 为前缀 发布事件
# K Keyspace events, published with __keyspace@<db>__ prefix.
# Keyevent事件 以 __keyevent@<数据库序号>__ 为前缀 发布事件
# E Keyevent events, published with __keyevent@<db>__ prefix.
# 执行常规命令,比如del、expire、rename
# g Generic commands (non-type specific) like DEL, EXPIRE, RENAME, ...
# 执行 String 命令
# $ String commands
# 执行 List 命令
# l List commands
# 执行 Set 命令
# s Set commands
# 执行 Hash 命令
# h Hash commands 执行 Hash 命令
# 执行 ZSet 命令
# z Sorted set commands
# key过期事件(每个key失效都会触发这类事件)
# x Expired events (events generated every time a key expires)
# key驱逐事件(当key在内存满了被清除时生成)
# e Evicted events (events generated when a key is evicted for maxmemory)
# A是g$lshzxe的别名,因此AKE就意味着所有的时间
# A Alias for g$lshzxe, so that the "AKE" string means all the events.
#
# 配置中的notify-keyspace-events这个参数由0个或多个字符组成,
# 如果配置为空字符串表示禁用通知
# The "notify-keyspace-events" takes as argument a string that is composed
# of zero or multiple characters. The empty string means that notifications
# are disabled.
#
# 比如,要开启list命令和generic常规命令的事件通知,
# 应该配置成 notify-keyspace-events Elg
# Example: to enable list and generic events, from the point of view of the
# event name, use:
#
# notify-keyspace-events Elg
#
# 比如,订阅了__keyevent@0__:expired频道的客户端要收到key失效的时间,
# 应该配置成 notify-keyspace-events Ex
# Example 2: to get the stream of the expired keys subscribing to channel name __keyevent@0__:expired use:
#
# notify-keyspace-events Ex
#
# 默认情况下,所有的通知都被禁用了,并且这个特性有性能上的开销。
# 注意,K和E必须至少指定其中一个,否则,将收不到任何事件。
# By default all notifications are disabled because most users don't need
# this feature and the feature has some overhead. Note that if you don't
# specify at least one of K or E, no events will be delivered.
notify-keyspace-events "Ex"
############################### ADVANCED CONFIG ###############################
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.12.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>demo</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.76</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.yml
spring:
redis:
database: 0
host: localhost
port: 6379
timeout: 6000 #连接超长时长(毫秒)
jedis:
pool:
max-active: 1000 #连接池最大连接数(使用负值表示没有限制)
max-wait: -1ms #连接池最大阻塞等待时间(使用负值表示没有限制)
max-idle: 10 #连接池中最大空闲连接
min-idle: 5 #连接池中最小空闲连接
配置
RedisConfig
@Configuration
public class RedisConfig {
@Autowired
private RedisConnectionFactory factory;
@Bean
public RedisTemplate<String, Object> redisTemplate() {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate();
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
redisTemplate.setHashValueSerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new StringRedisSerializer());
redisTemplate.setConnectionFactory(this.factory);
return redisTemplate;
}
}
RedisMessageListener
@Configuration
public class RedisMessageListener {
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 监听idea通道
container.addMessageListener(listenerAdapter, new PatternTopic("idea"));
return container;
}
@Bean
public MessageListenerAdapter listenerAdapter(ReceiverRedisMessage receiver) {
MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(receiver,"receiveMessage");
return listenerAdapter;
}
}
ReceiverRedisMessage
@Component
public class ReceiverRedisMessage {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 见:MessageListenerAdapter$MostSpecificMethodFilter#matches
* jsonMsg:收到的消息
* topic: 收到消息的channel
*
* @param jsonMsg
* @param topic
*/
public void receiveMessage(String jsonMsg,String topic) {
System.out.println("监听消息:->" + jsonMsg);
try {
//ObjectMapper mapper = new ObjectMapper();
//Map map = mapper.readValue(jsonMsg, Map.class);
// 以下可以处理json字符串多斜杠的问题
MonitorData dto = JSON.parseObject(JSON.parse(jsonMsg).toString(), MonitorData.class);
} catch (Exception e) {
System.out.println("失败:"+ jsonMsg);
}
}
}
JsonUtil
public abstract class JsonUtil {
private static final ObjectMapper objectMapper = new ObjectMapper();
public JsonUtil() {
}
public static final Map<String, Object> json2Map(String json) {
if (json == null) {
return null;
} else {
try {
return (Map)objectMapper.readValue(json, Map.class);
} catch (Exception var2) {
throw new RuntimeException(var2);
}
}
}
public static String map2Json(Map<String, Object> map) {
return obj2Json(map);
}
public static final <T> T json2Obj(String content, Class<T> clazz) {
if (StringUtils.isBlank(content)) {
return null;
} else {
try {
return objectMapper.readValue(content, clazz);
} catch (Exception var3) {
throw new RuntimeException(var3);
}
}
}
public static String obj2Json(Object obj) {
if (obj == null) {
return null;
} else {
try {
return objectMapper.writeValueAsString(obj);
} catch (JsonProcessingException var2) {
throw new RuntimeException(var2);
}
}
}
public static <T> T fromJson(String jsonString, JavaType javaType) {
try {
return objectMapper.readValue(jsonString, javaType);
} catch (Exception var3) {
throw new RuntimeException(var3);
}
}
static {
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
objectMapper.configure(Feature.ALLOW_SINGLE_QUOTES, true);
objectMapper.configure(Feature.ALLOW_NUMERIC_LEADING_ZEROS, true);
}
}
RedisController
@RestController
public class RedisController {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@RequestMapping("pub")
public String pub() {
MonitorData dto = new MonitorData();
dto.setRegionId(100L);
ArrayList<DataDto> dataDtoList = new ArrayList<>();
// dataDto1
DataDto dataDto1 = new DataDto();
dataDto1.setCreateTime(new Date());
dataDto1.setDeviceCode("001");
dataDto1.setIdentifier("L01");
dataDto1.setPropertyName("co2");
dataDto1.setPropertyValue("500ml");
dataDto1.setNormal(true);
dataDto1.setUnit("ml");
dataDto1.setRegionId(50L);
dataDtoList.add(dataDto1);
// dataDto2
DataDto dataDto2 = new DataDto();
dataDto2.setCreateTime(new Date());
dataDto2.setDeviceCode("002");
dataDto2.setIdentifier("L02");
dataDto2.setPropertyName("co2");
dataDto2.setPropertyValue("500ml");
dataDto2.setNormal(true);
dataDto2.setUnit("ml");
dataDto2.setRegionId(51L);
dataDtoList.add(dataDto2);
// message
String message = JsonUtil.obj2Json(dataDtoList);
dto.setMessage(message);
String pubMsg = JsonUtil.obj2Json(dto);
System.out.println("发送消息:->"+pubMsg);
redisTemplate.convertAndSend("idea", JsonUtil.obj2Json(pubMsg));
return "ok";
}
}
客户端订阅主题
版权声明:本文为qq_16992475原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。