netty5与spring集成,实现配置启动–(一)

  • Post author:
  • Post category:其他


以前都是在自己的域名下写文章,CSDN开篇第一文献上自己的一个小小DEMO项目

首先要感谢鑫鑫哥哥,您的文章给了我最初的入门

附传送门:

http://my.oschina.net/xinxingegeya/blog/289258

http://my.oschina.net/xinxingegeya/blog/295408

同时,最关键的参考书:

netty权威指南,文中部分文字摘自此书

============================以下是正文内容===============================================

项目需求:


1.使用netty实现可配置化的NIO通讯服务器


2.要求支持多种通讯协议以及长短链接,如http,https,TCP,TLS,MQTT等


3.支持私有协议拓展开发

以上因为是个人demo项目,所以未做其他方面的需求,比如压力测试,集群部署等,这些自己也需要在下下个阶段继续研究(下个阶段主要研究dubbo和完善自己的demo项目)。


1.Main类

首先我们少不了一个Main类,这个是netty启动的类,直接看代码。

package com.omen.netty.server;

import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.support.FileSystemXmlApplicationContext;

import com.omen.netty.server.sysPojo.SystemInfo;

public class Main {

	private static Logger log = Logger.getLogger(Main.class);
	 @Autowired
	 private SystemInfo systemInfo;
	
	static {
		// 先加载spring
		log.info("准备载入spring...");
		String url = "../classes/omen.xml";
		// 保存context
		SystemInfo.setCtx(new FileSystemXmlApplicationContext(url));
		log.info("载入spring 完毕...");
	}

	public static void main(String[] args) throws Exception {
		IServer iServer = (IServer) SystemInfo.getCtx().getBean("basicServer");
		iServer.start();
	}

}

这里在启动main方法的时候会首先去加载spring配置文件,同时保存context。期间应用了一个systemInfo对象为该系统的系统配置

package com.omen.netty.server.sysPojo;

import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;

import org.apache.log4j.Logger;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;

import com.omen.netty.utils.StringUtil;

public class SystemInfo {
	
	private static Logger log = Logger.getLogger(SystemInfo.class);
	
	
	public SystemInfo(String protocolType,Integer port,Integer channelType) throws Exception{
		
		/**
		 * 基础校验
		 */
		if(StringUtil.isEmpty(protocolType)){
			throw new Exception("protocolType is not setted");
		}
		
		if(port == null){
			throw new Exception("port is not setted");
		}
		
		if(channelType == null){
			throw new Exception("channelType is not setted");
		}
		
		this.protocolType=protocolType.toUpperCase();
		this.port=port;
		this.isSsl=false;
		this.channelType=channelType;
	}
	
	public SystemInfo(String protocolType,Integer port,Integer channelType,
			Integer bossGroupSize,Integer workerGroupSize) throws Exception{
		
		/**
		 * 基础校验
		 */
		if(StringUtil.isEmpty(protocolType)){
			throw new Exception("protocolType is not setted");
		}
		
		if(port == null){
			throw new Exception("port is not setted");
		}
		if(channelType == null){
			throw new Exception("channelType is not setted");
		}
		
		this.protocolType=protocolType.toUpperCase();
		this.port=port;
		this.isSsl=false;
		this.channelType=channelType;
		this.bossGroupSize=bossGroupSize;
		this.workerGroupSize=workerGroupSize;
	}
	
	
	public SystemInfo(String protocolType,Integer port,Integer channelType, boolean isSsl,String jksPath,
			String jksPwd) throws Exception{
		
		/**
		 * 基础校验
		 */
		if(StringUtil.isEmpty(protocolType)){
			throw new Exception("protocolType is not setted");
		}
		
		if(port == null){
			throw new Exception("port is not setted");
		}
		if(isSsl){
			if(StringUtil.isEmpty(jksPath)){
				throw new Exception("jksPath type is not setted");
			}
			if(StringUtil.isEmpty(jksPwd)){
				throw new Exception("jksPwd type is not setted");
			}
		}
		
		if(channelType == null){
			throw new Exception("channelType is not setted");
		}
		
		this.protocolType=protocolType.toUpperCase();
		this.port=port;
		this.isSsl=isSsl;
		this.jksPath=jksPath;
		this.jksPwd=jksPwd;
		this.channelType=channelType;
	}
	
	public SystemInfo(String protocolType,Integer port,Integer channelType, boolean isSsl,String jksPath,
			String jksPwd,Integer bossGroupSize,Integer workerGroupSize) throws Exception{
		
		/**
		 * 基础校验
		 */
		if(StringUtil.isEmpty(protocolType)){
			throw new Exception("protocolType is not setted");
		}
		
		if(port == null){
			throw new Exception("port is not setted");
		}
		if(isSsl){
			if(StringUtil.isEmpty(jksPath)){
				throw new Exception("jksPath type is not setted");
			}
			if(StringUtil.isEmpty(jksPwd)){
				throw new Exception("jksPwd type is not setted");
			}
		}
		
		if(channelType == null){
			throw new Exception("channelType is not setted");
		}
		
		this.protocolType=protocolType.toUpperCase();
		this.port=port;
		this.isSsl=isSsl;
		this.jksPath=jksPath;
		this.jksPwd=jksPwd;
		this.channelType=channelType;
		this.bossGroupSize=bossGroupSize;
		this.workerGroupSize=workerGroupSize;
	}
	
	private static String protocolType;

	private static  Integer port;
	
	private static Boolean isSsl;
	
	private static String jksPath;
	
	private static  String jksPwd;
	
	private static Integer channelType;

	private static  ApplicationContext ctx;
	
	private static EventLoopGroup bossGroup ;
	
	private static EventLoopGroup workerGroup;
	
	private static Integer bossGroupSize;
	
	private static Integer workerGroupSize;
	
	
	public static void printSysInfo(){
		log.info("**************SYSTEM INFO******************");
		log.info("protocolType  : " + protocolType);
		log.info("port          : " + port);
		log.info("channelType   : " + channelType + " (0=NIO 1=OIO)");
		log.info("isSsl         : " + isSsl);
		if(!StringUtil.isEmpty(jksPath))
			log.info("jksPath       : " + jksPath);
		if(!StringUtil.isEmpty(jksPwd))
			log.info("jksPwd        : " + jksPwd);
		if(bossGroupSize!=null)
			log.info("bossGroupSize : " + bossGroupSize);
		if(workerGroupSize!=null)
			log.info("workerGroupSize: " + workerGroupSize);
		log.info("**************SYSTEM INFO******************");
	}
	
	public static void shutDownGraceFully(){
		bossGroup.shutdownGracefully();
		workerGroup.shutdownGracefully();
	}
	


	public static String getProtocolType() {
		return protocolType;
	}
	
	public static void setProtocolType(String protocolType) {
		SystemInfo.protocolType = protocolType;
	}
	
	public static Integer getPort() {
		return port;
	}

	public static void setPort(Integer port) {
		SystemInfo.port = port;
	}

	public static Boolean getIsSsl() {
		return isSsl;
	}

	public static void setIsSsl(Boolean isSsl) {
		SystemInfo.isSsl = isSsl;
	}

	public static String getJksPath() {
		return jksPath;
	}

	public static void setJksPath(String jksPath) {
		SystemInfo.jksPath = jksPath;
	}

	public static String getJksPwd() {
		return jksPwd;
	}

	public static void setJksPwd(String jksPwd) {
		SystemInfo.jksPwd = jksPwd;
	}

	public static Integer getChannelType() {
		return channelType;
	}

	public static void setChannelType(Integer channelType) {
		SystemInfo.channelType = channelType;
	}
	
	public static ApplicationContext getCtx() {
		return ctx;
	}
	
	public static void setCtx(ApplicationContext ctx) {
		SystemInfo.ctx = ctx;
	}

	public static Integer getBossGroupSize() {
		return bossGroupSize;
	}

	public static void setBossGroupSize(Integer bossGroupSize) {
		SystemInfo.bossGroupSize = bossGroupSize;
	}
	
	public static EventLoopGroup getBossGroup() {
		return bossGroup;
	}
	
	public static void setBossGroup(EventLoopGroup bossGroup) {
		SystemInfo.bossGroup = bossGroup;
	}
	
	public static EventLoopGroup getWorkerGroup() {
		return workerGroup;
	}
	
	public static void setWorkerGroup(EventLoopGroup workerGroup) {
		SystemInfo.workerGroup = workerGroup;
	}

	public static Integer getWorkerGroupSize() {
		return workerGroupSize;
	}

	public static void setWorkerGroupSize(Integer workerGroupSize) {
		SystemInfo.workerGroupSize = workerGroupSize;
	}
	
	public static void main(String[] args) {
		
	}
	

}

systemInfo里面涵盖了多个构造函数,这个在后面会有解释(想通过spring直接配置系统参数),目前有个更好的想法就是通过配置map去完成配置系统参数,但是目前还没改好,改好以后会更新代码。

IServer为server接口

package com.omen.netty.server;


public interface IServer {
	
	public void start() throws Exception;
	
	public void stop() throws Exception;

	public void restart() throws Exception;
}

我们的实现类

package com.omen.netty.server;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;

import org.springframework.beans.factory.annotation.Autowired;

import com.omen.netty.server.factory.ServerChannelFactory;
import com.omen.netty.server.sysPojo.SystemInfo;

public class BasicServer implements IServer {

    private Channel acceptorChannel;
    
	@Autowired
    ServerChannelFactory serverChannelFactory;
    

	
	@Override
	public void start()throws  Exception{
		try{
			acceptorChannel = serverChannelFactory.createAcceptorChannel();
			acceptorChannel.closeFuture().sync();
		}finally{
			//优雅退出,释放线程组资源
			SystemInfo.shutDownGraceFully();
		}
	}

	@Override
	public void stop()throws  Exception {
		try{
			if(acceptorChannel!=null)
				acceptorChannel.close().addListener(ChannelFutureListener.CLOSE);
		}finally{
			//优雅退出,释放线程组资源
			SystemInfo.shutDownGraceFully();
		}
	}

	@Override
	public void restart()throws  Exception {
		stop();
	    start();
	}
	
}

在我们的实现类中,start方法和stop方法最终都会调用systemInfo的shutDownGracefully以便释放资源。其中start当中会等待服务端链路关闭以后main函数才退出。


2.几个工厂

先看代码

channel工厂

package com.omen.netty.server.factory;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;

import org.apache.log4j.Logger;

import com.omen.netty.exception.SysErrException;
import com.omen.netty.server.sysPojo.ProtocolType;
import com.omen.netty.server.sysPojo.SystemInfo;

public class ServerChannelFactory {


	private static Logger log = Logger.getLogger(ServerChannelFactory.class);
	
	public  Channel createAcceptorChannel() throws SysErrException{
		
		final ServerBootstrap serverBootstrap = ServerBootstrapFactory.createServerBootstrap();
		serverBootstrap.childHandler(getChildHandler());
		log.info("创建Server...");
		 try {
			 ChannelFuture channelFuture = serverBootstrap.bind(SystemInfo.getPort()).sync();
			 channelFuture.awaitUninterruptibly();
	            if (channelFuture.isSuccess()) {
	            	SystemInfo.printSysInfo();
	                return channelFuture.channel();
	            } else {
	            	String errMsg="Failed to open socket! Cannot bind to port: "+SystemInfo.getPort()+"!";
	            	log.error(errMsg);
	            	throw new SysErrException(errMsg);
	            }
				
		} catch (Exception e){
			throw new SysErrException(e);
		}
	}
	
	
	private static ChannelInitializer
   
   
    
     getChildHandler() throws SysErrException{
		if(ProtocolType.HTTP.equals(SystemInfo.getProtocolType())
				||ProtocolType.HTTPS.equals(SystemInfo.getProtocolType())){
			return (ChannelInitializer
    
    
     
     )SystemInfo.getCtx().getBean("httpServerInitializer");
		}
		
		else if(ProtocolType.TCP.equals(SystemInfo.getProtocolType()))
			return (ChannelInitializer
     
     
      
      )SystemInfo.getCtx().getBean("tcpServerInitializer");
		
		else if(ProtocolType.CUSTOM.equals(SystemInfo.getProtocolType()))
			return (ChannelInitializer
      
      
       
       )SystemInfo.getCtx().getBean("customServerInitializer");
		
		else{
			String errMsg="undefined protocol:"+SystemInfo.getProtocolType()+"!";
			throw new SysErrException(errMsg);
		}
		
	}
}

      
      
     
     
    
    
   
   

ServerBootStrap工厂

package com.omen.netty.server.factory;


import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.oio.OioServerSocketChannel;

import com.omen.netty.server.sysPojo.ChannelType;
import com.omen.netty.server.sysPojo.SystemInfo;


/**
 * @author liyidong       
 * @version 1.0     
 * @created 2014-12-16 上午11:14:33 
 * @function:工厂模式  server服务器 ServerBootstrap创建
 */
public class ServerBootstrapFactory {
	private ServerBootstrapFactory() {
    }
    public static ServerBootstrap createServerBootstrap() throws UnsupportedOperationException {
         
    	ServerBootstrap serverBootstrap = new ServerBootstrap();
        switch (SystemInfo.getChannelType()) {
            case ChannelType.NIO:
            	EventLoopGroup bossGroup = new NioEventLoopGroup();
            	EventLoopGroup workerGroup = new NioEventLoopGroup();
            	serverBootstrap.group(bossGroup, workerGroup);
            	SystemInfo.setBossGroup(bossGroup);
            	SystemInfo.setWorkerGroup(workerGroup);
            	serverBootstrap.channel(NioServerSocketChannel.class);
            	
            	return serverBootstrap;
            //TODO
            case ChannelType.OIO:
            	serverBootstrap.group(new OioEventLoopGroup());
            	serverBootstrap.channel(OioServerSocketChannel.class);
                 
                return serverBootstrap;
            default:
                throw new UnsupportedOperationException("Failed to create ServerBootstrap,  " + SystemInfo.getChannelType() + " not supported!");
        }
    }
}

以上是两个关键类


在ServerBootstrapFactory中

我们创建了两个NioEventLoopGroup实例。NioEventLoopGroup是一个多线程的I/O操作事件循环池,Netty为各种传输方式提供了多种EventLoopGroup的实现。NioEventLoopGroup专门用于网络事件的处理,实际上它们就是Reactor线程组。这里创建两个的原因是一个用于服务端接受客服端的链接,另一个用于进行SocketChannel的网络读写。

我们还在ServerBootstrapFactory中创建了ServerBootstrap对象,它是netty用于启动NIO服务端的辅助启动类,目的是降低服务端的开发复杂度。

传送门,讲解ServerBootstrap工作原理

http://blog.csdn.net/zxhoo/article/details/17532857

接下来我们调用了ServerBootstrap的group方法,将两个NIO线程组当作入参传递到ServerBootstrap,借着设置创建的channel类型为NioServerSocketChannel,他的功能是监听网络事件类型并创建channel。


在ServerChannelFactory中

我们对ServerBootstrap绑定了I/O事件的处理类,它的作用类似于Reactor当中的handler,主要用于处理网络是I/O事件,例如记录日志、对消息进行编解码等。

ServerBootstrap配置完成以后,调用它的bind方法绑定监听端口,随后,调用它的同步阻塞方法sync等待绑定完成。完成之后netty会返回一个channelFuture,它的功能类主要用于异步操作的通知回调。

先写到这里,下班去吃饭。



版权声明:本文为jackieliyido原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。