| package com.dy.common.mw.channel.tcp; | 
|   | 
| import java.io.IOException; | 
| import java.net.InetSocketAddress; | 
|   | 
| import org.apache.mina.transport.socket.nio.NioSocketAcceptor; | 
| import org.apache.mina.core.session.IdleStatus ; | 
| import org.apache.mina.core.session.IoSession; | 
| import org.apache.mina.transport.socket.SocketSessionConfig; | 
| import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder ; | 
| import org.apache.mina.filter.codec.ProtocolCodecFilter; | 
| import org.apache.mina.filter.executor.ExecutorFilter ; | 
|   | 
| import com.dy.common.mw.UnitAdapterInterface; | 
| import com.dy.common.mw.UnitInterface; | 
| import com.dy.common.mw.UnitCallbackInterface; | 
|   | 
| @SuppressWarnings("unused") | 
| public class TcpUnit implements UnitInterface { | 
|      | 
|     private static final TcpUnit instance = new TcpUnit() ; | 
|     private static boolean started = false ; | 
|      | 
|     private TcpUnitAdapter adapter ; | 
|     private TcpIoHandler tcpIoHandler ; | 
|     private DataCodecFactory dataCodecFactory ; | 
|      | 
|     private TcpUnit(){} ; | 
|      | 
|     public static TcpUnit getInstance(){ | 
|         return instance ; | 
|     } | 
|   | 
|     /** | 
|      * 把IoSession会话的ID属性及协议名称版本号设置到IoSession属性中 | 
|      * @param session | 
|      * @param rtuAddr | 
|      * @param protocolName | 
|      * @param protocolVersion | 
|      * @throws Exception | 
|      */ | 
|     public void setIoSessionArrs(IoSession session, String rtuAddr, String protocolName, Short protocolVersion) throws Exception { | 
|         session.setAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrRtuAddr, rtuAddr) ; | 
|         session.setAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolName, protocolName) ; | 
|         session.setAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolVersion, protocolVersion) ; | 
|     } | 
|   | 
|     @Override | 
|     public void setAdapter(UnitAdapterInterface adapter) throws Exception { | 
|         if(adapter == null){ | 
|             throw new Exception("TCP模块适配器对象不能为空!") ; | 
|         } | 
|         this.adapter = (TcpUnitAdapter)adapter ;  | 
|         TcpConfigVo vo = this.adapter.getConfig() ; | 
|         if(vo == null){ | 
|             throw new Exception("TCP模块连接属性配置对象不能为空!") ; | 
|         } | 
|         if(vo.port == null || vo.idle == null || vo.processors == null){ | 
|             throw new Exception("TCP模块连接属性配置对象属性值不能为空!") ; | 
|         } | 
|         if(this.adapter.newPrefixedDataAvailableHandle() == null){ | 
|             throw new Exception("TCP模块上行数据解码类为空!") ; | 
|         } | 
|         if(this.adapter.newSessionEventCallback() == null){ | 
|             throw new Exception("TCP模块事件处理回调类为空!") ; | 
|         } | 
|     } | 
|     /** | 
|      * 启动模块 | 
|      */ | 
|     public void start(UnitCallbackInterface callback) throws Exception { | 
|         if(!started){ | 
|             started = true ; | 
|             /** | 
|              * 异步非阻塞网络通信接收器,负责接收联接请求 | 
|              * 同时设置IoProcessor个数 | 
|              * 这个地方用于执行真正的IO 操作,默认启用的线程个数是CPU 的核数+1,譬如:单CPU 双 | 
|              * 核的电脑,默认的IoProcessor 线程会创建3 个。这也就是说一个IoAcceptor 或者 | 
|              * IoConnector 默认会关联一个IoProcessor 池,这个池中有3 个IoProcessor。因为IO 操作 | 
|              * 耗费资源,所以这里使用IoProcessor 池来完成数据的读写操作,有助于提高性能。这也就 | 
|              * 是前面说的IoAccetor、IoConnector 使用一个Selector,而IoProcessor 使用自己单独的 | 
|              * Selector 的原因。 | 
|              * 那么为什么IoProcessor 池中的IoProcessor 数量只比CPU 的核数大1 呢?因为IO 读写操 | 
|              * 作是耗费CPU 的操作,而每一核CPU 同时只能运行一个线程,因此IoProcessor 池中的 | 
|              * IoProcessor 的数量并不是越多越好。 | 
|              */ | 
|             NioSocketAcceptor acceptor = new NioSocketAcceptor(this.adapter.getConfig().processors); | 
|              | 
|             SocketSessionConfig seConf = acceptor.getSessionConfig() ; | 
|              | 
|             /* 设置读数据时一次性申请堆缓存大小, | 
|              * 一般不需要调用这个方法,因为IoProcessor 会自动调整缓冲的大小。 | 
|              * 可以调用setMinReadBufferSize()、setMaxReadBufferSize()方法, | 
|              * 这样无论IoProcessor如何自动调整,都会在你指定的区间。 | 
|              */ | 
|             //seConf.setReadBufferSize(1024); | 
|              | 
|             //设置网络联接空闲时长 | 
|             seConf.setIdleTime(IdleStatus.BOTH_IDLE, this.adapter.getConfig().idle); | 
|   | 
|             //得到网络 通信数据过滤器链 | 
|             DefaultIoFilterChainBuilder chain = acceptor.getFilterChain() ; | 
|             //生成编解码过滤器工厂类 | 
|             dataCodecFactory = new DataCodecFactory(this.adapter) ; | 
|             //设置“protocol”,加入编解码过滤器,过滤器在IoProcessor线程中执行 | 
|             chain.addLast("protocol", new ProtocolCodecFilter(dataCodecFactory)); | 
|              | 
|             /* | 
|              * 一般ExecutorFilter 都要放在ProtocolCodecFilter过滤器的后面, | 
|              * 也就是让编解码运行在IoProcessor所在的线程,因为编解码处理的数据都是 | 
|              * 由IoProcessor读取和发送的,没必要开启新的线程,否则性能反而会下降。 | 
|              * ExecutorFilter过程器会启动一个线程池,处理后续代码逻辑。 | 
|              * 一般使用ExecutorFilter的典型场景是将业务逻辑(譬如:耗时的数据库操作) | 
|              * 放在单独的线程中运行,也就是说与IO处理无关的操作可以考虑使用ExecutorFilter来异步执行。 | 
|              * 本处用法,使ExecutorFilter线程池中的线程处理IOHandler(TcpIoHandler)操作 | 
|              */ | 
|             chain.addLast("exceutor", new ExecutorFilter()); | 
|   | 
|             //业务逻辑处理器,负责处理网络会话及输入输出数据 | 
|             tcpIoHandler = new TcpIoHandler(this.adapter) ; | 
|             acceptor.setHandler(tcpIoHandler) ; | 
|   | 
|             boolean isException = false ; | 
|             try { | 
|                 acceptor.bind(new InetSocketAddress(this.adapter.getConfig().port)); | 
|             } catch (IOException e) { | 
|                 e.printStackTrace(); | 
|                 System.out.println("TCP通信模块启动失败!" + (e.getMessage()==null?"":e.getMessage())); | 
|                 isException = true ; | 
|             }finally{ | 
|                 ; | 
|             } | 
|   | 
|             if(!isException){ | 
|                 if(this.adapter.getConfig().showStartInfo != null && this.adapter.getConfig().showStartInfo.booleanValue()){ | 
|                     System.out.println("TCP模块成功启动,端口:" + this.adapter.getConfig().port); | 
|                 } | 
|             } | 
|              | 
|             callback.call(null); | 
|         } | 
|     } | 
|   | 
|     /** | 
|      * 停止模块运行,将不再接入TCP网络连接,并把已经tcp连接的全部断连接 | 
|      * @param callback | 
|      * @throws Exception | 
|      */ | 
|     @Override | 
|     public void stop(UnitCallbackInterface callback) throws Exception { | 
|         this.tcpIoHandler.stop(); | 
|         this.dataCodecFactory.stop(); | 
|         this.adapter.newUnitStopCallback().callback(); | 
|         callback.call(null); | 
|     } | 
|   | 
|     /** | 
|      * 解除停止,恢复TCP服务运行 | 
|      * @throws Exception | 
|      */ | 
|     public void recover() throws Exception { | 
|         this.tcpIoHandler.recover(); | 
|         this.dataCodecFactory.recover(); | 
|     } | 
|      | 
|   | 
| } |