package com.dy.common.mw.channel.tcp;
|
|
import java.io.IOException;
|
import java.net.InetSocketAddress;
|
|
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
|
import org.apache.mina.core.session.IdleStatus ;
|
import org.apache.mina.core.session.IoSession;
|
import org.apache.mina.transport.socket.SocketSessionConfig;
|
import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder ;
|
import org.apache.mina.filter.codec.ProtocolCodecFilter;
|
import org.apache.mina.filter.executor.ExecutorFilter ;
|
|
import com.dy.common.mw.UnitAdapterInterface;
|
import com.dy.common.mw.UnitInterface;
|
import com.dy.common.mw.UnitCallbackInterface;
|
|
@SuppressWarnings("unused")
|
public class TcpUnit implements UnitInterface {
|
|
private static final TcpUnit instance = new TcpUnit() ;
|
private static boolean started = false ;
|
|
private TcpUnitAdapter adapter ;
|
private TcpIoHandler tcpIoHandler ;
|
private DataCodecFactory dataCodecFactory ;
|
|
private TcpUnit(){} ;
|
|
public static TcpUnit getInstance(){
|
return instance ;
|
}
|
|
/**
|
* 把IoSession会话的ID属性及协议名称版本号设置到IoSession属性中
|
* @param session
|
* @param rtuAddr
|
* @param protocolName
|
* @param protocolVersion
|
* @throws Exception
|
*/
|
public void setIoSessionArrs(IoSession session, String rtuAddr, String protocolName, Short protocolVersion) throws Exception {
|
session.setAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrRtuAddr, rtuAddr) ;
|
session.setAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolName, protocolName) ;
|
session.setAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolVersion, protocolVersion) ;
|
}
|
|
@Override
|
public void setAdapter(UnitAdapterInterface adapter) throws Exception {
|
if(adapter == null){
|
throw new Exception("TCP模块适配器对象不能为空!") ;
|
}
|
this.adapter = (TcpUnitAdapter)adapter ;
|
TcpConfigVo vo = this.adapter.getConfig() ;
|
if(vo == null){
|
throw new Exception("TCP模块连接属性配置对象不能为空!") ;
|
}
|
if(vo.port == null || vo.idle == null || vo.processors == null){
|
throw new Exception("TCP模块连接属性配置对象属性值不能为空!") ;
|
}
|
if(this.adapter.newPrefixedDataAvailableHandle() == null){
|
throw new Exception("TCP模块上行数据解码类为空!") ;
|
}
|
if(this.adapter.newSessionEventCallback() == null){
|
throw new Exception("TCP模块事件处理回调类为空!") ;
|
}
|
}
|
/**
|
* 启动模块
|
*/
|
public void start(UnitCallbackInterface callback) throws Exception {
|
if(!started){
|
started = true ;
|
/**
|
* 异步非阻塞网络通信接收器,负责接收联接请求
|
* 同时设置IoProcessor个数
|
* 这个地方用于执行真正的IO 操作,默认启用的线程个数是CPU 的核数+1,譬如:单CPU 双
|
* 核的电脑,默认的IoProcessor 线程会创建3 个。这也就是说一个IoAcceptor 或者
|
* IoConnector 默认会关联一个IoProcessor 池,这个池中有3 个IoProcessor。因为IO 操作
|
* 耗费资源,所以这里使用IoProcessor 池来完成数据的读写操作,有助于提高性能。这也就
|
* 是前面说的IoAccetor、IoConnector 使用一个Selector,而IoProcessor 使用自己单独的
|
* Selector 的原因。
|
* 那么为什么IoProcessor 池中的IoProcessor 数量只比CPU 的核数大1 呢?因为IO 读写操
|
* 作是耗费CPU 的操作,而每一核CPU 同时只能运行一个线程,因此IoProcessor 池中的
|
* IoProcessor 的数量并不是越多越好。
|
*/
|
NioSocketAcceptor acceptor = new NioSocketAcceptor(this.adapter.getConfig().processors);
|
|
SocketSessionConfig seConf = acceptor.getSessionConfig() ;
|
|
/* 设置读数据时一次性申请堆缓存大小,
|
* 一般不需要调用这个方法,因为IoProcessor 会自动调整缓冲的大小。
|
* 可以调用setMinReadBufferSize()、setMaxReadBufferSize()方法,
|
* 这样无论IoProcessor如何自动调整,都会在你指定的区间。
|
*/
|
//seConf.setReadBufferSize(1024);
|
|
//设置网络联接空闲时长
|
seConf.setIdleTime(IdleStatus.BOTH_IDLE, this.adapter.getConfig().idle);
|
|
//得到网络 通信数据过滤器链
|
DefaultIoFilterChainBuilder chain = acceptor.getFilterChain() ;
|
//生成编解码过滤器工厂类
|
dataCodecFactory = new DataCodecFactory(this.adapter) ;
|
//设置“protocol”,加入编解码过滤器,过滤器在IoProcessor线程中执行
|
chain.addLast("protocol", new ProtocolCodecFilter(dataCodecFactory));
|
|
/*
|
* 一般ExecutorFilter 都要放在ProtocolCodecFilter过滤器的后面,
|
* 也就是让编解码运行在IoProcessor所在的线程,因为编解码处理的数据都是
|
* 由IoProcessor读取和发送的,没必要开启新的线程,否则性能反而会下降。
|
* ExecutorFilter过程器会启动一个线程池,处理后续代码逻辑。
|
* 一般使用ExecutorFilter的典型场景是将业务逻辑(譬如:耗时的数据库操作)
|
* 放在单独的线程中运行,也就是说与IO处理无关的操作可以考虑使用ExecutorFilter来异步执行。
|
* 本处用法,使ExecutorFilter线程池中的线程处理IOHandler(TcpIoHandler)操作
|
*/
|
chain.addLast("exceutor", new ExecutorFilter());
|
|
//业务逻辑处理器,负责处理网络会话及输入输出数据
|
tcpIoHandler = new TcpIoHandler(this.adapter) ;
|
acceptor.setHandler(tcpIoHandler) ;
|
|
boolean isException = false ;
|
try {
|
acceptor.bind(new InetSocketAddress(this.adapter.getConfig().port));
|
} catch (IOException e) {
|
e.printStackTrace();
|
System.out.println("TCP通信模块启动失败!" + (e.getMessage()==null?"":e.getMessage()));
|
isException = true ;
|
}finally{
|
;
|
}
|
|
if(!isException){
|
if(this.adapter.getConfig().showStartInfo != null && this.adapter.getConfig().showStartInfo.booleanValue()){
|
System.out.println("TCP模块成功启动,端口:" + this.adapter.getConfig().port);
|
}
|
}
|
|
callback.call(null);
|
}
|
}
|
|
/**
|
* 停止模块运行,将不再接入TCP网络连接,并把已经tcp连接的全部断连接
|
* @param callback
|
* @throws Exception
|
*/
|
@Override
|
public void stop(UnitCallbackInterface callback) throws Exception {
|
this.tcpIoHandler.stop();
|
this.dataCodecFactory.stop();
|
this.adapter.newUnitStopCallback().callback();
|
callback.call(null);
|
}
|
|
/**
|
* 解除停止,恢复TCP服务运行
|
* @throws Exception
|
*/
|
public void recover() throws Exception {
|
this.tcpIoHandler.recover();
|
this.dataCodecFactory.recover();
|
}
|
|
|
}
|