1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
package com.dy.common.mw.channel.tcp;
 
import java.io.IOException;
import java.net.InetSocketAddress;
 
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import org.apache.mina.core.session.IdleStatus ;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.transport.socket.SocketSessionConfig;
import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder ;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.executor.ExecutorFilter ;
 
import com.dy.common.mw.UnitAdapterInterface;
import com.dy.common.mw.UnitInterface;
import com.dy.common.mw.UnitCallbackInterface;
 
@SuppressWarnings("unused")
public class TcpUnit implements UnitInterface {
    
    private static final TcpUnit instance = new TcpUnit() ;
    private static boolean started = false ;
    
    private TcpUnitAdapter adapter ;
    private TcpIoHandler tcpIoHandler ;
    private DataCodecFactory dataCodecFactory ;
    
    private TcpUnit(){} ;
    
    public static TcpUnit getInstance(){
        return instance ;
    }
 
    /**
     * 把IoSession会话的ID属性及协议名称版本号设置到IoSession属性中
     * @param session
     * @param rtuAddr
     * @param protocolName
     * @param protocolVersion
     * @throws Exception
     */
    public void setIoSessionArrs(IoSession session, String rtuAddr, String protocolName, Short protocolVersion) throws Exception {
        session.setAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrRtuAddr, rtuAddr) ;
        session.setAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolName, protocolName) ;
        session.setAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolVersion, protocolVersion) ;
    }
 
    @Override
    public void setAdapter(UnitAdapterInterface adapter) throws Exception {
        if(adapter == null){
            throw new Exception("TCP模块适配器对象不能为空!") ;
        }
        this.adapter = (TcpUnitAdapter)adapter ; 
        TcpConfigVo vo = this.adapter.getConfig() ;
        if(vo == null){
            throw new Exception("TCP模块连接属性配置对象不能为空!") ;
        }
        if(vo.port == null || vo.idle == null || vo.processors == null){
            throw new Exception("TCP模块连接属性配置对象属性值不能为空!") ;
        }
        if(this.adapter.newPrefixedDataAvailableHandle() == null){
            throw new Exception("TCP模块上行数据解码类为空!") ;
        }
        if(this.adapter.newSessionEventCallback() == null){
            throw new Exception("TCP模块事件处理回调类为空!") ;
        }
    }
    /**
     * 启动模块
     */
    public void start(UnitCallbackInterface callback) throws Exception {
        if(!started){
            started = true ;
            /**
             * 异步非阻塞网络通信接收器,负责接收联接请求
             * 同时设置IoProcessor个数
             * 这个地方用于执行真正的IO 操作,默认启用的线程个数是CPU 的核数+1,譬如:单CPU 双
             * 核的电脑,默认的IoProcessor 线程会创建3 个。这也就是说一个IoAcceptor 或者
             * IoConnector 默认会关联一个IoProcessor 池,这个池中有3 个IoProcessor。因为IO 操作
             * 耗费资源,所以这里使用IoProcessor 池来完成数据的读写操作,有助于提高性能。这也就
             * 是前面说的IoAccetor、IoConnector 使用一个Selector,而IoProcessor 使用自己单独的
             * Selector 的原因。
             * 那么为什么IoProcessor 池中的IoProcessor 数量只比CPU 的核数大1 呢?因为IO 读写操
             * 作是耗费CPU 的操作,而每一核CPU 同时只能运行一个线程,因此IoProcessor 池中的
             * IoProcessor 的数量并不是越多越好。
             */
            NioSocketAcceptor acceptor = new NioSocketAcceptor(this.adapter.getConfig().processors);
            
            SocketSessionConfig seConf = acceptor.getSessionConfig() ;
            
            /* 设置读数据时一次性申请堆缓存大小,
             * 一般不需要调用这个方法,因为IoProcessor 会自动调整缓冲的大小。
             * 可以调用setMinReadBufferSize()、setMaxReadBufferSize()方法,
             * 这样无论IoProcessor如何自动调整,都会在你指定的区间。
             */
            //seConf.setReadBufferSize(1024);
            
            //设置网络联接空闲时长
            seConf.setIdleTime(IdleStatus.BOTH_IDLE, this.adapter.getConfig().idle);
 
            //得到网络 通信数据过滤器链
            DefaultIoFilterChainBuilder chain = acceptor.getFilterChain() ;
            //生成编解码过滤器工厂类
            dataCodecFactory = new DataCodecFactory(this.adapter) ;
            //设置“protocol”,加入编解码过滤器,过滤器在IoProcessor线程中执行
            chain.addLast("protocol", new ProtocolCodecFilter(dataCodecFactory));
            
            /*
             * 一般ExecutorFilter 都要放在ProtocolCodecFilter过滤器的后面,
             * 也就是让编解码运行在IoProcessor所在的线程,因为编解码处理的数据都是
             * 由IoProcessor读取和发送的,没必要开启新的线程,否则性能反而会下降。
             * ExecutorFilter过程器会启动一个线程池,处理后续代码逻辑。
             * 一般使用ExecutorFilter的典型场景是将业务逻辑(譬如:耗时的数据库操作)
             * 放在单独的线程中运行,也就是说与IO处理无关的操作可以考虑使用ExecutorFilter来异步执行。
             * 本处用法,使ExecutorFilter线程池中的线程处理IOHandler(TcpIoHandler)操作
             */
            chain.addLast("exceutor", new ExecutorFilter());
 
            //业务逻辑处理器,负责处理网络会话及输入输出数据
            tcpIoHandler = new TcpIoHandler(this.adapter) ;
            acceptor.setHandler(tcpIoHandler) ;
 
            boolean isException = false ;
            try {
                acceptor.bind(new InetSocketAddress(this.adapter.getConfig().port));
            } catch (IOException e) {
                e.printStackTrace();
                System.out.println("TCP通信模块启动失败!" + (e.getMessage()==null?"":e.getMessage()));
                isException = true ;
            }finally{
                ;
            }
 
            if(!isException){
                if(this.adapter.getConfig().showStartInfo != null && this.adapter.getConfig().showStartInfo.booleanValue()){
                    System.out.println("TCP模块成功启动,端口:" + this.adapter.getConfig().port);
                }
            }
            
            callback.call(null);
        }
    }
 
    /**
     * 停止模块运行,将不再接入TCP网络连接,并把已经tcp连接的全部断连接
     * @param callback
     * @throws Exception
     */
    @Override
    public void stop(UnitCallbackInterface callback) throws Exception {
        this.tcpIoHandler.stop();
        this.dataCodecFactory.stop();
        this.adapter.newUnitStopCallback().callback();
        callback.call(null);
    }
 
    /**
     * 解除停止,恢复TCP服务运行
     * @throws Exception
     */
    public void recover() throws Exception {
        this.tcpIoHandler.recover();
        this.dataCodecFactory.recover();
    }
    
 
}