为了让结构更清晰一点,本节将分成几次内容提交。
统一Socket与Netty传输协议,同时让Socket使用序列化器
commit地址:
bdb7dc4
还记得在之前的Socket传输中,我们并未引入传输协议,只是单纯传输了一个RpcRequest对象给服务端,直接使用Java原生的readObject、writeObject就可以完成传输,不需要单独的序列化器,现在我们希望让Socket与Netty使用的传输协议保持一致,这意味着Socket也要用到序列化器,通过对代码的重构,实现了在TestServer和TestClient中就能对序列化器进行配置。核心代码如下,与Netty编解码类似:
/**
* @description Socket方式从输入流中读取字节并反序列化【解码】
*/
public class ObjectReader {
private static final Logger logger = LoggerFactory.getLogger(ObjectReader.class);
private static final int MAGIC_NUMBER = 0xCAFEBABE;
public static Object readObject(InputStream in) throws IOException {
byte[] numberBytes = new byte[4];
in.read(numberBytes);
int magic = bytesToInt(numberBytes);
if (magic != MAGIC_NUMBER) {
logger.error("不识别的协议包:{}", magic);
throw new RpcException(RpcError.UNKNOWN_PROTOCOL);
}
in.read(numberBytes);
int packageCode = bytesToInt(numberBytes);
Class<?> packageClass;
if (packageCode == PackageType.REQUEST_PACK.getCode()) {
packageClass = RpcRequest.class;
} else if (packageCode == PackageType.RESPONSE_PACK.getCode()) {
packageClass = RpcResponse.class;
} else {
logger.error("不识别的数据包:{}", packageCode);
throw new RpcException(RpcError.UNKNOWN_PACKAGE_TYPE);
}
in.read(numberBytes);
int serializerCode = bytesToInt(numberBytes);
CommonSerializer serializer = CommonSerializer.getByCode(serializerCode);
if (serializer == null) {
logger.error("不识别的反序列化器:{}", serializerCode);
throw new RpcException(RpcError.UNKNOWN_SERIALIZER);
}
in.read(numberBytes);
int length = bytesToInt(numberBytes);
byte[] bytes = new byte[length];
in.read(bytes);
return serializer.deserialize(bytes, packageClass);
}
/**
* @description 字节数组转换为Int
* @param [src]
* @return [int]
* @date [2021-03-10 21:57]
*/
private static int bytesToInt(byte[] src) {
int value;
value = (src[0] & 0xFF)
| ((src[1] & 0xFF)<<8)
| ((src[2] & 0xFF)<<16)
| ((src[3] & 0xFF)<<24);
return value;
}
}
/**
* @description Socket方式将数据序列化并写入输出流中【编码】
*/
public class ObjectWriter {
private static final Logger logger = LoggerFactory.getLogger(ObjectWriter.class);
private static final int MAGIC_NUMBER = 0xCAFEBABE;
public static void writeObject(OutputStream out, Object object, CommonSerializer serializer) throws IOException {
out.write(intToBytes(MAGIC_NUMBER));
if(object instanceof RpcRequest) {
out.write(intToBytes(PackageType.REQUEST_PACK.getCode()));
} else {
out.write(intToBytes(PackageType.RESPONSE_PACK.getCode()));
}
out.write(intToBytes(serializer.getCode()));
byte[] bytes = serializer.serialize(object);
out.write(intToBytes(bytes.length));
out.write(bytes);
out.flush();
}
/**
* @description 将Int转换为字节数组
* @param [value]
* @return [byte[]]
* @date [2021-03-10 22:15]
*/
private static byte[] intToBytes(int value) {
byte[] des = new byte[4];
des[3] = (byte) ((value>>24) & 0xFF);
des[2] = (byte) ((value>>16) & 0xFF);
des[1] = (byte) ((value>>8) & 0xFF);
des[0] = (byte) (value & 0xFF);
return des;
}
}
为请求加上请求号,保证请求与响应一一对应
commit地址:
09a3aaa
利用请求号对服务端返回的响应数据进行校验,保证请求与响应一一对应,同时对服务端的相关处理代码进行了结构优化,核心代码如下:
/**
* @description 检查响应和请求
*/
public class RpcMessageChecker {
private static final String INTERFACE_NAME = "interfaceName";
private static final Logger logger = LoggerFactory.getLogger(RpcMessageChecker.class);
private RpcMessageChecker(){
}
public static void check(RpcRequest rpcRequest, RpcResponse rpcResponse){
if(rpcResponse == null) {
logger.error("调用服务失败,serviceName:{}", rpcRequest.getInterfaceName());
throw new RpcException(RpcError.SERVICE_INVOCATION_FAILURE, INTERFACE_NAME + ":" + rpcRequest.getInterfaceName());
}
//响应与请求的请求号不同
if(!rpcRequest.getRequestId().equals(rpcResponse.getRequestId())) {
throw new RpcException(RpcError.RESPONSE_NOT_MATCH, INTERFACE_NAME + ":" + rpcRequest.getInterfaceName());
}
//调用失败
if(rpcResponse.getStatusCode() == null || !rpcResponse.getStatusCode().equals(ResponseCode.SUCCESS.getCode())){
logger.error("调用服务失败,serviceName:{},RpcResponse:{}", rpcRequest.getInterfaceName(), rpcResponse);
throw new RpcException(RpcError.SERVICE_INVOCATION_FAILURE, INTERFACE_NAME + ":" + rpcRequest.getInterfaceName());
}
}
}
实现Netty客户端连接失败重试机制
commit地址:
9bb0ea7
(github访问速度太慢了,所以同步到gitee了)
将客户端Channel的连接创建分离出来,给其设定5次失败后重连的机会;同时改变Socket方式数据传输的端序,使其与Netty保持一致,具体来说比如137这个int值,是按从1到3再到7的顺序序列化,还是从7到3再到1的顺序转换,核心代码如下:
/**
* @description 用于获取Channel对象
*/
public class ChannelProvider {
private static final Logger logger = LoggerFactory.getLogger(ChannelProvider.class);
private static EventLoopGroup eventLoopGroup;
private static Bootstrap bootstrap = initializeBootstrap();
private static final int MAX_RETRY_COUNT = 5;
private static Channel channel = null;
private static Bootstrap initializeBootstrap() {
eventLoopGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
//连接的超时时间,超过这个时间还是建立不上的话则代表连接失败
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
//启用该功能时,TCP会主动探测空闲连接的有效性。可以将此功能视为TCP的心跳机制,默认的心跳间隔是7200s即2小时。
.option(ChannelOption.SO_KEEPALIVE, true)
//配置Channel参数,nodelay没有延迟,true就代表禁用Nagle算法,减小传输延迟。
//理解可参考:https://blog.csdn.net/lclwjl/article/details/80154565
.option(ChannelOption.TCP_NODELAY, true);
return bootstrap;
}
public static Channel get(InetSocketAddress inetSocketAddress, CommonSerializer serializer){
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch){
ch.pipeline().addLast(new CommonEncoder(serializer))
.addLast(new CommonDecoder())
.addLast(new NettyClientHandler());
}
});
//设置计数器值为1
CountDownLatch countDownLatch = new CountDownLatch(1);
try{
connect(bootstrap, inetSocketAddress, countDownLatch);
//阻塞当前线程直到计时器的值为0
countDownLatch.await();
}catch (InterruptedException e){
logger.error("获取Channel时有错误发生", e);
}
return channel;
}
private static void connect(Bootstrap bootstrap, InetSocketAddress inetSocketAddress, CountDownLatch countDownLatch) {
connect(bootstrap, inetSocketAddress, MAX_RETRY_COUNT, countDownLatch);
}
/**
* @description Netty客户端创建通道连接,实现连接失败重试机制
* @param [bootstrap, inetSocketAddress, retry, countDownLatch]
* @return [void]
* @date [2021-03-11 14:19]
*/
private static void connect(Bootstrap bootstrap, InetSocketAddress inetSocketAddress, int retry, CountDownLatch countDownLatch) {
bootstrap.connect(inetSocketAddress).addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
logger.info("客户端连接成功!");
channel = future.channel();
//计数器减一
countDownLatch.countDown();
return;
}
if (retry == 0) {
logger.error("客户端连接失败:重试次数已用完,放弃连接!");
countDownLatch.countDown();
throw new RpcException(RpcError.CLIENT_CONNECT_SERVER_FAILURE);
}
//第几次重连
int order = (MAX_RETRY_COUNT - retry) + 1;
//重连的时间间隔,相当于1乘以2的order次方
int delay = 1 << order;
logger.error("{}:连接失败,第{}次重连……", new Date(), order);
//利用schedule()在给定的延迟时间后执行connect()重连
bootstrap.config().group().schedule(() -> connect(bootstrap, inetSocketAddress, retry - 1, countDownLatch), delay,
TimeUnit.SECONDS);
});
}
}
Protostuff序列化器
commit地址:
77cba60
和前面的kryo、hessian实现过程类似,代码理解都写注释里了,其次就是让SocketServer使用之前创建好的线程池工具类,核心代码如下:
/**
* @description Protostuff序列化器
*/
public class ProtostuffSerializer implements CommonSerializer {
/**
* 避免每次序列化都重新申请Buffer空间,用来存放对象序列化之后的数据
* 如果你设置的空间不足,会自动扩展的,但这个大小还是要设置一个合适的值,设置大了浪费空间,设置小了会自动扩展浪费时间
*/
private LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
/**
* 缓存类对应的Schema,由于构造schema需要获得对象的类和字段信息,会用到反射机制
*这是一个很耗时的过程,因此进行缓存很有必要,下次遇到相同的类直接从缓存中get就行了
*/
private Map<Class<?>, Schema<?>> schemaCache = new ConcurrentHashMap<>();
@Override
public byte[] serialize(Object obj) {
Class clazz = obj.getClass();
Schema schema = getSchema(clazz);
byte[] data;
try {
//序列化操作,将对象转换为字节数组
data = ProtostuffIOUtil.toByteArray(obj, schema, buffer);
} finally {
//使用完清空buffer
buffer.clear();
}
return data;
}
@Override
public Object deserialize(byte[] bytes, Class<?> clazz) {
Schema schema = getSchema(clazz);
Object obj = schema.newMessage();
//反序列化操作,将字节数组转换为对应的对象
ProtostuffIOUtil.mergeFrom(bytes, obj, schema);
return obj;
}
@Override
public int getCode() {
return SerializerCode.valueOf("PROTOBUF").getCode();
}
/**
* @description 获取Schema
* @param [clazz]
* @return [io.protostuff.Schema]
* @date [2021-03-11 21:38]
*/
private Schema getSchema(Class clazz) {
//首先尝试从Map缓存中获取类对应的schema
Schema schema = schemaCache.get(clazz);
if(Objects.isNull(schema)) {
//新创建一个schema,RuntimeSchema就是将schema繁琐的创建过程封装了起来
//它的创建过程是线程安全的,采用懒创建的方式,即当需要schema的时候才创建
schema = RuntimeSchema.getSchema(clazz);
if(Objects.nonNull(schema)) {
//缓存schema,方便下次直接使用
schemaCache.put(clazz, schema);
}
}
return schema;
}
}
项目v2.0到此结束,准备升级到v3.0
commit地址:
f156d47
本节over……