redis发布订阅

  • Post author:
  • Post category:其他




redis.conf

############################# EVENT NOTIFICATION ##############################

# Redis能够将在keyspace中发生的事件通知给 发布/订阅 客户端

# Redis can notify Pub/Sub clients about events happening in the key space. 
# This feature is documented at http://redis.io/topics/notifications

# 例如:如果开启了keyspace事件通知,一个客户端在数据库0对一个叫'foo'的key执行了删除操作,
#      那么redis将会通过 发布订阅 机制发布2条消息 
#        PUBLISH __keyspace@0__:foo del 
#        PUBLISH __keyevent@0__:del foo

# For instance if keyspace events notification is enabled, and a client
# performs a DEL operation on key "foo" stored in the Database 0, two
# messages will be published via Pub/Sub:
#
# PUBLISH __keyspace@0__:foo del
# PUBLISH __keyevent@0__:del foo

#  也可以指定一组 类名 来选择 Redis 会通知的一类事件。
#  每类事件 都通过一个字符定义

# It is possible to select the events that Redis will notify among a set
# of classes. Every class is identified by a single character:

#        keySpace事件   以 __keyspace@<数据库序号>__ 为前缀 发布事件
#  K     Keyspace events, published with __keyspace@<db>__ prefix.  

#        Keyevent事件   以 __keyevent@<数据库序号>__ 为前缀 发布事件
#  E     Keyevent events, published with __keyevent@<db>__ prefix.        

#        执行常规命令,比如del、expire、rename           
#  g     Generic commands (non-type specific) like DEL, EXPIRE, RENAME, ...      

#        执行 String 命令
#  $     String commands  

#        执行 List   命令                                                          
#  l     List commands  

#        执行 Set    命令                                                         
#  s     Set commands  

#        执行 Hash   命令                                                         
#  h     Hash commands                                                          执行 Hash   命令

#        执行 ZSet   命令
#  z     Sorted set commands 

#        key过期事件(每个key失效都会触发这类事件)                                                   
#  x     Expired events (events generated every time a key expires) 
   
#        key驱逐事件(当key在内存满了被清除时生成)
#  e     Evicted events (events generated when a key is evicted for maxmemory)  

#        A是g$lshzxe的别名,因此AKE就意味着所有的时间
#  A     Alias for g$lshzxe, so that the "AKE" string means all the events.     
#

#  配置中的notify-keyspace-events这个参数由0个或多个字符组成,
#  如果配置为空字符串表示禁用通知
#  The "notify-keyspace-events" takes as argument a string that is composed     
#  of zero or multiple characters. The empty string means that notifications
#  are disabled.
#

#  比如,要开启list命令和generic常规命令的事件通知,
#  应该配置成 notify-keyspace-events Elg
#  Example: to enable list and generic events, from the point of view of the    
#           event name, use:
#
#  notify-keyspace-events Elg
#
#  比如,订阅了__keyevent@0__:expired频道的客户端要收到key失效的时间,
#  应该配置成 notify-keyspace-events Ex
#  Example 2: to get the stream of the expired keys subscribing to channel   name __keyevent@0__:expired use:
#
#  notify-keyspace-events Ex
#

#  默认情况下,所有的通知都被禁用了,并且这个特性有性能上的开销。
#  注意,K和E必须至少指定其中一个,否则,将收不到任何事件。
#  By default all notifications are disabled because most users don't need      
#  this feature and the feature has some overhead. Note that if you don't       
#  specify at least one of K or E, no events will be delivered.
notify-keyspace-events "Ex"

############################### ADVANCED CONFIG ###############################



pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.12.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.76</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>



application.yml

spring:
  redis:
    database: 0
    host: localhost
    port: 6379
    timeout: 6000 #连接超长时长(毫秒)
    jedis:
      pool:
        max-active: 1000  #连接池最大连接数(使用负值表示没有限制)
        max-wait: -1ms    #连接池最大阻塞等待时间(使用负值表示没有限制)
        max-idle: 10      #连接池中最大空闲连接
        min-idle: 5       #连接池中最小空闲连接



配置



RedisConfig

@Configuration
public class RedisConfig {
    @Autowired
    private RedisConnectionFactory factory;

    @Bean
    public RedisTemplate<String, Object> redisTemplate() {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate();
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashValueSerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new StringRedisSerializer());
        redisTemplate.setConnectionFactory(this.factory);
        return redisTemplate;
    }

}



RedisMessageListener

@Configuration
public class RedisMessageListener {
  
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);

        // 监听idea通道
        container.addMessageListener(listenerAdapter, new PatternTopic("idea"));

        return container;
    }

    @Bean
    public MessageListenerAdapter listenerAdapter(ReceiverRedisMessage receiver) {
        MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(receiver,"receiveMessage");
        return listenerAdapter;
    }

}



ReceiverRedisMessage

@Component
public class ReceiverRedisMessage {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**

     * 见:MessageListenerAdapter$MostSpecificMethodFilter#matches
     * jsonMsg:收到的消息
     * topic:  收到消息的channel
     * 
     * @param jsonMsg
     * @param topic
     */
    public void receiveMessage(String jsonMsg,String topic) {
        System.out.println("监听消息:->" + jsonMsg);

        try {

            //ObjectMapper mapper = new ObjectMapper();
            //Map map = mapper.readValue(jsonMsg, Map.class);

            // 以下可以处理json字符串多斜杠的问题
            MonitorData dto = JSON.parseObject(JSON.parse(jsonMsg).toString(), MonitorData.class);
        } catch (Exception e) {
            System.out.println("失败:"+ jsonMsg);
        }
    }

}



JsonUtil

public abstract class JsonUtil {
    private static final ObjectMapper objectMapper = new ObjectMapper();

    public JsonUtil() {
    }

    public static final Map<String, Object> json2Map(String json) {
        if (json == null) {
            return null;
        } else {
            try {
                return (Map)objectMapper.readValue(json, Map.class);
            } catch (Exception var2) {
                throw new RuntimeException(var2);
            }
        }
    }

    public static String map2Json(Map<String, Object> map) {
        return obj2Json(map);
    }

    public static final <T> T json2Obj(String content, Class<T> clazz) {
        if (StringUtils.isBlank(content)) {
            return null;
        } else {
            try {
                return objectMapper.readValue(content, clazz);
            } catch (Exception var3) {
                throw new RuntimeException(var3);
            }
        }
    }

    public static String obj2Json(Object obj) {
        if (obj == null) {
            return null;
        } else {
            try {
                return objectMapper.writeValueAsString(obj);
            } catch (JsonProcessingException var2) {
                throw new RuntimeException(var2);
            }
        }
    }

    public static <T> T fromJson(String jsonString, JavaType javaType) {
        try {
            return objectMapper.readValue(jsonString, javaType);
        } catch (Exception var3) {
            throw new RuntimeException(var3);
        }
    }

    static {
        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        objectMapper.configure(Feature.ALLOW_SINGLE_QUOTES, true);
        objectMapper.configure(Feature.ALLOW_NUMERIC_LEADING_ZEROS, true);
    }
}



RedisController

@RestController
public class RedisController {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @RequestMapping("pub")
    public String pub() {

        MonitorData dto = new MonitorData();
        dto.setRegionId(100L);

        ArrayList<DataDto> dataDtoList = new ArrayList<>();

        // dataDto1
        DataDto dataDto1 = new DataDto();

        dataDto1.setCreateTime(new Date());
        dataDto1.setDeviceCode("001");
        dataDto1.setIdentifier("L01");
        dataDto1.setPropertyName("co2");
        dataDto1.setPropertyValue("500ml");
        dataDto1.setNormal(true);
        dataDto1.setUnit("ml");
        dataDto1.setRegionId(50L);

        dataDtoList.add(dataDto1);

        // dataDto2
        DataDto dataDto2 = new DataDto();
        dataDto2.setCreateTime(new Date());
        dataDto2.setDeviceCode("002");
        dataDto2.setIdentifier("L02");
        dataDto2.setPropertyName("co2");
        dataDto2.setPropertyValue("500ml");
        dataDto2.setNormal(true);
        dataDto2.setUnit("ml");
        dataDto2.setRegionId(51L);
        dataDtoList.add(dataDto2);

        // message
        String message = JsonUtil.obj2Json(dataDtoList);
        dto.setMessage(message);

        String pubMsg = JsonUtil.obj2Json(dto);
        System.out.println("发送消息:->"+pubMsg);
        redisTemplate.convertAndSend("idea", JsonUtil.obj2Json(pubMsg));

        return "ok";
    }

}



客户端订阅主题

在这里插入图片描述



版权声明:本文为qq_16992475原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。