liurunyu
7 天以前 4f99f59668c9160ca60958b7347944def26f2228
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/tcp/TcpUnit.java
New file
@@ -0,0 +1,167 @@
package com.dy.common.mw.channel.tcp;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import org.apache.mina.core.session.IdleStatus ;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.transport.socket.SocketSessionConfig;
import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder ;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.executor.ExecutorFilter ;
import com.dy.common.mw.UnitAdapterInterface;
import com.dy.common.mw.UnitInterface;
import com.dy.common.mw.UnitCallbackInterface;
@SuppressWarnings("unused")
public class TcpUnit implements UnitInterface {
   private static final TcpUnit instance = new TcpUnit() ;
   private static boolean started = false ;
   private TcpUnitAdapter adapter ;
   private TcpIoHandler tcpIoHandler ;
   private DataCodecFactory dataCodecFactory ;
   private TcpUnit(){} ;
   public static TcpUnit getInstance(){
      return instance ;
   }
   /**
    * 把IoSession会话的ID属性及协议名称版本号设置到IoSession属性中
    * @param session
    * @param rtuAddr
    * @param protocolName
    * @param protocolVersion
    * @throws Exception
    */
   public void setIoSessionArrs(IoSession session, String rtuAddr, String protocolName, Short protocolVersion) throws Exception {
      session.setAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrRtuAddr, rtuAddr) ;
      session.setAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolName, protocolName) ;
      session.setAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolVersion, protocolVersion) ;
   }
   @Override
   public void setAdapter(UnitAdapterInterface adapter) throws Exception {
      if(adapter == null){
         throw new Exception("TCP模块适配器对象不能为空!") ;
      }
      this.adapter = (TcpUnitAdapter)adapter ;
      TcpConfigVo vo = this.adapter.getConfig() ;
      if(vo == null){
         throw new Exception("TCP模块连接属性配置对象不能为空!") ;
      }
      if(vo.port == null || vo.idle == null || vo.processors == null){
         throw new Exception("TCP模块连接属性配置对象属性值不能为空!") ;
      }
      if(this.adapter.newPrefixedDataAvailableHandle() == null){
         throw new Exception("TCP模块上行数据解码类为空!") ;
      }
      if(this.adapter.newSessionEventCallback() == null){
         throw new Exception("TCP模块事件处理回调类为空!") ;
      }
   }
   /**
    * 启动模块
    */
   public void start(UnitCallbackInterface callback) throws Exception {
      if(!started){
         started = true ;
         /**
          * 异步非阻塞网络通信接收器,负责接收联接请求
          * 同时设置IoProcessor个数
          * 这个地方用于执行真正的IO 操作,默认启用的线程个数是CPU 的核数+1,譬如:单CPU 双
          * 核的电脑,默认的IoProcessor 线程会创建3 个。这也就是说一个IoAcceptor 或者
          * IoConnector 默认会关联一个IoProcessor 池,这个池中有3 个IoProcessor。因为IO 操作
          * 耗费资源,所以这里使用IoProcessor 池来完成数据的读写操作,有助于提高性能。这也就
          * 是前面说的IoAccetor、IoConnector 使用一个Selector,而IoProcessor 使用自己单独的
          * Selector 的原因。
          * 那么为什么IoProcessor 池中的IoProcessor 数量只比CPU 的核数大1 呢?因为IO 读写操
          * 作是耗费CPU 的操作,而每一核CPU 同时只能运行一个线程,因此IoProcessor 池中的
          * IoProcessor 的数量并不是越多越好。
          */
         NioSocketAcceptor acceptor = new NioSocketAcceptor(this.adapter.getConfig().processors);
         SocketSessionConfig seConf = acceptor.getSessionConfig() ;
         /* 设置读数据时一次性申请堆缓存大小,
          * 一般不需要调用这个方法,因为IoProcessor 会自动调整缓冲的大小。
          * 可以调用setMinReadBufferSize()、setMaxReadBufferSize()方法,
          * 这样无论IoProcessor如何自动调整,都会在你指定的区间。
          */
         //seConf.setReadBufferSize(1024);
         //设置网络联接空闲时长
         seConf.setIdleTime(IdleStatus.BOTH_IDLE, this.adapter.getConfig().idle);
         //得到网络 通信数据过滤器链
         DefaultIoFilterChainBuilder chain = acceptor.getFilterChain() ;
         //生成编解码过滤器工厂类
         dataCodecFactory = new DataCodecFactory(this.adapter) ;
         //设置“protocol”,加入编解码过滤器,过滤器在IoProcessor线程中执行
         chain.addLast("protocol", new ProtocolCodecFilter(dataCodecFactory));
         /*
          * 一般ExecutorFilter 都要放在ProtocolCodecFilter过滤器的后面,
          * 也就是让编解码运行在IoProcessor所在的线程,因为编解码处理的数据都是
          * 由IoProcessor读取和发送的,没必要开启新的线程,否则性能反而会下降。
          * ExecutorFilter过程器会启动一个线程池,处理后续代码逻辑。
          * 一般使用ExecutorFilter的典型场景是将业务逻辑(譬如:耗时的数据库操作)
          * 放在单独的线程中运行,也就是说与IO处理无关的操作可以考虑使用ExecutorFilter来异步执行。
          * 本处用法,使ExecutorFilter线程池中的线程处理IOHandler(TcpIoHandler)操作
          */
         chain.addLast("exceutor", new ExecutorFilter());
         //业务逻辑处理器,负责处理网络会话及输入输出数据
         tcpIoHandler = new TcpIoHandler(this.adapter) ;
         acceptor.setHandler(tcpIoHandler) ;
         boolean isException = false ;
         try {
            acceptor.bind(new InetSocketAddress(this.adapter.getConfig().port));
         } catch (IOException e) {
            e.printStackTrace();
            System.out.println("TCP通信模块启动失败!" + (e.getMessage()==null?"":e.getMessage()));
            isException = true ;
         }finally{
            ;
         }
         if(!isException){
            if(this.adapter.getConfig().showStartInfo != null && this.adapter.getConfig().showStartInfo.booleanValue()){
               System.out.println("TCP模块成功启动,端口:" + this.adapter.getConfig().port);
            }
         }
         callback.call(null);
      }
   }
   /**
    * 停止模块运行,将不再接入TCP网络连接,并把已经tcp连接的全部断连接
    * @param callback
    * @throws Exception
    */
   @Override
   public void stop(UnitCallbackInterface callback) throws Exception {
      this.tcpIoHandler.stop();
      this.dataCodecFactory.stop();
      this.adapter.newUnitStopCallback().callback();
      callback.call(null);
   }
   /**
    * 解除停止,恢复TCP服务运行
    * @throws Exception
    */
   public void recover() throws Exception {
      this.tcpIoHandler.recover();
      this.dataCodecFactory.recover();
   }
}