| New file | 
|  |  |  | 
|---|
|  |  |  | package com.dy.common.mw.channel.tcp; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | import java.io.IOException; | 
|---|
|  |  |  | import java.net.InetSocketAddress; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | import org.apache.mina.transport.socket.nio.NioSocketAcceptor; | 
|---|
|  |  |  | import org.apache.mina.core.session.IdleStatus ; | 
|---|
|  |  |  | import org.apache.mina.core.session.IoSession; | 
|---|
|  |  |  | import org.apache.mina.transport.socket.SocketSessionConfig; | 
|---|
|  |  |  | import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder ; | 
|---|
|  |  |  | import org.apache.mina.filter.codec.ProtocolCodecFilter; | 
|---|
|  |  |  | import org.apache.mina.filter.executor.ExecutorFilter ; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | import com.dy.common.mw.UnitAdapterInterface; | 
|---|
|  |  |  | import com.dy.common.mw.UnitInterface; | 
|---|
|  |  |  | import com.dy.common.mw.UnitCallbackInterface; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @SuppressWarnings("unused") | 
|---|
|  |  |  | public class TcpUnit implements UnitInterface { | 
|---|
|  |  |  |  | 
|---|
|  |  |  | private static final TcpUnit instance = new TcpUnit() ; | 
|---|
|  |  |  | private static boolean started = false ; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | private TcpUnitAdapter adapter ; | 
|---|
|  |  |  | private TcpIoHandler tcpIoHandler ; | 
|---|
|  |  |  | private DataCodecFactory dataCodecFactory ; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | private TcpUnit(){} ; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | public static TcpUnit getInstance(){ | 
|---|
|  |  |  | return instance ; | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | /** | 
|---|
|  |  |  | * 把IoSession会话的ID属性及协议名称版本号设置到IoSession属性中 | 
|---|
|  |  |  | * @param session | 
|---|
|  |  |  | * @param rtuAddr | 
|---|
|  |  |  | * @param protocolName | 
|---|
|  |  |  | * @param protocolVersion | 
|---|
|  |  |  | * @throws Exception | 
|---|
|  |  |  | */ | 
|---|
|  |  |  | public void setIoSessionArrs(IoSession session, String rtuAddr, String protocolName, Short protocolVersion) throws Exception { | 
|---|
|  |  |  | session.setAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrRtuAddr, rtuAddr) ; | 
|---|
|  |  |  | session.setAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolName, protocolName) ; | 
|---|
|  |  |  | session.setAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolVersion, protocolVersion) ; | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Override | 
|---|
|  |  |  | public void setAdapter(UnitAdapterInterface adapter) throws Exception { | 
|---|
|  |  |  | if(adapter == null){ | 
|---|
|  |  |  | throw new Exception("TCP模块适配器对象不能为空!") ; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | this.adapter = (TcpUnitAdapter)adapter ; | 
|---|
|  |  |  | TcpConfigVo vo = this.adapter.getConfig() ; | 
|---|
|  |  |  | if(vo == null){ | 
|---|
|  |  |  | throw new Exception("TCP模块连接属性配置对象不能为空!") ; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | if(vo.port == null || vo.idle == null || vo.processors == null){ | 
|---|
|  |  |  | throw new Exception("TCP模块连接属性配置对象属性值不能为空!") ; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | if(this.adapter.newPrefixedDataAvailableHandle() == null){ | 
|---|
|  |  |  | throw new Exception("TCP模块上行数据解码类为空!") ; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | if(this.adapter.newSessionEventCallback() == null){ | 
|---|
|  |  |  | throw new Exception("TCP模块事件处理回调类为空!") ; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | /** | 
|---|
|  |  |  | * 启动模块 | 
|---|
|  |  |  | */ | 
|---|
|  |  |  | public void start(UnitCallbackInterface callback) throws Exception { | 
|---|
|  |  |  | if(!started){ | 
|---|
|  |  |  | started = true ; | 
|---|
|  |  |  | /** | 
|---|
|  |  |  | * 异步非阻塞网络通信接收器,负责接收联接请求 | 
|---|
|  |  |  | * 同时设置IoProcessor个数 | 
|---|
|  |  |  | * 这个地方用于执行真正的IO 操作,默认启用的线程个数是CPU 的核数+1,譬如:单CPU 双 | 
|---|
|  |  |  | * 核的电脑,默认的IoProcessor 线程会创建3 个。这也就是说一个IoAcceptor 或者 | 
|---|
|  |  |  | * IoConnector 默认会关联一个IoProcessor 池,这个池中有3 个IoProcessor。因为IO 操作 | 
|---|
|  |  |  | * 耗费资源,所以这里使用IoProcessor 池来完成数据的读写操作,有助于提高性能。这也就 | 
|---|
|  |  |  | * 是前面说的IoAccetor、IoConnector 使用一个Selector,而IoProcessor 使用自己单独的 | 
|---|
|  |  |  | * Selector 的原因。 | 
|---|
|  |  |  | * 那么为什么IoProcessor 池中的IoProcessor 数量只比CPU 的核数大1 呢?因为IO 读写操 | 
|---|
|  |  |  | * 作是耗费CPU 的操作,而每一核CPU 同时只能运行一个线程,因此IoProcessor 池中的 | 
|---|
|  |  |  | * IoProcessor 的数量并不是越多越好。 | 
|---|
|  |  |  | */ | 
|---|
|  |  |  | NioSocketAcceptor acceptor = new NioSocketAcceptor(this.adapter.getConfig().processors); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | SocketSessionConfig seConf = acceptor.getSessionConfig() ; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | /* 设置读数据时一次性申请堆缓存大小, | 
|---|
|  |  |  | * 一般不需要调用这个方法,因为IoProcessor 会自动调整缓冲的大小。 | 
|---|
|  |  |  | * 可以调用setMinReadBufferSize()、setMaxReadBufferSize()方法, | 
|---|
|  |  |  | * 这样无论IoProcessor如何自动调整,都会在你指定的区间。 | 
|---|
|  |  |  | */ | 
|---|
|  |  |  | //seConf.setReadBufferSize(1024); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | //设置网络联接空闲时长 | 
|---|
|  |  |  | seConf.setIdleTime(IdleStatus.BOTH_IDLE, this.adapter.getConfig().idle); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | //得到网络 通信数据过滤器链 | 
|---|
|  |  |  | DefaultIoFilterChainBuilder chain = acceptor.getFilterChain() ; | 
|---|
|  |  |  | //生成编解码过滤器工厂类 | 
|---|
|  |  |  | dataCodecFactory = new DataCodecFactory(this.adapter) ; | 
|---|
|  |  |  | //设置“protocol”,加入编解码过滤器,过滤器在IoProcessor线程中执行 | 
|---|
|  |  |  | chain.addLast("protocol", new ProtocolCodecFilter(dataCodecFactory)); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | /* | 
|---|
|  |  |  | * 一般ExecutorFilter 都要放在ProtocolCodecFilter过滤器的后面, | 
|---|
|  |  |  | * 也就是让编解码运行在IoProcessor所在的线程,因为编解码处理的数据都是 | 
|---|
|  |  |  | * 由IoProcessor读取和发送的,没必要开启新的线程,否则性能反而会下降。 | 
|---|
|  |  |  | * ExecutorFilter过程器会启动一个线程池,处理后续代码逻辑。 | 
|---|
|  |  |  | * 一般使用ExecutorFilter的典型场景是将业务逻辑(譬如:耗时的数据库操作) | 
|---|
|  |  |  | * 放在单独的线程中运行,也就是说与IO处理无关的操作可以考虑使用ExecutorFilter来异步执行。 | 
|---|
|  |  |  | * 本处用法,使ExecutorFilter线程池中的线程处理IOHandler(TcpIoHandler)操作 | 
|---|
|  |  |  | */ | 
|---|
|  |  |  | chain.addLast("exceutor", new ExecutorFilter()); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | //业务逻辑处理器,负责处理网络会话及输入输出数据 | 
|---|
|  |  |  | tcpIoHandler = new TcpIoHandler(this.adapter) ; | 
|---|
|  |  |  | acceptor.setHandler(tcpIoHandler) ; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | boolean isException = false ; | 
|---|
|  |  |  | try { | 
|---|
|  |  |  | acceptor.bind(new InetSocketAddress(this.adapter.getConfig().port)); | 
|---|
|  |  |  | } catch (IOException e) { | 
|---|
|  |  |  | e.printStackTrace(); | 
|---|
|  |  |  | System.out.println("TCP通信模块启动失败!" + (e.getMessage()==null?"":e.getMessage())); | 
|---|
|  |  |  | isException = true ; | 
|---|
|  |  |  | }finally{ | 
|---|
|  |  |  | ; | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | if(!isException){ | 
|---|
|  |  |  | if(this.adapter.getConfig().showStartInfo != null && this.adapter.getConfig().showStartInfo.booleanValue()){ | 
|---|
|  |  |  | System.out.println("TCP模块成功启动,端口:" + this.adapter.getConfig().port); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | callback.call(null); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | /** | 
|---|
|  |  |  | * 停止模块运行,将不再接入TCP网络连接,并把已经tcp连接的全部断连接 | 
|---|
|  |  |  | * @param callback | 
|---|
|  |  |  | * @throws Exception | 
|---|
|  |  |  | */ | 
|---|
|  |  |  | @Override | 
|---|
|  |  |  | public void stop(UnitCallbackInterface callback) throws Exception { | 
|---|
|  |  |  | this.tcpIoHandler.stop(); | 
|---|
|  |  |  | this.dataCodecFactory.stop(); | 
|---|
|  |  |  | this.adapter.newUnitStopCallback().callback(); | 
|---|
|  |  |  | callback.call(null); | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | /** | 
|---|
|  |  |  | * 解除停止,恢复TCP服务运行 | 
|---|
|  |  |  | * @throws Exception | 
|---|
|  |  |  | */ | 
|---|
|  |  |  | public void recover() throws Exception { | 
|---|
|  |  |  | this.tcpIoHandler.recover(); | 
|---|
|  |  |  | this.dataCodecFactory.recover(); | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  |  | 
|---|
|  |  |  | } | 
|---|