【Redis】Java实现redis消息订阅/发布(PubSub)

  • Post author:
  • Post category:java



主要有以下四个步骤:


①建立发布者,通过频道(mychannel)发布消息

package com.cqh.PubSub;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

/**
 * Created by yl1794 on 2018/3/28.
 */
//建立发布者,通过频道(mychannel)发布消息
public class Publisher extends Thread{
    private final JedisPool jedisPool;

    public Publisher(JedisPool jedisPool) {
        this.jedisPool = jedisPool;
    }

    @Override
    public void run(){
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        Jedis jedis = jedisPool.getResource();   //连接池中取出一个连接
        while (true) {
            String line;
            try {
                line = reader.readLine();
                if (!"quit".equals(line)) {
                    jedis.publish("mychannel", line);   //从通过mychannel 频道发布消息
                    System.out.println(String.format("发布消息成功!channel: %s, message: %s", "mychannel", line));
                } else {
                    break;
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

②建立消息监听类,并重写了JedisPubSub的一些相关方法

package com.cqh.PubSub;

import redis.clients.jedis.JedisPubSub;

/**
 * Created by yl1794 on 2018/3/28.
 */
//建立消息监听类,并重写了JedisPubSub的一些相关方法
public class MsgListener extends JedisPubSub{

    public MsgListener(){}

    @Override
    public void onMessage(String channel, String message) {       //收到消息会调用
        System.out.println(String.format("收到消息成功! channel: %s, message: %s", channel, message));
        this.unsubscribe();
    }

    @Override
    public void onSubscribe(String channel, int subscribedChannels) {    //订阅频道会调用
        System.out.println(String.format("订阅频道成功! channel: %s, subscribedChannels %d",
        channel, subscribedChannels));
    }

    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {   //取消订阅会调用
        System.out.println(String.format("取消订阅频道! channel: %s, subscribedChannels: %d",
                channel, subscribedChannels));

    }
}

③建立订阅者,订阅者去订阅频道(

mychannel

package com.cqh.PubSub;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

/**
 * Created by yl1794 on 2018/3/28.
 */
//建立订阅者,订阅者去订阅频道(mychannel)
public class Subscriber extends Thread {
    private final JedisPool jedisPool;
    private final MsgListener msgListener = new MsgListener();

    private final String channel = "mychannel";

    public Subscriber(JedisPool jedisPool) {
        super("Subscriber");
        this.jedisPool = jedisPool;
    }

    @Override
    public void run() {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();   //取出一个连接
            jedis.subscribe(msgListener, channel);    //通过subscribe的api去订阅,参数是订阅者和频道名

            //注意:subscribe是一个阻塞的方法,在取消订阅该频道前,会一直阻塞在这,无法执行后续的代码
            //这里在msgListener的onMessage方法里面收到消息后,调用了this.unsubscribe();来取消订阅,才会继续执行
            System.out.println("继续执行后续代码。。。");

        } catch (Exception e) {
            System.out.println(String.format("subsrcibe channel error, %s", e));
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
    }
}

④测试类

package com.cqh.PubSub;

import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/**
 * Created by yl1794 on 2018/3/28.
 */
//测试类,键盘输入消息
public class TestPubSub {
    public static void main( String[] args )
    {
        // 连接redis服务端
        JedisPool jedisPool = new JedisPool(new JedisPoolConfig(), "127.0.0.1", 6379);

        Publisher publisher = new Publisher(jedisPool);    //发布者
        publisher.start();

        Subscriber subscriber = new Subscriber(jedisPool);    //订阅者
        subscriber.start();


    }
}

⑤结果




版权声明:本文为Mr_EvanChen原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。