手写RPC框架项目代码解读——Netty版本

  • Post author:
  • Post category:其他


项目地址:

https://github.com/Snailclimb/guide-rpc-framework



Socket版本的手写RPC参考:

手写RPC框架项目代码解读——Socket版本



1、基于Netty

使用注解开发,而不是xml配置文件,好处是便捷性,操作性。

@Builder主要作用是用来生成对象,并能够进行链式赋值。

@RpcService放在服务实现类上。

@RpcReference放在服务引用上。

服务端发送的数据类型是:RpcMessage<RpcReponse<Object>>。其中Object是为了包含服务的各种返回类型。

客户端发送的数据类型是:RpcMessage<RpcRequest<Object>>。

心跳机制:客户端发送信息类型为心跳请求的信息,数据为PING,服务端发送信息类型为心跳响应的信息,数据为PONG。




1.1 Spring类

在Netty版本,定义了一些Spring继承类,包括CustomScanner,CustomScannerRegistrar,SpringBeanPostProcessor。

CustomScanner类,

public class CustomScanner extends ClassPathBeanDefinitionScanner {

    public CustomScanner(BeanDefinitionRegistry registry, Class<? extends Annotation> annoType) {
        super(registry);
        super.addIncludeFilter(new AnnotationTypeFilter(annoType));
    }

    @Override
    public int scan(String... basePackages) {
        return super.scan(basePackages);
    }
}

postProcessBeforeInitialization方法在每个类注入容器的类被实例化前都会被执行。首先获得加载类的Class,判断类上是否有RpcService注解,这个注解是放在服务实现类上的。获得注解上的组号和版本号,设置RpcServiceConfig的组号,版本号,类。最后在服务提供方调用publishService方法发布服务。

postProcessAfterInitialization方法在每个需要注入容器的类被实例化后都会被执行。首先获取加载类的Class,再根据类获得声明的属性,可能有多个属性,对每个属性进行遍历。再获得属性上的RpcReference注解,如果没有这个注解,则跳过;如果有RpcReference注解,则将RpcServiceConfig和rpcClient生成代理类,再调用getProxy方法获得代理对象,并设置这个属性是共有的。最后设置bean的属性为代理对象。

@Slf4j
@Component
public class SpringBeanPostProcessor implements BeanPostProcessor {

    private final ServiceProvider serviceProvider;
    private final RpcRequestTransport rpcClient;

    public SpringBeanPostProcessor() {
        this.serviceProvider = SingletonFactory.getInstance(ZkServiceProviderImpl.class);
        this.rpcClient = ExtensionLoader.getExtensionLoader(RpcRequestTransport.class).getExtension("netty");
    }

    @SneakyThrows
    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        if (bean.getClass().isAnnotationPresent(RpcService.class)) {
            log.info("[{}] is annotated with  [{}]", bean.getClass().getName(), RpcService.class.getCanonicalName());
            // get RpcService annotation
            RpcService rpcService = bean.getClass().getAnnotation(RpcService.class);
            // build RpcServiceProperties
            RpcServiceConfig rpcServiceConfig = RpcServiceConfig.builder()
                    .group(rpcService.group())
                    .version(rpcService.version())
                    .service(bean).build();
            serviceProvider.publishService(rpcServiceConfig);
        }
        return bean;
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        Class<?> targetClass = bean.getClass();
        Field[] declaredFields = targetClass.getDeclaredFields();
        for (Field declaredField : declaredFields) {
            RpcReference rpcReference = declaredField.getAnnotation(RpcReference.class);
            // System.out.println(rpcReference.group() + " rpcReference ");
            if (rpcReference != null) {
                RpcServiceConfig rpcServiceConfig = RpcServiceConfig.builder()
                        .group(rpcReference.group())
                        .version(rpcReference.version()).build();
                RpcClientProxy rpcClientProxy = new RpcClientProxy(rpcClient, rpcServiceConfig);
                Object clientProxy = rpcClientProxy.getProxy(declaredField.getType());
                declaredField.setAccessible(true);
                try {
                    declaredField.set(bean, clientProxy);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                }
            }

        }
        return bean;
    }
}

SpringBeanPostProcessor 类在初始化时,会有这样的构造函数,会初始化服务提供实现和Netty客户端。

public SpringBeanPostProcessor() {
    this.serviceProvider = SingletonFactory.getInstance(ZkServiceProviderImpl.class);
    this.rpcClient = ExtensionLoader.getExtensionLoader(RpcRequestTransport.class).getExtension("netty");
}

在初始化NettyRpcClient时,会调用这个构造函数,参考另外一篇博客

Netty的基本概念

可以知道,这是初始化Netty的客户端。

public NettyRpcClient() {
    // initialize resources such as EventLoopGroup, Bootstrap
    eventLoopGroup = new NioEventLoopGroup();
    bootstrap = new Bootstrap();
    bootstrap.group(eventLoopGroup)
            .channel(NioSocketChannel.class)
            .handler(new LoggingHandler(LogLevel.INFO))
            //  The timeout period of the connection.
            //  If this time is exceeded or the connection cannot be established, the connection fails.
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) {
                    ChannelPipeline p = ch.pipeline();
                    // If no data is sent to the server within 15 seconds, a heartbeat request is sent
                    p.addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS));
                    p.addLast(new RpcMessageEncoder());
                    p.addLast(new RpcMessageDecoder());
                    p.addLast(new NettyRpcClientHandler());
                }
            });
    this.serviceDiscovery = ExtensionLoader.getExtensionLoader(ServiceDiscovery.class).getExtension("zk");
    this.unprocessedRequests = SingletonFactory.getInstance(UnprocessedRequests.class);
    this.channelProvider = SingletonFactory.getInstance(ChannelProvider.class);
}



1.2 服务端

在下面的示例中,AnnotationConfigApplicationContext基于扫描路径github.javaguide加载Spring的应用上下文,需要加载的类加上注解@Component,服务实现类上加了注解@RpcService。通过容器获得NettyRpcServer 的实例,对于HelloService1通过注解已经被加载到容器中,对于HelloService2可以手动注册到配置类中,再注册到NettyRpcServer中。最后start方法启动服务。

@RpcScan(basePackage = {"github.javaguide"})
public class NettyServerMain {
    public static void main(String[] args) {
        // Register service via annotation
        AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext(NettyServerMain.class);
        NettyRpcServer nettyRpcServer = (NettyRpcServer) applicationContext.getBean("nettyRpcServer");
        // Register service manually
        HelloService helloService2 = new HelloServiceImpl2();
        RpcServiceConfig rpcServiceConfig = RpcServiceConfig.builder()
                .group("test2").version("version2").service(helloService2).build();
        nettyRpcServer.registerService(rpcServiceConfig);
        nettyRpcServer.start();
    }
}

需要自动注册的服务类使用自定义注解@RpcService,如下

@Slf4j
@RpcService(group = "test1", version = "version1")
public class HelloServiceImpl implements HelloService {

    static {
        System.out.println("HelloServiceImpl被创建");
    }

    @Override
    public String hello(Hello hello) {
        log.info("HelloServiceImpl收到: {}.", hello.getMessage());
        String result = "Hello description1 is " + hello.getDescription();
        log.info("HelloServiceImpl返回: {}.", result);
        return result;
    }
}

对于手动注册的registerService,会调用serviceProvider的publishService方法。Netty版的手动注册服务和Socket版本的是一样的。可以参考

手写RPC框架项目代码解读——Socket版本

public void registerService(RpcServiceConfig rpcServiceConfig) {
    serviceProvider.publishService(rpcServiceConfig);
}



1.1.1 启动

1、首先设置钩子函数,这里和Socket版本是一样的。

2、Netty的实例化,需要启动两个EventLoopGroup,这里使用了主从Reactor多线程模型,服务端指定两个EventLoopGroup,一个是bossGroup,处理客户端的连接请求,另一个是workerGroup,用于处理与各个客户端连接的I/O操作。

3、指定Channel的类型,这里是服务端,所以使用NioServerSocketChannel,用于异步非阻塞的服务端TCP Socket连接。Channel是对Java底层Socket连接的抽象。

4、配置自定义的业务处理器Handler。IdleStateHandler是心跳检测,指定编码器RpcMessageEncoder,解码器RpcMessageDecoder,NettyRpcServerHandler中实现RPC服务具体逻辑。

nettyRpcServer.start();

@SneakyThrows
public void start() {
    CustomShutdownHook.getCustomShutdownHook().clearAll();
    String host = InetAddress.getLocalHost().getHostAddress();
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    DefaultEventExecutorGroup serviceHandlerGroup = new DefaultEventExecutorGroup(
            RuntimeUtil.cpus() * 2,
            ThreadPoolFactoryUtil.createThreadFactory("service-handler-group", false)
    );
    try {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                // TCP默认开启了 Nagle 算法,该算法的作用是尽可能的发送大数据快,减少网络传输。TCP_NODELAY 参数的作用就是控制是否启用 Nagle 算法。
                .childOption(ChannelOption.TCP_NODELAY, true)
                // 是否开启 TCP 底层心跳机制
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                //表示系统用于临时存放已完成三次握手的请求的队列的最大长度,如果连接建立频繁,服务器处理创建新连接较慢,可以适当调大这个参数
                .option(ChannelOption.SO_BACKLOG, 128)
                .handler(new LoggingHandler(LogLevel.INFO))
                // 当客户端第一次进行请求的时候才会进行初始化
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        // 30 秒之内没有收到客户端请求的话就关闭连接
                        ChannelPipeline p = ch.pipeline();
                        p.addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));
                        p.addLast(new RpcMessageEncoder());
                        p.addLast(new RpcMessageDecoder());
                        p.addLast(serviceHandlerGroup, new NettyRpcServerHandler());
                    }
                });

        // 绑定端口,同步等待绑定成功
        ChannelFuture f = b.bind(host, PORT).sync();
        // 等待服务端监听端口关闭
        f.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        log.error("occur exception when start server:", e);
    } finally {
        log.error("shutdown bossGroup and workerGroup");
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
        serviceHandlerGroup.shutdownGracefully();
    }
}



1.1.2 业务处理器

channelRead方法负责接收来自客户端的信息并处理。该方法首先判断接收的信息是否是RpcMessage类或子类的实例,如果是,获得数据类型,新建返回的数据rpcMessage,设置编码格式和压缩格式,接下来判断接收的信息类型是否是心跳请求类型:如果是,则在rpcMessage中设置信息类型是心跳机制的响应,并设置数据为常量RpcConstants.PONG,将rpcMessage写入管道并flush;如果不是心跳请求类型,则提取msg中的数据,使用rpcRequestHandler进行处理,这里的处理和Socket版本一致,都是从服务提供处获得具体服务,调用服务的函数,将结果返回。获得返回的数据后,设置信息类型为RESPONSE_TYPE。如果ctx管道是可写的,则产生成功数据,并在返回信息中设置数据,如果不是可写的,产生失败数据,设置到返回信息中,最后都需要写入ctx并刷新,设置监听器。此外,由于继承的是ChannelInboundHandlerAdapter类,因此finally块需要把Butebuf清除掉,防止内存泄漏。

@Slf4j
public class NettyRpcServerHandler extends ChannelInboundHandlerAdapter {

    private final RpcRequestHandler rpcRequestHandler;

    public NettyRpcServerHandler() {
        this.rpcRequestHandler = SingletonFactory.getInstance(RpcRequestHandler.class);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        try {
            if (msg instanceof RpcMessage) {
                log.info("server receive msg: [{}] ", msg);
                byte messageType = ((RpcMessage) msg).getMessageType();
                RpcMessage rpcMessage = new RpcMessage();
                rpcMessage.setCodec(SerializationTypeEnum.HESSIAN.getCode());
                rpcMessage.setCompress(CompressTypeEnum.GZIP.getCode());
                if (messageType == RpcConstants.HEARTBEAT_REQUEST_TYPE) {
                    rpcMessage.setMessageType(RpcConstants.HEARTBEAT_RESPONSE_TYPE);
                    rpcMessage.setData(RpcConstants.PONG);
                } else {
                    RpcRequest rpcRequest = (RpcRequest) ((RpcMessage) msg).getData();
                    // Execute the target method (the method the client needs to execute) and return the method result
                    Object result = rpcRequestHandler.handle(rpcRequest);
                    log.info(String.format("server get result: %s", result.toString()));
                    rpcMessage.setMessageType(RpcConstants.RESPONSE_TYPE);
                    if (ctx.channel().isActive() && ctx.channel().isWritable()) {
                        RpcResponse<Object> rpcResponse = RpcResponse.success(result, rpcRequest.getRequestId());
                        rpcMessage.setData(rpcResponse);
                    } else {
                        RpcResponse<Object> rpcResponse = RpcResponse.fail(RpcResponseCodeEnum.FAIL);
                        rpcMessage.setData(rpcResponse);
                        log.error("not writable now, message dropped");
                    }
                }
                ctx.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            }
        } finally {
            //Ensure that ByteBuf is released, otherwise there may be memory leaks
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleState state = ((IdleStateEvent) evt).state();
            if (state == IdleState.READER_IDLE) {
                log.info("idle check happen, so close the connection");
                ctx.close();
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("server catch exception");
        cause.printStackTrace();
        ctx.close();
    }
}

success方法,设置状态码为200,信息为The remote call is successful,请求ID为传入的请求ID,将数据放入响应中。

public static <T> RpcResponse<T> success(T data, String requestId) {
    RpcResponse<T> response = new RpcResponse<>();
    response.setCode(RpcResponseCodeEnum.SUCCESS.getCode());
    response.setMessage(RpcResponseCodeEnum.SUCCESS.getMessage());
    response.setRequestId(requestId);
    if (null != data) {
        response.setData(data);
    }
    return response;
}

fail方法,设置状态码为500,信息为The remote call is fail。

public static <T> RpcResponse<T> fail(RpcResponseCodeEnum rpcResponseCodeEnum) {
    RpcResponse<T> response = new RpcResponse<>();
    response.setCode(rpcResponseCodeEnum.getCode());
    response.setMessage(rpcResponseCodeEnum.getMessage());
    return response;
}



1.3 客户端

首先进行包扫描,将类加载到容器中,再获得HelloController的实例化对象,执行test方法。

public static void main(String[] args) throws InterruptedException {
    AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext(NettyClientMain.class);
    HelloController helloController = (HelloController) applicationContext.getBean("helloController");
    helloController.test();
}



1.3.1 test方法

可以看到属性HelloService已经Autowired。参考

Spring类

,在实例化HelloController之后会调用SpringBeanPostProcessor的postProcessAfterInitialization方法,为这个属性生成代理对象,因此调用hello方法时会使用代理对象的invoke方法,见下面代码。invoke方法和Socket版本的invoke方法是同一个方法,区别在于里面有判断语句,如果是NettyRpcClient的对象,会使用不同的sendRpcRequest方法。

@Component
public class HelloController {

    @RpcReference(version = "version2", group = "test2")
    private HelloService helloService;

    public void test() throws InterruptedException {
        // String hello = this.helloService.hello(new Hello("111", "222"));
        // //如需使用 assert 断言,需要在 VM options 添加参数:-ea
        // assert "Hello description is 222".equals(hello);
        // Thread.sleep(12000);
        for (int i = 0; i < 10; i++) {
            System.out.println(helloService.hello(new Hello("111", "222")));
        }
        System.out.println(helloService.hello(new Hello("000", "333")));
    }
}

@SneakyThrows
@SuppressWarnings("unchecked")
@Override
public Object invoke(Object proxy, Method method, Object[] args) {
    log.info("invoked method: [{}]", method.getName());
    RpcRequest rpcRequest = RpcRequest.builder().methodName(method.getName())
            .parameters(args)
            .interfaceName(method.getDeclaringClass().getName())
            .paramTypes(method.getParameterTypes())
            .requestId(UUID.randomUUID().toString())
            .group(rpcServiceConfig.getGroup())
            .version(rpcServiceConfig.getVersion())
            .build();
    RpcResponse<Object> rpcResponse = null;
    if (rpcRequestTransport instanceof NettyRpcClient) {
        CompletableFuture<RpcResponse<Object>> completableFuture = (CompletableFuture<RpcResponse<Object>>) rpcRequestTransport.sendRpcRequest(rpcRequest);
        rpcResponse = completableFuture.get();
    }
    if (rpcRequestTransport instanceof SocketRpcClient) {
        rpcResponse = (RpcResponse<Object>) rpcRequestTransport.sendRpcRequest(rpcRequest);
    }
    this.check(rpcResponse, rpcRequest);
    return rpcResponse.getData();
}



1.3.2 发送请求

这里使用了CompletableFuture,作用是异步执行,当子线程获得结果后,主线程可以通过get方法取出结果,主线程不会被阻塞。

sendRpcRequest方法,首先lookupService获得服务器的地址,再获得管道,如果管道不是active则抛出异常,否则继续,在未处理请求中放入键值对,键是请求ID,值是resultFuture,接着实例化需要发送的RpcMessage,设置数据是rpcRequest,编码方式使用hessian,压缩方式使用gzip,信息类型设置成REQUEST_TYPE,将rpcMessage写到管道中,并设置监听器,发送成功则打印日志;如果发送失败,则关闭发送管道,resultFuture在异常情况下完成任务,并打印错误日志。

@Override
public Object sendRpcRequest(RpcRequest rpcRequest) {
    // build return value
    CompletableFuture<RpcResponse<Object>> resultFuture = new CompletableFuture<>();
    // get server address
    InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcRequest);
    // get  server address related channel
    Channel channel = getChannel(inetSocketAddress);
    if (channel.isActive()) {
        // put unprocessed request
        unprocessedRequests.put(rpcRequest.getRequestId(), resultFuture);
        RpcMessage rpcMessage = RpcMessage.builder().data(rpcRequest)
                .codec(SerializationTypeEnum.HESSIAN.getCode())
                .compress(CompressTypeEnum.GZIP.getCode())
                .messageType(RpcConstants.REQUEST_TYPE).build();
        channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
            if (future.isSuccess()) {
                log.info("client send message: [{}]", rpcMessage);
            } else {
                future.channel().close();
                resultFuture.completeExceptionally(future.cause());
                log.error("Send failed:", future.cause());
            }
        });
    } else {
        throw new IllegalStateException();
    }

    return resultFuture;
}



1.3.3 接收响应

客户端之所以能接收响应,是因为在SpringBeanPostProcessor初始化时,会调用无参构造函数初始化NettyRpcClient,而NettyRpcClient初始化时调用无参构造函数会初始化Netty客户端,并设置处理器为NettyRpcClientHandler,

NettyRpcClientHandler的channelRead方法可以读取来自服务端的信息并处理

。channelRead首先判断信息是否是RpcMessage类型或子类,如果是,将其强制转换成RpcMessage类,并且获得信息类型,如果是心跳机制的响应类型,则输出日志即可;如果是响应类型,则获取其中的数据强转为RpcResponse类,并调用unprocessedRequests的complete函数。

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    try {
        log.info("client receive msg: [{}]", msg);
        if (msg instanceof RpcMessage) {
            RpcMessage tmp = (RpcMessage) msg;
            byte messageType = tmp.getMessageType();
            if (messageType == RpcConstants.HEARTBEAT_RESPONSE_TYPE) {
                log.info("heart [{}]", tmp.getData());
            } else if (messageType == RpcConstants.RESPONSE_TYPE) {
                RpcResponse<Object> rpcResponse = (RpcResponse<Object>) tmp.getData();
                unprocessedRequests.complete(rpcResponse);
            }
        }
    } finally {
        ReferenceCountUtil.release(msg);
    }
}

complete函数,首先将未处理响应map中根据请求ID,将这个任务删去,如果不存在这个任务则抛出异常,如果存在这个任务则调用CompletableFuture的complete函数,传入的rpcResponse会被CompletableFuture的get方法获取到。

public void complete(RpcResponse<Object> rpcResponse) {
    CompletableFuture<RpcResponse<Object>> future = UNPROCESSED_RESPONSE_FUTURES.remove(rpcResponse.getRequestId());
    if (null != future) {
        future.complete(rpcResponse);
    } else {
        throw new IllegalStateException();
    }
}

最后,在invoke方法中,completableFuture.get方法会获取到rpcResponse。



2、序列化

Serializer是序列化的类需要实现的接口。

@SPI
public interface Serializer {
    /**
     * 序列化
     *
     * @param obj 要序列化的对象
     * @return 字节数组
     */
    byte[] serialize(Object obj);

    /**
     * 反序列化
     *
     * @param bytes 序列化后的字节数组
     * @param clazz 目标类
     * @param <T>   类的类型。举个例子,  {@code String.class} 的类型是 {@code Class<String>}.
     *              如果不知道类的类型的话,使用 {@code Class<?>}
     * @return 反序列化的对象
     */
    <T> T deserialize(byte[] bytes, Class<T> clazz);
}



2.1 Kyro

Kyro的序列化效率很高,但缺点是只能用于Java语言。因为Kryo不是线程安全的,因此使用ThreadLocal存储对象。

serialize方法,首先实例化ByteArrayOutputStream,再传入Output进行实例化,kryoThreadLocal获得Kryo对象,调用Kryo对象的writeObject方法,将obj写到output里,再移去Kryo对象,返回output的数组形式。

deserialize方法,首先实例化ByteArrayInputStream,传入byte数组,实例化Input,传入ByteArrayInputStream,获得Kryo对象,调用readObject方法将byte数组写入到obj,再调用cast函数,其实是强制转换,将o对象强转成clazz的类。

@Slf4j
public class KryoSerializer implements Serializer {

    /**
     * Because Kryo is not thread safe. So, use ThreadLocal to store Kryo objects
     */
    private final ThreadLocal<Kryo> kryoThreadLocal = ThreadLocal.withInitial(() -> {
        Kryo kryo = new Kryo();
        kryo.register(RpcResponse.class);
        kryo.register(RpcRequest.class);
        return kryo;
    });

    @Override
    public byte[] serialize(Object obj) {
        try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
             Output output = new Output(byteArrayOutputStream)) {
            Kryo kryo = kryoThreadLocal.get();
            // Object->byte:将对象序列化为byte数组
            kryo.writeObject(output, obj);
            kryoThreadLocal.remove();
            return output.toBytes();
        } catch (Exception e) {
            throw new SerializeException("Serialization failed");
        }
    }

    @Override
    public <T> T deserialize(byte[] bytes, Class<T> clazz) {
        try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
             Input input = new Input(byteArrayInputStream)) {
            Kryo kryo = kryoThreadLocal.get();
            // byte->Object:从byte数组中反序列化出对对象
            Object o = kryo.readObject(input, clazz);
            kryoThreadLocal.remove();
            return clazz.cast(o);
        } catch (Exception e) {
            throw new SerializeException("Deserialization failed");
        }
    }

}



2.2 Hessian

这个项目中目前使用Hessian进行序列化和反序列化。Hessian是一个使用二进制Web服务协议的框架。Hessian序列化是一种支持动态类型、跨语言、基于对象传输的网络协议。特点有:

1、自描述序列化类型。不依赖外部描述文件或者接口定义,用一个字节表示常用的基础类型,极大缩短二进制流。

2、语言无关,支持脚本语言。

3、协议简单,比Java原生序列化高效。相比hessian1,hessian2中增加了压缩编码,其序列化二进制流大小是Java序列化的50%,序列化耗时是Java序列化的30%,反序列化耗时是Java序列化的20%。

Hessian会把复杂的对象所有属性存储在一个Map中进行序列化。所以在父类、子类中存在同名成员变量的情况下,hessian序列化时,先序列化子类,然后序列化父类。因此,反序列化结果会导致子类同名成员变量被父类的值覆盖。

public class HessianSerializer implements Serializer {
    @Override
    public byte[] serialize(Object obj) {
        try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
            HessianOutput hessianOutput = new HessianOutput(byteArrayOutputStream);
            hessianOutput.writeObject(obj);

            return byteArrayOutputStream.toByteArray();
        } catch (Exception e) {
            throw new SerializeException("Serialization failed");
        }

    }

    @Override
    public <T> T deserialize(byte[] bytes, Class<T> clazz) {

        try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes)) {
            HessianInput hessianInput = new HessianInput(byteArrayInputStream);
            Object o = hessianInput.readObject();

            return clazz.cast(o);

        } catch (Exception e) {
            throw new SerializeException("Deserialization failed");
        }

    }
}



2.3 Protostuff

Google的protobuf因为其编码后体积小,序列化/反序列化性能好,被广泛使用。但是protobuf需要编写.proto文件,再通过protobuf转换成对应的java代码,非常不好维护。

protostuff就是为了解决这个痛点而产生的。通过protostuff,不需要编写.proto文件,只需要编写普通的java bean就可以使用protobuf的序列化/反序列化。特点:使用简单,高性能。

public class ProtostuffSerializer implements Serializer {

    /**
     * Avoid re applying buffer space every time serialization
     */
    private static final LinkedBuffer BUFFER = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);

    @Override
    public byte[] serialize(Object obj) {
        Class<?> clazz = obj.getClass();
        Schema schema = RuntimeSchema.getSchema(clazz);
        byte[] bytes;
        try {
            bytes = ProtostuffIOUtil.toByteArray(obj, schema, BUFFER);
        } finally {
            BUFFER.clear();
        }
        return bytes;
    }

    @Override
    public <T> T deserialize(byte[] bytes, Class<T> clazz) {
        Schema<T> schema = RuntimeSchema.getSchema(clazz);
        T obj = schema.newMessage();
        ProtostuffIOUtil.mergeFrom(bytes, obj, schema);
        return obj;
    }
}



版权声明:本文为qq_41523340原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。