主要有以下四个步骤:
①建立发布者,通过频道(mychannel)发布消息
package com.cqh.PubSub;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
/**
* Created by yl1794 on 2018/3/28.
*/
//建立发布者,通过频道(mychannel)发布消息
public class Publisher extends Thread{
private final JedisPool jedisPool;
public Publisher(JedisPool jedisPool) {
this.jedisPool = jedisPool;
}
@Override
public void run(){
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
Jedis jedis = jedisPool.getResource(); //连接池中取出一个连接
while (true) {
String line;
try {
line = reader.readLine();
if (!"quit".equals(line)) {
jedis.publish("mychannel", line); //从通过mychannel 频道发布消息
System.out.println(String.format("发布消息成功!channel: %s, message: %s", "mychannel", line));
} else {
break;
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
②建立消息监听类,并重写了JedisPubSub的一些相关方法
package com.cqh.PubSub;
import redis.clients.jedis.JedisPubSub;
/**
* Created by yl1794 on 2018/3/28.
*/
//建立消息监听类,并重写了JedisPubSub的一些相关方法
public class MsgListener extends JedisPubSub{
public MsgListener(){}
@Override
public void onMessage(String channel, String message) { //收到消息会调用
System.out.println(String.format("收到消息成功! channel: %s, message: %s", channel, message));
this.unsubscribe();
}
@Override
public void onSubscribe(String channel, int subscribedChannels) { //订阅频道会调用
System.out.println(String.format("订阅频道成功! channel: %s, subscribedChannels %d",
channel, subscribedChannels));
}
@Override
public void onUnsubscribe(String channel, int subscribedChannels) { //取消订阅会调用
System.out.println(String.format("取消订阅频道! channel: %s, subscribedChannels: %d",
channel, subscribedChannels));
}
}
③建立订阅者,订阅者去订阅频道(
mychannel
)
package com.cqh.PubSub;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
/**
* Created by yl1794 on 2018/3/28.
*/
//建立订阅者,订阅者去订阅频道(mychannel)
public class Subscriber extends Thread {
private final JedisPool jedisPool;
private final MsgListener msgListener = new MsgListener();
private final String channel = "mychannel";
public Subscriber(JedisPool jedisPool) {
super("Subscriber");
this.jedisPool = jedisPool;
}
@Override
public void run() {
Jedis jedis = null;
try {
jedis = jedisPool.getResource(); //取出一个连接
jedis.subscribe(msgListener, channel); //通过subscribe的api去订阅,参数是订阅者和频道名
//注意:subscribe是一个阻塞的方法,在取消订阅该频道前,会一直阻塞在这,无法执行后续的代码
//这里在msgListener的onMessage方法里面收到消息后,调用了this.unsubscribe();来取消订阅,才会继续执行
System.out.println("继续执行后续代码。。。");
} catch (Exception e) {
System.out.println(String.format("subsrcibe channel error, %s", e));
} finally {
if (jedis != null) {
jedis.close();
}
}
}
}
④测试类
package com.cqh.PubSub;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
/**
* Created by yl1794 on 2018/3/28.
*/
//测试类,键盘输入消息
public class TestPubSub {
public static void main( String[] args )
{
// 连接redis服务端
JedisPool jedisPool = new JedisPool(new JedisPoolConfig(), "127.0.0.1", 6379);
Publisher publisher = new Publisher(jedisPool); //发布者
publisher.start();
Subscriber subscriber = new Subscriber(jedisPool); //订阅者
subscriber.start();
}
}
⑤结果
版权声明:本文为Mr_EvanChen原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。