zhubaomin
2025-04-14 b3b17b231e2f2840332ce6eb96f791865fdec6d5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package com.dy.common.mw.channel.tcp;
 
import com.dy.common.util.ByteUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.apache.mina.core.buffer.* ;
import org.apache.mina.core.session.IoSession;
 
 
public class DataDecoder extends CumulativeProtocolDecoder {
 
    private static final Logger log = LogManager.getLogger(DataDecoder.class) ;
 
    protected PrefixedDataAvailableHandle pdaHandle ;
    
    public DataDecoder(PrefixedDataAvailableHandle pdaHandle) {
        this.pdaHandle = pdaHandle ;
    }
 
    /**
     * 解码
     * (non-Javadoc)
     * @see org.apache.mina.filter.codec.CumulativeProtocolDecoder#doDecode(
     *     org.apache.mina.core.session.IoSession, 
     *     org.apache.mina.core.buffer.IoBuffer, 
     *     org.apache.mina.filter.codec.ProtocolDecoderOutput)
     */
    @Override
    protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) {
        String rtuAddr = (String)session.getAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrRtuAddr) ;
        if(rtuAddr == null){
            //会话未被管理
            //会话管理器中不存在此会话,说明刚建立网络连接,
            return this.doDecode_onLineData(session, in, out) ;
        }else{
            //会话已被管理
            //会话管理器中存在此会话,说明已经上线了,
            return this.doDecode_data(session, in, out, rtuAddr) ;
        }
    }
    
    /**
     * 针对网络连接成功后第一包数据
     * @param session IO会话
     * @param in 输入Buffer
     * @param out 协议输出编码
     * @return 是否正好或粘包
     */
    @SuppressWarnings("unused")
    private boolean doDecode_onLineData(IoSession session, IoBuffer in, ProtocolDecoderOutput out){
        //有一些协议,负责发上线数据,但上线(或心跳)数据中无 Rtu地址,所以这样的数据只能放行,而且不能产生有身份(Rtu地址)的网络会话,即不能把session放入会话缓存中
        //有一些协议,Rtu负责发上线数据,上线数据中有 Rtu地址,所以这样的数据放行后,能产生有身份(Rtu地址)的网络会话,即能把session放入会话缓存中
        //不论何种情形,上线数据的数据量不会很大,一般不会产生断包,所以这里只进行简单断包检查。
        PrefixedDataAvailableStatus dataStatus = this.pdaHandle.forOnLine(session, in) ;
 
        if(dataStatus.protocolName != null && dataStatus.protocolVersion != null){
            session.setAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolName, dataStatus.protocolName) ;
            session.setAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolVersion, dataStatus.protocolVersion) ;
        }
 
        if(dataStatus.isCompleted() || dataStatus.isAdjoined()){
            //正好或粘包
            this.nextDeal(in, dataStatus.getDataLen(), out) ;
            return true;
        }else if(dataStatus.isRubbish()){
            //垃圾数据
            this.dealRubbishData(in, dataStatus.getDataLen()) ;
            return true ;
        }else{
            //断包
            return false ;
        }
    }
    
 
    /**
     * 针对网络连接成功后非第一包数据
     * @param session IO会话
     * @param in 输入Buffer
     * @param out 协议输出编码
     * @param rtuAddr 控制器地址
     * @return 是否正好或粘包
     */
    @SuppressWarnings("unused")
    private boolean doDecode_data(IoSession session, IoBuffer in, ProtocolDecoderOutput out, String rtuAddr) {
        //非上线数据,可能会出现断包或粘包现象
        PrefixedDataAvailableStatus dataStatus = this.pdaHandle.forUpData(session, in) ;
        if(dataStatus == null){
            //不可能发生
            log.error("严重错误,Rtu (RTU" + rtuAddr + ")上行数据完整性检查时,返回的对象为空。") ;
            this.nextDeal(in, null, out) ;
            return true;
        }else{
            if(dataStatus.isBreaked()){
                //断包了
                return false ;
            }else if(dataStatus.isCompleted() || dataStatus.isAdjoined()){
                //本包数据已经全部接收,或可能粘有下包数据
                this.nextDeal(in, dataStatus.getDataLen(), out) ;
                if(dataStatus.isAdjoined()){
                    //说明粘包了,还有数据,需要对这些数据再次执行doDecode_方法.
                    return this.doDecode_data(session, in, out, rtuAddr) ;//加上递归
                }else if(dataStatus.isCompleted()){
                    //数据不断不粘
                    return true;
                }else{
                    //不存在此种情况
                    return true ;
                }
            }else if(dataStatus.isRubbish()){
                //垃圾数据
                this.dealRubbishData(in, dataStatus.getDataLen()) ;
                return true ;
            }else{
                //不存在此种情况
                return true ;
            }
        }
    }    
    
    /**
     * 后续处理数据
     * @param in IObuffer
     * @param length 长度
     * @param out 协议编码输出
     */
    private void nextDeal(IoBuffer in, Integer length, ProtocolDecoderOutput out){
        if(length == null){
            length = in.limit() ;
        }
        if(length > 0){
            byte[] data = new byte[length];
            in.get(data);//get一个字节,相应position向后移动一个字节
            out.write(data);
        }
    }
    
    /**
     * 垃圾数据处理
     * @param in IObuffer
     * @param length 长度
     */
    @SuppressWarnings("unused")
    private void dealRubbishData(IoBuffer in, Integer length){
        byte[] data = new byte[in.limit()];
        in.get(data);//get一个字节,相应position向后移动一个字节
        log.error("抛弃垃圾数据:" + ByteUtil.bytes2Hex(data, true));
    }
}