package com.dy.common.mw.channel.tcp; import java.io.IOException; import java.net.InetSocketAddress; import org.apache.mina.transport.socket.nio.NioSocketAcceptor; import org.apache.mina.core.session.IdleStatus ; import org.apache.mina.core.session.IoSession; import org.apache.mina.transport.socket.SocketSessionConfig; import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder ; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.executor.ExecutorFilter ; import com.dy.common.mw.UnitAdapterInterface; import com.dy.common.mw.UnitInterface; import com.dy.common.mw.UnitCallbackInterface; @SuppressWarnings("unused") public class TcpUnit implements UnitInterface { private static final TcpUnit instance = new TcpUnit() ; private static boolean started = false ; private TcpUnitAdapter adapter ; private TcpIoHandler tcpIoHandler ; private DataCodecFactory dataCodecFactory ; private TcpUnit(){} ; public static TcpUnit getInstance(){ return instance ; } /** * 把IoSession会话的ID属性及协议名称版本号设置到IoSession属性中 * @param session * @param rtuAddr * @param protocolName * @param protocolVersion * @throws Exception */ public void setIoSessionArrs(IoSession session, String rtuAddr, String protocolName, Short protocolVersion) throws Exception { session.setAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrRtuAddr, rtuAddr) ; session.setAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolName, protocolName) ; session.setAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolVersion, protocolVersion) ; } @Override public void setAdapter(UnitAdapterInterface adapter) throws Exception { if(adapter == null){ throw new Exception("TCP模块适配器对象不能为空!") ; } this.adapter = (TcpUnitAdapter)adapter ; TcpConfigVo vo = this.adapter.getConfig() ; if(vo == null){ throw new Exception("TCP模块连接属性配置对象不能为空!") ; } if(vo.port == null || vo.idle == null || vo.processors == null){ throw new Exception("TCP模块连接属性配置对象属性值不能为空!") ; } if(this.adapter.newPrefixedDataAvailableHandle() == null){ throw new Exception("TCP模块上行数据解码类为空!") ; } if(this.adapter.newSessionEventCallback() == null){ throw new Exception("TCP模块事件处理回调类为空!") ; } } /** * 启动模块 */ public void start(UnitCallbackInterface callback) throws Exception { if(!started){ started = true ; /** * 异步非阻塞网络通信接收器,负责接收联接请求 * 同时设置IoProcessor个数 * 这个地方用于执行真正的IO 操作,默认启用的线程个数是CPU 的核数+1,譬如:单CPU 双 * 核的电脑,默认的IoProcessor 线程会创建3 个。这也就是说一个IoAcceptor 或者 * IoConnector 默认会关联一个IoProcessor 池,这个池中有3 个IoProcessor。因为IO 操作 * 耗费资源,所以这里使用IoProcessor 池来完成数据的读写操作,有助于提高性能。这也就 * 是前面说的IoAccetor、IoConnector 使用一个Selector,而IoProcessor 使用自己单独的 * Selector 的原因。 * 那么为什么IoProcessor 池中的IoProcessor 数量只比CPU 的核数大1 呢?因为IO 读写操 * 作是耗费CPU 的操作,而每一核CPU 同时只能运行一个线程,因此IoProcessor 池中的 * IoProcessor 的数量并不是越多越好。 */ NioSocketAcceptor acceptor = new NioSocketAcceptor(this.adapter.getConfig().processors); SocketSessionConfig seConf = acceptor.getSessionConfig() ; /* 设置读数据时一次性申请堆缓存大小, * 一般不需要调用这个方法,因为IoProcessor 会自动调整缓冲的大小。 * 可以调用setMinReadBufferSize()、setMaxReadBufferSize()方法, * 这样无论IoProcessor如何自动调整,都会在你指定的区间。 */ //seConf.setReadBufferSize(1024); //设置网络联接空闲时长 seConf.setIdleTime(IdleStatus.BOTH_IDLE, this.adapter.getConfig().idle); //得到网络 通信数据过滤器链 DefaultIoFilterChainBuilder chain = acceptor.getFilterChain() ; //生成编解码过滤器工厂类 dataCodecFactory = new DataCodecFactory(this.adapter) ; //设置“protocol”,加入编解码过滤器,过滤器在IoProcessor线程中执行 chain.addLast("protocol", new ProtocolCodecFilter(dataCodecFactory)); /* * 一般ExecutorFilter 都要放在ProtocolCodecFilter过滤器的后面, * 也就是让编解码运行在IoProcessor所在的线程,因为编解码处理的数据都是 * 由IoProcessor读取和发送的,没必要开启新的线程,否则性能反而会下降。 * ExecutorFilter过程器会启动一个线程池,处理后续代码逻辑。 * 一般使用ExecutorFilter的典型场景是将业务逻辑(譬如:耗时的数据库操作) * 放在单独的线程中运行,也就是说与IO处理无关的操作可以考虑使用ExecutorFilter来异步执行。 * 本处用法,使ExecutorFilter线程池中的线程处理IOHandler(TcpIoHandler)操作 */ chain.addLast("exceutor", new ExecutorFilter()); //业务逻辑处理器,负责处理网络会话及输入输出数据 tcpIoHandler = new TcpIoHandler(this.adapter) ; acceptor.setHandler(tcpIoHandler) ; boolean isException = false ; try { acceptor.bind(new InetSocketAddress(this.adapter.getConfig().port)); } catch (IOException e) { e.printStackTrace(); System.out.println("TCP通信模块启动失败!" + (e.getMessage()==null?"":e.getMessage())); isException = true ; }finally{ ; } if(!isException){ if(this.adapter.getConfig().showStartInfo != null && this.adapter.getConfig().showStartInfo.booleanValue()){ System.out.println("TCP模块成功启动,端口:" + this.adapter.getConfig().port); } } callback.call(null); } } /** * 停止模块运行,将不再接入TCP网络连接,并把已经tcp连接的全部断连接 * @param callback * @throws Exception */ @Override public void stop(UnitCallbackInterface callback) throws Exception { this.tcpIoHandler.stop(); this.dataCodecFactory.stop(); this.adapter.newUnitStopCallback().callback(); callback.call(null); } /** * 解除停止,恢复TCP服务运行 * @throws Exception */ public void recover() throws Exception { this.tcpIoHandler.recover(); this.dataCodecFactory.recover(); } }