liurunyu
2024-07-11 7bf80c057f8e17fd37f1936e5b8dc1e1efad4bea
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package com.dy.common.mw.channel.tcp;
 
import java.io.IOException;
import java.net.InetSocketAddress;
 
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import org.apache.mina.core.session.IdleStatus ;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.transport.socket.SocketSessionConfig;
import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder ;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.executor.ExecutorFilter ;
 
import com.dy.common.mw.UnitAdapterInterface;
import com.dy.common.mw.UnitInterface;
import com.dy.common.mw.UnitStartedCallbackInterface;
 
@SuppressWarnings("unused")
public class TcpUnit implements UnitInterface {
    
    private static final TcpUnit instance = new TcpUnit() ;
    private static boolean started = false ;
    
    private TcpUnitAdapter adapter ;
    
    private TcpUnit(){} ;
    
    public static TcpUnit getInstance(){
        return instance ;
    }
    
    /**
     * 把IoSession会话的ID属性设置到IoSession属性中
     * @throws Exception
     */
    public void setIoSessionArrId(IoSession session, String id) throws Exception {
        session.setAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrIdKey, id) ;
    }
 
    
    @Override
    public void setAdapter(UnitAdapterInterface adapter) throws Exception {
        if(adapter == null){
            throw new Exception("TCP模块适配器对象不能为空!") ;
        }
        this.adapter = (TcpUnitAdapter)adapter ; 
        TcpConfigVo vo = this.adapter.getConfig() ;
        if(vo == null){
            throw new Exception("TCP模块连接属性配置对象不能为空!") ;
        }
        if(vo.port == null || vo.idle == null || vo.processors == null){
            throw new Exception("TCP模块连接属性配置对象属性值不能为空!") ;
        }
        if(this.adapter.newPrefixedDataAvailableHandle() == null){
            throw new Exception("TCP模块上行数据解码类为空!") ;
        }
        if(this.adapter.newSessionEventCallback() == null){
            throw new Exception("TCP模块事件处理回调类为空!") ;
        }
    }
    /**
     * 启动模块
     */
    public void start(UnitStartedCallbackInterface callback) throws Exception {
        if(!started){
            started = true ;
            /**
             * 异步非阻塞网络通信接收器,负责接收联接请求
             * 同时设置IoProcessor个数
             * 这个地方用于执行真正的IO 操作,默认启用的线程个数是CPU 的核数+1,譬如:单CPU 双
             * 核的电脑,默认的IoProcessor 线程会创建3 个。这也就是说一个IoAcceptor 或者
             * IoConnector 默认会关联一个IoProcessor 池,这个池中有3 个IoProcessor。因为IO 操作
             * 耗费资源,所以这里使用IoProcessor 池来完成数据的读写操作,有助于提高性能。这也就
             * 是前面说的IoAccetor、IoConnector 使用一个Selector,而IoProcessor 使用自己单独的
             * Selector 的原因。
             * 那么为什么IoProcessor 池中的IoProcessor 数量只比CPU 的核数大1 呢?因为IO 读写操
             * 作是耗费CPU 的操作,而每一核CPU 同时只能运行一个线程,因此IoProcessor 池中的
             * IoProcessor 的数量并不是越多越好。
             */
            NioSocketAcceptor acceptor = new NioSocketAcceptor(this.adapter.getConfig().processors);
            
            SocketSessionConfig seConf = acceptor.getSessionConfig() ;
            
            /* 设置读数据时一次性申请堆缓存大小,
             * 一般不需要调用这个方法,因为IoProcessor 会自动调整缓冲的大小。
             * 可以调用setMinReadBufferSize()、setMaxReadBufferSize()方法,
             * 这样无论IoProcessor如何自动调整,都会在你指定的区间。
             */
            //seConf.setReadBufferSize(1024);
            
            //设置网络联接空闲时长
            seConf.setIdleTime(IdleStatus.BOTH_IDLE, this.adapter.getConfig().idle);
 
            //得到网络 通信数据过滤器链
            DefaultIoFilterChainBuilder chain = acceptor.getFilterChain() ;
            //编解码过滤器
            chain.addLast("protocol", new ProtocolCodecFilter(new DataCodecFactory(this.adapter)));
            
            /*
             * 一般ExecutorFilter 都要放在ProtocolCodecFilter 过滤器的后面,
             * 也就是不要让编解码运行在独立的线程上,而是要运行在IoProcessor 所在的线程,
             * 因为编解码处理的数据都是由IoProcessor 读取和发送的,没必要开启新的线程,
             * 否则性能反而会下降。一般使用ExecutorFilter 的典型场景是将业务逻辑(譬如:耗时的数据库操作)
             * 放在单独的线程中运行,也就是说与IO 处理无关的操作可以考虑使用ExecutorFilter 来异步执行。
             */
            chain.addLast("exceutor", new ExecutorFilter());
 
            //业务逻辑处理器,负责处理网络会话及输入输出数据
            acceptor.setHandler(new TcpIoHandler(this.adapter));
 
            boolean isException = false ;
            try {
                acceptor.bind(new InetSocketAddress(this.adapter.getConfig().port));
            } catch (IOException e) {
                e.printStackTrace();
                System.out.println("TCP通信模块启动失败!" + (e.getMessage()==null?"":e.getMessage()));
                isException = true ;
            }finally{
                ;
            }
 
            if(!isException){
                if(this.adapter.getConfig().showStartInfo != null && this.adapter.getConfig().showStartInfo.booleanValue()){
                    System.out.println("TCP模块成功启动,端口:" + this.adapter.getConfig().port);
                }
            }
            
            callback.call(null);
        }
    }
 
    @Override
    public void stop(UnitStartedCallbackInterface callback) throws Exception {
    }
    
 
}