以前都是在自己的域名下写文章,CSDN开篇第一文献上自己的一个小小DEMO项目
首先要感谢鑫鑫哥哥,您的文章给了我最初的入门
附传送门:
http://my.oschina.net/xinxingegeya/blog/289258
http://my.oschina.net/xinxingegeya/blog/295408
同时,最关键的参考书:
netty权威指南,文中部分文字摘自此书
============================以下是正文内容===============================================
项目需求:
1.使用netty实现可配置化的NIO通讯服务器
2.要求支持多种通讯协议以及长短链接,如http,https,TCP,TLS,MQTT等
3.支持私有协议拓展开发
以上因为是个人demo项目,所以未做其他方面的需求,比如压力测试,集群部署等,这些自己也需要在下下个阶段继续研究(下个阶段主要研究dubbo和完善自己的demo项目)。
1.Main类
首先我们少不了一个Main类,这个是netty启动的类,直接看代码。
package com.omen.netty.server;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.support.FileSystemXmlApplicationContext;
import com.omen.netty.server.sysPojo.SystemInfo;
public class Main {
private static Logger log = Logger.getLogger(Main.class);
@Autowired
private SystemInfo systemInfo;
static {
// 先加载spring
log.info("准备载入spring...");
String url = "../classes/omen.xml";
// 保存context
SystemInfo.setCtx(new FileSystemXmlApplicationContext(url));
log.info("载入spring 完毕...");
}
public static void main(String[] args) throws Exception {
IServer iServer = (IServer) SystemInfo.getCtx().getBean("basicServer");
iServer.start();
}
}
这里在启动main方法的时候会首先去加载spring配置文件,同时保存context。期间应用了一个systemInfo对象为该系统的系统配置
package com.omen.netty.server.sysPojo;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;
import com.omen.netty.utils.StringUtil;
public class SystemInfo {
private static Logger log = Logger.getLogger(SystemInfo.class);
public SystemInfo(String protocolType,Integer port,Integer channelType) throws Exception{
/**
* 基础校验
*/
if(StringUtil.isEmpty(protocolType)){
throw new Exception("protocolType is not setted");
}
if(port == null){
throw new Exception("port is not setted");
}
if(channelType == null){
throw new Exception("channelType is not setted");
}
this.protocolType=protocolType.toUpperCase();
this.port=port;
this.isSsl=false;
this.channelType=channelType;
}
public SystemInfo(String protocolType,Integer port,Integer channelType,
Integer bossGroupSize,Integer workerGroupSize) throws Exception{
/**
* 基础校验
*/
if(StringUtil.isEmpty(protocolType)){
throw new Exception("protocolType is not setted");
}
if(port == null){
throw new Exception("port is not setted");
}
if(channelType == null){
throw new Exception("channelType is not setted");
}
this.protocolType=protocolType.toUpperCase();
this.port=port;
this.isSsl=false;
this.channelType=channelType;
this.bossGroupSize=bossGroupSize;
this.workerGroupSize=workerGroupSize;
}
public SystemInfo(String protocolType,Integer port,Integer channelType, boolean isSsl,String jksPath,
String jksPwd) throws Exception{
/**
* 基础校验
*/
if(StringUtil.isEmpty(protocolType)){
throw new Exception("protocolType is not setted");
}
if(port == null){
throw new Exception("port is not setted");
}
if(isSsl){
if(StringUtil.isEmpty(jksPath)){
throw new Exception("jksPath type is not setted");
}
if(StringUtil.isEmpty(jksPwd)){
throw new Exception("jksPwd type is not setted");
}
}
if(channelType == null){
throw new Exception("channelType is not setted");
}
this.protocolType=protocolType.toUpperCase();
this.port=port;
this.isSsl=isSsl;
this.jksPath=jksPath;
this.jksPwd=jksPwd;
this.channelType=channelType;
}
public SystemInfo(String protocolType,Integer port,Integer channelType, boolean isSsl,String jksPath,
String jksPwd,Integer bossGroupSize,Integer workerGroupSize) throws Exception{
/**
* 基础校验
*/
if(StringUtil.isEmpty(protocolType)){
throw new Exception("protocolType is not setted");
}
if(port == null){
throw new Exception("port is not setted");
}
if(isSsl){
if(StringUtil.isEmpty(jksPath)){
throw new Exception("jksPath type is not setted");
}
if(StringUtil.isEmpty(jksPwd)){
throw new Exception("jksPwd type is not setted");
}
}
if(channelType == null){
throw new Exception("channelType is not setted");
}
this.protocolType=protocolType.toUpperCase();
this.port=port;
this.isSsl=isSsl;
this.jksPath=jksPath;
this.jksPwd=jksPwd;
this.channelType=channelType;
this.bossGroupSize=bossGroupSize;
this.workerGroupSize=workerGroupSize;
}
private static String protocolType;
private static Integer port;
private static Boolean isSsl;
private static String jksPath;
private static String jksPwd;
private static Integer channelType;
private static ApplicationContext ctx;
private static EventLoopGroup bossGroup ;
private static EventLoopGroup workerGroup;
private static Integer bossGroupSize;
private static Integer workerGroupSize;
public static void printSysInfo(){
log.info("**************SYSTEM INFO******************");
log.info("protocolType : " + protocolType);
log.info("port : " + port);
log.info("channelType : " + channelType + " (0=NIO 1=OIO)");
log.info("isSsl : " + isSsl);
if(!StringUtil.isEmpty(jksPath))
log.info("jksPath : " + jksPath);
if(!StringUtil.isEmpty(jksPwd))
log.info("jksPwd : " + jksPwd);
if(bossGroupSize!=null)
log.info("bossGroupSize : " + bossGroupSize);
if(workerGroupSize!=null)
log.info("workerGroupSize: " + workerGroupSize);
log.info("**************SYSTEM INFO******************");
}
public static void shutDownGraceFully(){
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
public static String getProtocolType() {
return protocolType;
}
public static void setProtocolType(String protocolType) {
SystemInfo.protocolType = protocolType;
}
public static Integer getPort() {
return port;
}
public static void setPort(Integer port) {
SystemInfo.port = port;
}
public static Boolean getIsSsl() {
return isSsl;
}
public static void setIsSsl(Boolean isSsl) {
SystemInfo.isSsl = isSsl;
}
public static String getJksPath() {
return jksPath;
}
public static void setJksPath(String jksPath) {
SystemInfo.jksPath = jksPath;
}
public static String getJksPwd() {
return jksPwd;
}
public static void setJksPwd(String jksPwd) {
SystemInfo.jksPwd = jksPwd;
}
public static Integer getChannelType() {
return channelType;
}
public static void setChannelType(Integer channelType) {
SystemInfo.channelType = channelType;
}
public static ApplicationContext getCtx() {
return ctx;
}
public static void setCtx(ApplicationContext ctx) {
SystemInfo.ctx = ctx;
}
public static Integer getBossGroupSize() {
return bossGroupSize;
}
public static void setBossGroupSize(Integer bossGroupSize) {
SystemInfo.bossGroupSize = bossGroupSize;
}
public static EventLoopGroup getBossGroup() {
return bossGroup;
}
public static void setBossGroup(EventLoopGroup bossGroup) {
SystemInfo.bossGroup = bossGroup;
}
public static EventLoopGroup getWorkerGroup() {
return workerGroup;
}
public static void setWorkerGroup(EventLoopGroup workerGroup) {
SystemInfo.workerGroup = workerGroup;
}
public static Integer getWorkerGroupSize() {
return workerGroupSize;
}
public static void setWorkerGroupSize(Integer workerGroupSize) {
SystemInfo.workerGroupSize = workerGroupSize;
}
public static void main(String[] args) {
}
}
systemInfo里面涵盖了多个构造函数,这个在后面会有解释(想通过spring直接配置系统参数),目前有个更好的想法就是通过配置map去完成配置系统参数,但是目前还没改好,改好以后会更新代码。
IServer为server接口
package com.omen.netty.server;
public interface IServer {
public void start() throws Exception;
public void stop() throws Exception;
public void restart() throws Exception;
}
我们的实现类
package com.omen.netty.server;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import org.springframework.beans.factory.annotation.Autowired;
import com.omen.netty.server.factory.ServerChannelFactory;
import com.omen.netty.server.sysPojo.SystemInfo;
public class BasicServer implements IServer {
private Channel acceptorChannel;
@Autowired
ServerChannelFactory serverChannelFactory;
@Override
public void start()throws Exception{
try{
acceptorChannel = serverChannelFactory.createAcceptorChannel();
acceptorChannel.closeFuture().sync();
}finally{
//优雅退出,释放线程组资源
SystemInfo.shutDownGraceFully();
}
}
@Override
public void stop()throws Exception {
try{
if(acceptorChannel!=null)
acceptorChannel.close().addListener(ChannelFutureListener.CLOSE);
}finally{
//优雅退出,释放线程组资源
SystemInfo.shutDownGraceFully();
}
}
@Override
public void restart()throws Exception {
stop();
start();
}
}
在我们的实现类中,start方法和stop方法最终都会调用systemInfo的shutDownGracefully以便释放资源。其中start当中会等待服务端链路关闭以后main函数才退出。
2.几个工厂
先看代码
channel工厂
package com.omen.netty.server.factory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import org.apache.log4j.Logger;
import com.omen.netty.exception.SysErrException;
import com.omen.netty.server.sysPojo.ProtocolType;
import com.omen.netty.server.sysPojo.SystemInfo;
public class ServerChannelFactory {
private static Logger log = Logger.getLogger(ServerChannelFactory.class);
public Channel createAcceptorChannel() throws SysErrException{
final ServerBootstrap serverBootstrap = ServerBootstrapFactory.createServerBootstrap();
serverBootstrap.childHandler(getChildHandler());
log.info("创建Server...");
try {
ChannelFuture channelFuture = serverBootstrap.bind(SystemInfo.getPort()).sync();
channelFuture.awaitUninterruptibly();
if (channelFuture.isSuccess()) {
SystemInfo.printSysInfo();
return channelFuture.channel();
} else {
String errMsg="Failed to open socket! Cannot bind to port: "+SystemInfo.getPort()+"!";
log.error(errMsg);
throw new SysErrException(errMsg);
}
} catch (Exception e){
throw new SysErrException(e);
}
}
private static ChannelInitializer
getChildHandler() throws SysErrException{
if(ProtocolType.HTTP.equals(SystemInfo.getProtocolType())
||ProtocolType.HTTPS.equals(SystemInfo.getProtocolType())){
return (ChannelInitializer
)SystemInfo.getCtx().getBean("httpServerInitializer");
}
else if(ProtocolType.TCP.equals(SystemInfo.getProtocolType()))
return (ChannelInitializer
)SystemInfo.getCtx().getBean("tcpServerInitializer");
else if(ProtocolType.CUSTOM.equals(SystemInfo.getProtocolType()))
return (ChannelInitializer
)SystemInfo.getCtx().getBean("customServerInitializer");
else{
String errMsg="undefined protocol:"+SystemInfo.getProtocolType()+"!";
throw new SysErrException(errMsg);
}
}
}
ServerBootStrap工厂
package com.omen.netty.server.factory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.oio.OioServerSocketChannel;
import com.omen.netty.server.sysPojo.ChannelType;
import com.omen.netty.server.sysPojo.SystemInfo;
/**
* @author liyidong
* @version 1.0
* @created 2014-12-16 上午11:14:33
* @function:工厂模式 server服务器 ServerBootstrap创建
*/
public class ServerBootstrapFactory {
private ServerBootstrapFactory() {
}
public static ServerBootstrap createServerBootstrap() throws UnsupportedOperationException {
ServerBootstrap serverBootstrap = new ServerBootstrap();
switch (SystemInfo.getChannelType()) {
case ChannelType.NIO:
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
serverBootstrap.group(bossGroup, workerGroup);
SystemInfo.setBossGroup(bossGroup);
SystemInfo.setWorkerGroup(workerGroup);
serverBootstrap.channel(NioServerSocketChannel.class);
return serverBootstrap;
//TODO
case ChannelType.OIO:
serverBootstrap.group(new OioEventLoopGroup());
serverBootstrap.channel(OioServerSocketChannel.class);
return serverBootstrap;
default:
throw new UnsupportedOperationException("Failed to create ServerBootstrap, " + SystemInfo.getChannelType() + " not supported!");
}
}
}
以上是两个关键类
在ServerBootstrapFactory中
我们创建了两个NioEventLoopGroup实例。NioEventLoopGroup是一个多线程的I/O操作事件循环池,Netty为各种传输方式提供了多种EventLoopGroup的实现。NioEventLoopGroup专门用于网络事件的处理,实际上它们就是Reactor线程组。这里创建两个的原因是一个用于服务端接受客服端的链接,另一个用于进行SocketChannel的网络读写。
我们还在ServerBootstrapFactory中创建了ServerBootstrap对象,它是netty用于启动NIO服务端的辅助启动类,目的是降低服务端的开发复杂度。
传送门,讲解ServerBootstrap工作原理
http://blog.csdn.net/zxhoo/article/details/17532857
接下来我们调用了ServerBootstrap的group方法,将两个NIO线程组当作入参传递到ServerBootstrap,借着设置创建的channel类型为NioServerSocketChannel,他的功能是监听网络事件类型并创建channel。
在ServerChannelFactory中
我们对ServerBootstrap绑定了I/O事件的处理类,它的作用类似于Reactor当中的handler,主要用于处理网络是I/O事件,例如记录日志、对消息进行编解码等。
ServerBootstrap配置完成以后,调用它的bind方法绑定监听端口,随后,调用它的同步阻塞方法sync等待绑定完成。完成之后netty会返回一个channelFuture,它的功能类主要用于异步操作的通知回调。
先写到这里,下班去吃饭。