package com.dy.common.mw.channel.tcp; 
 | 
  
 | 
import java.io.IOException; 
 | 
import java.net.InetSocketAddress; 
 | 
  
 | 
import org.apache.mina.transport.socket.nio.NioSocketAcceptor; 
 | 
import org.apache.mina.core.session.IdleStatus ; 
 | 
import org.apache.mina.core.session.IoSession; 
 | 
import org.apache.mina.transport.socket.SocketSessionConfig; 
 | 
import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder ; 
 | 
import org.apache.mina.filter.codec.ProtocolCodecFilter; 
 | 
import org.apache.mina.filter.executor.ExecutorFilter ; 
 | 
  
 | 
import com.dy.common.mw.UnitAdapterInterface; 
 | 
import com.dy.common.mw.UnitInterface; 
 | 
import com.dy.common.mw.UnitCallbackInterface; 
 | 
  
 | 
@SuppressWarnings("unused") 
 | 
public class TcpUnit implements UnitInterface { 
 | 
     
 | 
    private static final TcpUnit instance = new TcpUnit() ; 
 | 
    private static boolean started = false ; 
 | 
     
 | 
    private TcpUnitAdapter adapter ; 
 | 
    private TcpIoHandler tcpIoHandler ; 
 | 
    private DataCodecFactory dataCodecFactory ; 
 | 
     
 | 
    private TcpUnit(){} ; 
 | 
     
 | 
    public static TcpUnit getInstance(){ 
 | 
        return instance ; 
 | 
    } 
 | 
  
 | 
    /** 
 | 
     * 把IoSession会话的ID属性及协议名称版本号设置到IoSession属性中 
 | 
     * @param session 
 | 
     * @param rtuAddr 
 | 
     * @param protocolName 
 | 
     * @param protocolVersion 
 | 
     * @throws Exception 
 | 
     */ 
 | 
    public void setIoSessionArrs(IoSession session, String rtuAddr, String protocolName, Short protocolVersion) throws Exception { 
 | 
        session.setAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrRtuAddr, rtuAddr) ; 
 | 
        session.setAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolName, protocolName) ; 
 | 
        session.setAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolVersion, protocolVersion) ; 
 | 
    } 
 | 
  
 | 
    @Override 
 | 
    public void setAdapter(UnitAdapterInterface adapter) throws Exception { 
 | 
        if(adapter == null){ 
 | 
            throw new Exception("TCP模块适配器对象不能为空!") ; 
 | 
        } 
 | 
        this.adapter = (TcpUnitAdapter)adapter ;  
 | 
        TcpConfigVo vo = this.adapter.getConfig() ; 
 | 
        if(vo == null){ 
 | 
            throw new Exception("TCP模块连接属性配置对象不能为空!") ; 
 | 
        } 
 | 
        if(vo.port == null || vo.idle == null || vo.processors == null){ 
 | 
            throw new Exception("TCP模块连接属性配置对象属性值不能为空!") ; 
 | 
        } 
 | 
        if(this.adapter.newPrefixedDataAvailableHandle() == null){ 
 | 
            throw new Exception("TCP模块上行数据解码类为空!") ; 
 | 
        } 
 | 
        if(this.adapter.newSessionEventCallback() == null){ 
 | 
            throw new Exception("TCP模块事件处理回调类为空!") ; 
 | 
        } 
 | 
    } 
 | 
    /** 
 | 
     * 启动模块 
 | 
     */ 
 | 
    public void start(UnitCallbackInterface callback) throws Exception { 
 | 
        if(!started){ 
 | 
            started = true ; 
 | 
            /** 
 | 
             * 异步非阻塞网络通信接收器,负责接收联接请求 
 | 
             * 同时设置IoProcessor个数 
 | 
             * 这个地方用于执行真正的IO 操作,默认启用的线程个数是CPU 的核数+1,譬如:单CPU 双 
 | 
             * 核的电脑,默认的IoProcessor 线程会创建3 个。这也就是说一个IoAcceptor 或者 
 | 
             * IoConnector 默认会关联一个IoProcessor 池,这个池中有3 个IoProcessor。因为IO 操作 
 | 
             * 耗费资源,所以这里使用IoProcessor 池来完成数据的读写操作,有助于提高性能。这也就 
 | 
             * 是前面说的IoAccetor、IoConnector 使用一个Selector,而IoProcessor 使用自己单独的 
 | 
             * Selector 的原因。 
 | 
             * 那么为什么IoProcessor 池中的IoProcessor 数量只比CPU 的核数大1 呢?因为IO 读写操 
 | 
             * 作是耗费CPU 的操作,而每一核CPU 同时只能运行一个线程,因此IoProcessor 池中的 
 | 
             * IoProcessor 的数量并不是越多越好。 
 | 
             */ 
 | 
            NioSocketAcceptor acceptor = new NioSocketAcceptor(this.adapter.getConfig().processors); 
 | 
             
 | 
            SocketSessionConfig seConf = acceptor.getSessionConfig() ; 
 | 
             
 | 
            /* 设置读数据时一次性申请堆缓存大小, 
 | 
             * 一般不需要调用这个方法,因为IoProcessor 会自动调整缓冲的大小。 
 | 
             * 可以调用setMinReadBufferSize()、setMaxReadBufferSize()方法, 
 | 
             * 这样无论IoProcessor如何自动调整,都会在你指定的区间。 
 | 
             */ 
 | 
            //seConf.setReadBufferSize(1024); 
 | 
             
 | 
            //设置网络联接空闲时长 
 | 
            seConf.setIdleTime(IdleStatus.BOTH_IDLE, this.adapter.getConfig().idle); 
 | 
  
 | 
            //得到网络 通信数据过滤器链 
 | 
            DefaultIoFilterChainBuilder chain = acceptor.getFilterChain() ; 
 | 
            //生成编解码过滤器工厂类 
 | 
            dataCodecFactory = new DataCodecFactory(this.adapter) ; 
 | 
            //设置“protocol”,加入编解码过滤器,过滤器在IoProcessor线程中执行 
 | 
            chain.addLast("protocol", new ProtocolCodecFilter(dataCodecFactory)); 
 | 
             
 | 
            /* 
 | 
             * 一般ExecutorFilter 都要放在ProtocolCodecFilter过滤器的后面, 
 | 
             * 也就是让编解码运行在IoProcessor所在的线程,因为编解码处理的数据都是 
 | 
             * 由IoProcessor读取和发送的,没必要开启新的线程,否则性能反而会下降。 
 | 
             * ExecutorFilter过程器会启动一个线程池,处理后续代码逻辑。 
 | 
             * 一般使用ExecutorFilter的典型场景是将业务逻辑(譬如:耗时的数据库操作) 
 | 
             * 放在单独的线程中运行,也就是说与IO处理无关的操作可以考虑使用ExecutorFilter来异步执行。 
 | 
             * 本处用法,使ExecutorFilter线程池中的线程处理IOHandler(TcpIoHandler)操作 
 | 
             */ 
 | 
            chain.addLast("exceutor", new ExecutorFilter()); 
 | 
  
 | 
            //业务逻辑处理器,负责处理网络会话及输入输出数据 
 | 
            tcpIoHandler = new TcpIoHandler(this.adapter) ; 
 | 
            acceptor.setHandler(tcpIoHandler) ; 
 | 
  
 | 
            boolean isException = false ; 
 | 
            try { 
 | 
                acceptor.bind(new InetSocketAddress(this.adapter.getConfig().port)); 
 | 
            } catch (IOException e) { 
 | 
                e.printStackTrace(); 
 | 
                System.out.println("TCP通信模块启动失败!" + (e.getMessage()==null?"":e.getMessage())); 
 | 
                isException = true ; 
 | 
            }finally{ 
 | 
                ; 
 | 
            } 
 | 
  
 | 
            if(!isException){ 
 | 
                if(this.adapter.getConfig().showStartInfo != null && this.adapter.getConfig().showStartInfo.booleanValue()){ 
 | 
                    System.out.println("TCP模块成功启动,端口:" + this.adapter.getConfig().port); 
 | 
                } 
 | 
            } 
 | 
             
 | 
            callback.call(null); 
 | 
        } 
 | 
    } 
 | 
  
 | 
    /** 
 | 
     * 停止模块运行,将不再接入TCP网络连接,并把已经tcp连接的全部断连接 
 | 
     * @param callback 
 | 
     * @throws Exception 
 | 
     */ 
 | 
    @Override 
 | 
    public void stop(UnitCallbackInterface callback) throws Exception { 
 | 
        this.tcpIoHandler.stop(); 
 | 
        this.dataCodecFactory.stop(); 
 | 
        this.adapter.newUnitStopCallback().callback(); 
 | 
        callback.call(null); 
 | 
    } 
 | 
  
 | 
    /** 
 | 
     * 解除停止,恢复TCP服务运行 
 | 
     * @throws Exception 
 | 
     */ 
 | 
    public void recover() throws Exception { 
 | 
        this.tcpIoHandler.recover(); 
 | 
        this.dataCodecFactory.recover(); 
 | 
    } 
 | 
     
 | 
  
 | 
} 
 |