New file |
| | |
| | | package com.dy.common.mw.channel.tcp; |
| | | |
| | | import java.io.IOException; |
| | | import java.net.InetSocketAddress; |
| | | |
| | | import org.apache.mina.transport.socket.nio.NioSocketAcceptor; |
| | | import org.apache.mina.core.session.IdleStatus ; |
| | | import org.apache.mina.core.session.IoSession; |
| | | import org.apache.mina.transport.socket.SocketSessionConfig; |
| | | import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder ; |
| | | import org.apache.mina.filter.codec.ProtocolCodecFilter; |
| | | import org.apache.mina.filter.executor.ExecutorFilter ; |
| | | |
| | | import com.dy.common.mw.UnitAdapterInterface; |
| | | import com.dy.common.mw.UnitInterface; |
| | | import com.dy.common.mw.UnitCallbackInterface; |
| | | |
| | | @SuppressWarnings("unused") |
| | | public class TcpUnit implements UnitInterface { |
| | | |
| | | private static final TcpUnit instance = new TcpUnit() ; |
| | | private static boolean started = false ; |
| | | |
| | | private TcpUnitAdapter adapter ; |
| | | private TcpIoHandler tcpIoHandler ; |
| | | private DataCodecFactory dataCodecFactory ; |
| | | |
| | | private TcpUnit(){} ; |
| | | |
| | | public static TcpUnit getInstance(){ |
| | | return instance ; |
| | | } |
| | | |
| | | /** |
| | | * 把IoSession会话的ID属性及协议名称版本号设置到IoSession属性中 |
| | | * @param session |
| | | * @param rtuAddr |
| | | * @param protocolName |
| | | * @param protocolVersion |
| | | * @throws Exception |
| | | */ |
| | | public void setIoSessionArrs(IoSession session, String rtuAddr, String protocolName, Short protocolVersion) throws Exception { |
| | | session.setAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrRtuAddr, rtuAddr) ; |
| | | session.setAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolName, protocolName) ; |
| | | session.setAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolVersion, protocolVersion) ; |
| | | } |
| | | |
| | | @Override |
| | | public void setAdapter(UnitAdapterInterface adapter) throws Exception { |
| | | if(adapter == null){ |
| | | throw new Exception("TCP模块适配器对象不能为空!") ; |
| | | } |
| | | this.adapter = (TcpUnitAdapter)adapter ; |
| | | TcpConfigVo vo = this.adapter.getConfig() ; |
| | | if(vo == null){ |
| | | throw new Exception("TCP模块连接属性配置对象不能为空!") ; |
| | | } |
| | | if(vo.port == null || vo.idle == null || vo.processors == null){ |
| | | throw new Exception("TCP模块连接属性配置对象属性值不能为空!") ; |
| | | } |
| | | if(this.adapter.newPrefixedDataAvailableHandle() == null){ |
| | | throw new Exception("TCP模块上行数据解码类为空!") ; |
| | | } |
| | | if(this.adapter.newSessionEventCallback() == null){ |
| | | throw new Exception("TCP模块事件处理回调类为空!") ; |
| | | } |
| | | } |
| | | /** |
| | | * 启动模块 |
| | | */ |
| | | public void start(UnitCallbackInterface callback) throws Exception { |
| | | if(!started){ |
| | | started = true ; |
| | | /** |
| | | * 异步非阻塞网络通信接收器,负责接收联接请求 |
| | | * 同时设置IoProcessor个数 |
| | | * 这个地方用于执行真正的IO 操作,默认启用的线程个数是CPU 的核数+1,譬如:单CPU 双 |
| | | * 核的电脑,默认的IoProcessor 线程会创建3 个。这也就是说一个IoAcceptor 或者 |
| | | * IoConnector 默认会关联一个IoProcessor 池,这个池中有3 个IoProcessor。因为IO 操作 |
| | | * 耗费资源,所以这里使用IoProcessor 池来完成数据的读写操作,有助于提高性能。这也就 |
| | | * 是前面说的IoAccetor、IoConnector 使用一个Selector,而IoProcessor 使用自己单独的 |
| | | * Selector 的原因。 |
| | | * 那么为什么IoProcessor 池中的IoProcessor 数量只比CPU 的核数大1 呢?因为IO 读写操 |
| | | * 作是耗费CPU 的操作,而每一核CPU 同时只能运行一个线程,因此IoProcessor 池中的 |
| | | * IoProcessor 的数量并不是越多越好。 |
| | | */ |
| | | NioSocketAcceptor acceptor = new NioSocketAcceptor(this.adapter.getConfig().processors); |
| | | |
| | | SocketSessionConfig seConf = acceptor.getSessionConfig() ; |
| | | |
| | | /* 设置读数据时一次性申请堆缓存大小, |
| | | * 一般不需要调用这个方法,因为IoProcessor 会自动调整缓冲的大小。 |
| | | * 可以调用setMinReadBufferSize()、setMaxReadBufferSize()方法, |
| | | * 这样无论IoProcessor如何自动调整,都会在你指定的区间。 |
| | | */ |
| | | //seConf.setReadBufferSize(1024); |
| | | |
| | | //设置网络联接空闲时长 |
| | | seConf.setIdleTime(IdleStatus.BOTH_IDLE, this.adapter.getConfig().idle); |
| | | |
| | | //得到网络 通信数据过滤器链 |
| | | DefaultIoFilterChainBuilder chain = acceptor.getFilterChain() ; |
| | | //生成编解码过滤器工厂类 |
| | | dataCodecFactory = new DataCodecFactory(this.adapter) ; |
| | | //设置“protocol”,加入编解码过滤器,过滤器在IoProcessor线程中执行 |
| | | chain.addLast("protocol", new ProtocolCodecFilter(dataCodecFactory)); |
| | | |
| | | /* |
| | | * 一般ExecutorFilter 都要放在ProtocolCodecFilter过滤器的后面, |
| | | * 也就是让编解码运行在IoProcessor所在的线程,因为编解码处理的数据都是 |
| | | * 由IoProcessor读取和发送的,没必要开启新的线程,否则性能反而会下降。 |
| | | * ExecutorFilter过程器会启动一个线程池,处理后续代码逻辑。 |
| | | * 一般使用ExecutorFilter的典型场景是将业务逻辑(譬如:耗时的数据库操作) |
| | | * 放在单独的线程中运行,也就是说与IO处理无关的操作可以考虑使用ExecutorFilter来异步执行。 |
| | | * 本处用法,使ExecutorFilter线程池中的线程处理IOHandler(TcpIoHandler)操作 |
| | | */ |
| | | chain.addLast("exceutor", new ExecutorFilter()); |
| | | |
| | | //业务逻辑处理器,负责处理网络会话及输入输出数据 |
| | | tcpIoHandler = new TcpIoHandler(this.adapter) ; |
| | | acceptor.setHandler(tcpIoHandler) ; |
| | | |
| | | boolean isException = false ; |
| | | try { |
| | | acceptor.bind(new InetSocketAddress(this.adapter.getConfig().port)); |
| | | } catch (IOException e) { |
| | | e.printStackTrace(); |
| | | System.out.println("TCP通信模块启动失败!" + (e.getMessage()==null?"":e.getMessage())); |
| | | isException = true ; |
| | | }finally{ |
| | | ; |
| | | } |
| | | |
| | | if(!isException){ |
| | | if(this.adapter.getConfig().showStartInfo != null && this.adapter.getConfig().showStartInfo.booleanValue()){ |
| | | System.out.println("TCP模块成功启动,端口:" + this.adapter.getConfig().port); |
| | | } |
| | | } |
| | | |
| | | callback.call(null); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 停止模块运行,将不再接入TCP网络连接,并把已经tcp连接的全部断连接 |
| | | * @param callback |
| | | * @throws Exception |
| | | */ |
| | | @Override |
| | | public void stop(UnitCallbackInterface callback) throws Exception { |
| | | this.tcpIoHandler.stop(); |
| | | this.dataCodecFactory.stop(); |
| | | this.adapter.newUnitStopCallback().callback(); |
| | | callback.call(null); |
| | | } |
| | | |
| | | /** |
| | | * 解除停止,恢复TCP服务运行 |
| | | * @throws Exception |
| | | */ |
| | | public void recover() throws Exception { |
| | | this.tcpIoHandler.recover(); |
| | | this.dataCodecFactory.recover(); |
| | | } |
| | | |
| | | |
| | | } |