消息服务框架ZBus介绍及入门
轻量级服务总线,面向高性能、低时延、高可用特性调优,支持RPC,消息队列服务。
快速入门
创建消息Server
- 添加pom
<dependency>
<groupId>io.zbus</groupId>
<artifactId>zbus</artifactId>
<version>0.11.5</version>
</dependency>
- 添加配置文件
conf/zbus.xml
<?xml version="1.0" encoding="UTF-8"?>
<zbus> <!-- public/private/monitor address configuration -->
<!-- address open to public, with ssl and auth secured if configured -->
<public>
<address>0.0.0.0:15555</address> <!-- empty to disable -->
<sslEnabled certFile="ssl/zbus.crt" keyFile="ssl/zbus.key">false</sslEnabled>
<!-- use embedded auth
<auth>
<add>
<apiKey>2ba912a8-4a8d-49d2-1a22-198fd285cb06</apiKey>
<secretKey>461277322-943d-4b2f-b9b6-3f860d746ffd</secretKey>
</add>
<add>
<apiKey>3ba912a6-4a8d-49d1-1a66-198ea285cb03</apiKey>
<secretKey>123475622-953d-4b2f-a7b6-4f860d126cce</secretKey>
</add>
</auth>
-->
</public>
<!-- address with no ssl and auth, only for private, speed and convenient
<private>
<address>0.0.0.0:15555</address>
</private>
-->
<!-- address for monitor -->
<monitor>
<address>0.0.0.0:25555</address>
<!--
<auth>
<add>
<apiKey>2ba912a8-4a8d-49d2-1a22-198fd285cb06</apiKey>
<secretKey>461277322-943d-4b2f-b9b6-3f860d746ffd</secretKey>
</add>
<add>
<apiKey>3ba912a6-4a8d-49d1-1a66-198ea285cb03</apiKey>
<secretKey>123475622-953d-4b2f-a7b6-4f860d126cce</secretKey>
</add>
</auth>
-->
</monitor>
<cors>
<origin>*</origin>
<allowedRequestMethods>GET,POST,PUT,OPTIONS</allowedRequestMethods>
<allowedRequestHeaders>Origin, X-Request-With, X-Requested-With, Content-Type, Accept, Token</allowedRequestHeaders>
<exposeHeaders></exposeHeaders>
</cors>
<!-- HTTP 代理
<httpProxy>
<proxy>
<urlPrefix>/test</urlPrefix>
<urlRewrite>/</urlRewrite>
<backend>http://localhost:15555</backend>
</proxy>
</httpProxy>
-->
<staticFileDir cached="false">../static</staticFileDir>
<mqDiskDir>/tmp/zbus</mqDiskDir>
<mqDbUrl></mqDbUrl>
<maxSocketCount>102400</maxSocketCount>
<packageSizeLimit>128M</packageSizeLimit>
<verbose>false</verbose>
</zbus>
- 新增访问页面,
static/index.html
<!DOCTYPE html>
<html>
<head>
<title>Welcome to zbus!</title>
<style>
body {
width: 64em;
margin: 0 auto;
font-family: 20pt Times New Roman;
}
</style>
</head>
<body>
<h1>Welcome to zbus!</h1>
<p>If you see this page, the zbus server is successfully started and working for MQ and RPC</p>
<p>Config location: conf/zbus.xml</p>
<p><em>Thank you for using <a href="http://zbus.io" target="_blank" >zbus</a>.</em></p>
</body>
</html>
- 启动服务器
public class ZbusServer {
public static void main(String[] args) throws Exception {
args = new String[2];
args[0] = "-conf";
args[1] = ZbusServer.class.getClassLoader().getResource("").getPath() + "conf/zbus.xml";
// System.out.println(Arrays.toString(args));
MqServer.main(args);
}
}
- 访问页面,http://localhost:15555/,可以看到server服务页面。
创建消息的消费者
public class MessageConsumer {
public static void main(String[] args) throws IOException {
Broker broker = new Broker("localhost:15555");
ConsumerConfig config = new ConsumerConfig(broker);
config.setTopic("MyTopic"); //指定消息队列主题,同时可以指定分组通道
config.setMessageHandler(new MessageHandler() {
@Override
public void handle(Message msg, MqClient client) throws IOException {
System.out.println(msg); //消费处理
}
});
Consumer consumer = new Consumer(config);
consumer.start();
}
}
创建消息的生产者
public class MessageProducer {
public static void main(String[] args) throws Exception {
//Broker是对zbus服务器的本地抽象,多地址支持HA
Broker broker = new Broker("localhost:15555");
Producer p = new Producer(broker);
p.declareTopic("MyTopic"); //当确定队列不存在需创建
Message msg = new Message();
msg.setTopic("MyTopic"); //设置消息主题
//msg.setTag("oo.account.pp"); //可以设置消息标签
msg.setBody("hello " + System.currentTimeMillis());
Message res = p.publish(msg);
System.out.println(res);
broker.close();
}
}
项目地址
- https://gitee.com/openforce/zbus
版权声明:本文为qq_42985872原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。