ZBus实现RPC的示例和原理
示例
服务端
public class Zbus {
public static void main(String[] args) {
MqServer.main(args);
}
}
RpcServer
@Filter("login")
public class RpcServer {
public int plus(int a, int b) {
return a+b;
}
@Filter(exclude = "login")
public Map<String, Object> p(@Param("name") String name, @Param("age")int age) {
Map<String, Object> value = new HashMap<>();
value.put("name", name);
value.put("age", age);
value.put("nullKey", null);
System.out.println(name);
return value;
}
public Map<String, Object> map(Map<String, Object> table) {
System.out.println(table);
return table;
}
@Route("/abc") //default path could be changed
public Object json() {
Map<String, Object> value = new HashMap<>();
value.put("key1", System.currentTimeMillis());
value.put("key2", System.currentTimeMillis());
return value;
}
@Route("/") //default path could be changed
public Message home(Message req) {
System.out.println(req);
Message res = new Message();
res.setHeader("content-type", "text/html; charset=utf8");
res.setBody("<h1>test body</h1>");
return res;
}
@SuppressWarnings("resource")
public static void main(String[] args) throws Exception {
RpcProcessor p = new RpcProcessor();
p.mount("/", RpcServerSimpleExample.class);
//p.setBeforeFilter(new MyFilter());
p.setBeforeFilter(new RpcFilter() {
@Override
public boolean doFilter(Message request, Message response, Throwable exception) {
Map<String, Object> ctx = new HashMap<>();
ctx.put("key", "set in before filter");
request.setContext(ctx );
return true;
}
});
p.setAfterFilter(new RpcFilter() {
@Override
public boolean doFilter(Message request, Message response, Throwable exception) {
Object ctx = request.getContext();
System.out.println("In After Filter>>>>>" + ctx);
return true;
}
});
RpcServer rpcServer = new RpcServer();
rpcServer.setRpcProcessor(p);
//rpcServer.setChannel("temp");
//rpcServer.setRouteDisabled(true);
rpcServer.setMqServerAddress("localhost:15555");
rpcServer.setMq("/");
//rpcServer.setMqServer(new MqServer(15555));
rpcServer.start();
}
}
RpcClient
public class RpcClientSimpleExample {
public static void main(String[] args) throws Exception {
RpcClient rpc = new RpcClient("localhost:15555");
Message req = new Message();
req.setUrl("/map");
Map<String, Object> map = new HashMap<>();
map.put("key", "value");
map.put("nullkey", null);
req.setBody(new Object[] {map}); //body as parameter array
Message res = rpc.invoke(req); //同步调用
System.out.println(res);
rpc.close();
}
}
实现原理
RpcServer启动
-
RpcProcessor#mount()
该方法会加载类上的Route信息,加载到
RpcProcessor#urlPath2MethodTable
。
this.rpcProcessor.mountDoc();
会加载DocRender上面的路由信息,
DocRender#index
。 -
启动client。
RpcServer#startClient
,判断当前client为null,直接新建MqClient。设置client的mqHandler用来接收消息;向服务端新建MQ,订阅MQ的消息。
RpcServer处理消息
-
找到需要处理的方法,
SimpleChannelInboundHandler#channelRead0-->MqClient#handleMessage-->RpcProcessor#invoke``RpcProcessor#findMethodByUrl
,进行反射。
服务端转发消息
- 服务端接收到RpcClient的消息,转发给RpcServer
- 服务端接收到RpcServer的消息,转发给RpcClient。
-
核心方法,
RouteHandler#handle
根据sessionTable来获取target。 -
MqServerAdaptor#attachInfo
会往request里面添加headers属性,source,remote-addr。 -
UrlRouteFilter#doFilter
,判断如果当前的cmd为null,直接是设置cmd为pub,且ack=fasle。
# RpcClient发送消息
{
"headers":{
"id":"1a5c8934-e8d5-41bb-8d79-4eea9555fa35"
},
"body":[
{
"key":"value"
}
],
"url":"/map"
}
# 服务端转发消息
{
"headers":{
"ack":false,
"id":"1a5c8934-e8d5-41bb-8d79-4eea9555fa35",
"mq":"/",
"remote-addr":"/127.0.0.1:61833",
"source":"40e81993-23b7-46a1-a6d6-b285fd8c825e"
},
"body":[
{
"key":"value"
}
],
"url":"/map"
}
# RpcServer回复消息
{
"headers":{
"Content-Type":"application/json; charset=utf8",
"id":"f3ae2d95-5136-4884-b363-170d8cedd91b",
"remote-addr":"/127.0.0.1:62973",
"source":"c78afc85-9534-4008-b6ba-7653c685e8ae",
"target":"059491fe-de75-4a09-8b30-223453a80500"
},
"body":{
"key":"value"
},
"status":200
}
# 服务端转发消息
{
"headers":{
"Content-Type":"application/json; charset=utf8",
"id":"1a5c8934-e8d5-41bb-8d79-4eea9555fa35",
"remote-addr":"/127.0.0.1:59245"
},
"body":{
"key":"value"
},
"status":200
}
总结思考
- RpcServer启动的时候,就发送sub请求。当RpcClient发送请求到服务端(类似于注册中心),会转成pub请求。pub请求会遍历queue里面的channel来接收。
-
RpcServer
收到消息后,根据url找到对应的方法,将执行后的结果返回给服务端。服务端会调用RouteHandler来根据target来转发请求给RpcClient。
版权声明:本文为qq_42985872原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。