| New file | 
 |  |  | 
 |  |  | package com.dy.common.mw.channel.tcp; | 
 |  |  |  | 
 |  |  | import java.io.IOException; | 
 |  |  | import java.net.InetSocketAddress; | 
 |  |  |  | 
 |  |  | import org.apache.mina.transport.socket.nio.NioSocketAcceptor; | 
 |  |  | import org.apache.mina.core.session.IdleStatus ; | 
 |  |  | import org.apache.mina.core.session.IoSession; | 
 |  |  | import org.apache.mina.transport.socket.SocketSessionConfig; | 
 |  |  | import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder ; | 
 |  |  | import org.apache.mina.filter.codec.ProtocolCodecFilter; | 
 |  |  | import org.apache.mina.filter.executor.ExecutorFilter ; | 
 |  |  |  | 
 |  |  | import com.dy.common.mw.UnitAdapterInterface; | 
 |  |  | import com.dy.common.mw.UnitInterface; | 
 |  |  | import com.dy.common.mw.UnitCallbackInterface; | 
 |  |  |  | 
 |  |  | @SuppressWarnings("unused") | 
 |  |  | public class TcpUnit implements UnitInterface { | 
 |  |  | 	 | 
 |  |  |    private static final TcpUnit instance = new TcpUnit() ; | 
 |  |  |    private static boolean started = false ; | 
 |  |  | 	 | 
 |  |  |    private TcpUnitAdapter adapter ; | 
 |  |  |    private TcpIoHandler tcpIoHandler ; | 
 |  |  |    private DataCodecFactory dataCodecFactory ; | 
 |  |  | 	 | 
 |  |  |    private TcpUnit(){} ; | 
 |  |  | 	 | 
 |  |  |    public static TcpUnit getInstance(){ | 
 |  |  |       return instance ; | 
 |  |  |    } | 
 |  |  |  | 
 |  |  |    /** | 
 |  |  |     * 把IoSession会话的ID属性及协议名称版本号设置到IoSession属性中 | 
 |  |  |     * @param session | 
 |  |  |     * @param rtuAddr | 
 |  |  |     * @param protocolName | 
 |  |  |     * @param protocolVersion | 
 |  |  |     * @throws Exception | 
 |  |  |     */ | 
 |  |  |    public void setIoSessionArrs(IoSession session, String rtuAddr, String protocolName, Short protocolVersion) throws Exception { | 
 |  |  |       session.setAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrRtuAddr, rtuAddr) ; | 
 |  |  |       session.setAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolName, protocolName) ; | 
 |  |  |       session.setAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolVersion, protocolVersion) ; | 
 |  |  |    } | 
 |  |  |  | 
 |  |  |    @Override | 
 |  |  |    public void setAdapter(UnitAdapterInterface adapter) throws Exception { | 
 |  |  |       if(adapter == null){ | 
 |  |  |          throw new Exception("TCP模块适配器对象不能为空!") ; | 
 |  |  |       } | 
 |  |  |       this.adapter = (TcpUnitAdapter)adapter ;  | 
 |  |  |       TcpConfigVo vo = this.adapter.getConfig() ; | 
 |  |  |       if(vo == null){ | 
 |  |  |          throw new Exception("TCP模块连接属性配置对象不能为空!") ; | 
 |  |  |       } | 
 |  |  |       if(vo.port == null || vo.idle == null || vo.processors == null){ | 
 |  |  |          throw new Exception("TCP模块连接属性配置对象属性值不能为空!") ; | 
 |  |  |       } | 
 |  |  |       if(this.adapter.newPrefixedDataAvailableHandle() == null){ | 
 |  |  |          throw new Exception("TCP模块上行数据解码类为空!") ; | 
 |  |  |       } | 
 |  |  |       if(this.adapter.newSessionEventCallback() == null){ | 
 |  |  |          throw new Exception("TCP模块事件处理回调类为空!") ; | 
 |  |  |       } | 
 |  |  |    } | 
 |  |  |    /** | 
 |  |  |     * 启动模块 | 
 |  |  |     */ | 
 |  |  |    public void start(UnitCallbackInterface callback) throws Exception { | 
 |  |  |       if(!started){ | 
 |  |  |          started = true ; | 
 |  |  |          /** | 
 |  |  |           * 异步非阻塞网络通信接收器,负责接收联接请求 | 
 |  |  |           * 同时设置IoProcessor个数 | 
 |  |  |           * 这个地方用于执行真正的IO 操作,默认启用的线程个数是CPU 的核数+1,譬如:单CPU 双 | 
 |  |  |           * 核的电脑,默认的IoProcessor 线程会创建3 个。这也就是说一个IoAcceptor 或者 | 
 |  |  |           * IoConnector 默认会关联一个IoProcessor 池,这个池中有3 个IoProcessor。因为IO 操作 | 
 |  |  |           * 耗费资源,所以这里使用IoProcessor 池来完成数据的读写操作,有助于提高性能。这也就 | 
 |  |  |           * 是前面说的IoAccetor、IoConnector 使用一个Selector,而IoProcessor 使用自己单独的 | 
 |  |  |           * Selector 的原因。 | 
 |  |  |           * 那么为什么IoProcessor 池中的IoProcessor 数量只比CPU 的核数大1 呢?因为IO 读写操 | 
 |  |  |           * 作是耗费CPU 的操作,而每一核CPU 同时只能运行一个线程,因此IoProcessor 池中的 | 
 |  |  |           * IoProcessor 的数量并不是越多越好。 | 
 |  |  |           */ | 
 |  |  |          NioSocketAcceptor acceptor = new NioSocketAcceptor(this.adapter.getConfig().processors); | 
 |  |  | 			 | 
 |  |  |          SocketSessionConfig seConf = acceptor.getSessionConfig() ; | 
 |  |  | 			 | 
 |  |  |          /* 设置读数据时一次性申请堆缓存大小, | 
 |  |  |           * 一般不需要调用这个方法,因为IoProcessor 会自动调整缓冲的大小。 | 
 |  |  |           * 可以调用setMinReadBufferSize()、setMaxReadBufferSize()方法, | 
 |  |  |           * 这样无论IoProcessor如何自动调整,都会在你指定的区间。 | 
 |  |  |           */ | 
 |  |  |          //seConf.setReadBufferSize(1024); | 
 |  |  | 			 | 
 |  |  |          //设置网络联接空闲时长 | 
 |  |  |          seConf.setIdleTime(IdleStatus.BOTH_IDLE, this.adapter.getConfig().idle); | 
 |  |  |  | 
 |  |  |          //得到网络 通信数据过滤器链 | 
 |  |  |          DefaultIoFilterChainBuilder chain = acceptor.getFilterChain() ; | 
 |  |  |          //生成编解码过滤器工厂类 | 
 |  |  |          dataCodecFactory = new DataCodecFactory(this.adapter) ; | 
 |  |  |          //设置“protocol”,加入编解码过滤器,过滤器在IoProcessor线程中执行 | 
 |  |  |          chain.addLast("protocol", new ProtocolCodecFilter(dataCodecFactory)); | 
 |  |  | 			 | 
 |  |  |          /* | 
 |  |  |           * 一般ExecutorFilter 都要放在ProtocolCodecFilter过滤器的后面, | 
 |  |  |           * 也就是让编解码运行在IoProcessor所在的线程,因为编解码处理的数据都是 | 
 |  |  |           * 由IoProcessor读取和发送的,没必要开启新的线程,否则性能反而会下降。 | 
 |  |  |           * ExecutorFilter过程器会启动一个线程池,处理后续代码逻辑。 | 
 |  |  |           * 一般使用ExecutorFilter的典型场景是将业务逻辑(譬如:耗时的数据库操作) | 
 |  |  |           * 放在单独的线程中运行,也就是说与IO处理无关的操作可以考虑使用ExecutorFilter来异步执行。 | 
 |  |  |           * 本处用法,使ExecutorFilter线程池中的线程处理IOHandler(TcpIoHandler)操作 | 
 |  |  |           */ | 
 |  |  |          chain.addLast("exceutor", new ExecutorFilter()); | 
 |  |  |  | 
 |  |  |          //业务逻辑处理器,负责处理网络会话及输入输出数据 | 
 |  |  |          tcpIoHandler = new TcpIoHandler(this.adapter) ; | 
 |  |  |          acceptor.setHandler(tcpIoHandler) ; | 
 |  |  |  | 
 |  |  |          boolean isException = false ; | 
 |  |  |          try { | 
 |  |  |             acceptor.bind(new InetSocketAddress(this.adapter.getConfig().port)); | 
 |  |  |          } catch (IOException e) { | 
 |  |  |             e.printStackTrace(); | 
 |  |  |             System.out.println("TCP通信模块启动失败!" + (e.getMessage()==null?"":e.getMessage())); | 
 |  |  |             isException = true ; | 
 |  |  |          }finally{ | 
 |  |  |             ; | 
 |  |  |          } | 
 |  |  |  | 
 |  |  |          if(!isException){ | 
 |  |  |             if(this.adapter.getConfig().showStartInfo != null && this.adapter.getConfig().showStartInfo.booleanValue()){ | 
 |  |  |                System.out.println("TCP模块成功启动,端口:" + this.adapter.getConfig().port); | 
 |  |  |             } | 
 |  |  |          } | 
 |  |  | 			 | 
 |  |  |          callback.call(null); | 
 |  |  |       } | 
 |  |  |    } | 
 |  |  |  | 
 |  |  |    /** | 
 |  |  |     * 停止模块运行,将不再接入TCP网络连接,并把已经tcp连接的全部断连接 | 
 |  |  |     * @param callback | 
 |  |  |     * @throws Exception | 
 |  |  |     */ | 
 |  |  |    @Override | 
 |  |  |    public void stop(UnitCallbackInterface callback) throws Exception { | 
 |  |  |       this.tcpIoHandler.stop(); | 
 |  |  |       this.dataCodecFactory.stop(); | 
 |  |  |       this.adapter.newUnitStopCallback().callback(); | 
 |  |  |       callback.call(null); | 
 |  |  |    } | 
 |  |  |  | 
 |  |  |    /** | 
 |  |  |     * 解除停止,恢复TCP服务运行 | 
 |  |  |     * @throws Exception | 
 |  |  |     */ | 
 |  |  |    public void recover() throws Exception { | 
 |  |  |       this.tcpIoHandler.recover(); | 
 |  |  |       this.dataCodecFactory.recover(); | 
 |  |  |    } | 
 |  |  | 	 | 
 |  |  |  | 
 |  |  | } |