New file |
| | |
| | | package com.dy.common.mw.channel.tcp; |
| | | |
| | | import com.dy.common.util.ByteUtil; |
| | | import org.apache.logging.log4j.LogManager; |
| | | import org.apache.logging.log4j.Logger; |
| | | import org.apache.mina.filter.codec.CumulativeProtocolDecoder; |
| | | import org.apache.mina.filter.codec.ProtocolDecoderOutput; |
| | | import org.apache.mina.core.buffer.* ; |
| | | import org.apache.mina.core.session.IoSession; |
| | | |
| | | |
| | | public class DataDecoder extends CumulativeProtocolDecoder { |
| | | |
| | | private static final Logger log = LogManager.getLogger(DataDecoder.class) ; |
| | | |
| | | protected PrefixedDataAvailableHandle pdaHandle ; |
| | | |
| | | public DataDecoder(PrefixedDataAvailableHandle pdaHandle) { |
| | | this.pdaHandle = pdaHandle ; |
| | | } |
| | | |
| | | /** |
| | | * 解码 |
| | | * (non-Javadoc) |
| | | * @see org.apache.mina.filter.codec.CumulativeProtocolDecoder#doDecode( |
| | | * org.apache.mina.core.session.IoSession, |
| | | * org.apache.mina.core.buffer.IoBuffer, |
| | | * org.apache.mina.filter.codec.ProtocolDecoderOutput) |
| | | */ |
| | | @Override |
| | | protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) { |
| | | String rtuAddr = (String)session.getAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrRtuAddr) ; |
| | | if(rtuAddr == null){ |
| | | //会话未被管理 |
| | | //会话管理器中不存在此会话,说明刚建立网络连接, |
| | | return this.doDecode_onLineData(session, in, out) ; |
| | | }else{ |
| | | //会话已被管理 |
| | | //会话管理器中存在此会话,说明已经上线了, |
| | | return this.doDecode_data(session, in, out, rtuAddr) ; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 针对网络连接成功后第一包数据 |
| | | * @param session IO会话 |
| | | * @param in 输入Buffer |
| | | * @param out 协议输出编码 |
| | | * @return 是否正好或粘包 |
| | | */ |
| | | @SuppressWarnings("unused") |
| | | private boolean doDecode_onLineData(IoSession session, IoBuffer in, ProtocolDecoderOutput out){ |
| | | //有一些协议,负责发上线数据,但上线(或心跳)数据中无 Rtu地址,所以这样的数据只能放行,而且不能产生有身份(Rtu地址)的网络会话,即不能把session放入会话缓存中 |
| | | //有一些协议,Rtu负责发上线数据,上线数据中有 Rtu地址,所以这样的数据放行后,能产生有身份(Rtu地址)的网络会话,即能把session放入会话缓存中 |
| | | //不论何种情形,上线数据的数据量不会很大,一般不会产生断包,所以这里只进行简单断包检查。 |
| | | PrefixedDataAvailableStatus dataStatus = this.pdaHandle.forOnLine(session, in) ; |
| | | |
| | | if(dataStatus.protocolName != null && dataStatus.protocolVersion != null){ |
| | | session.setAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolName, dataStatus.protocolName) ; |
| | | session.setAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolVersion, dataStatus.protocolVersion) ; |
| | | } |
| | | |
| | | if(dataStatus.isCompleted() || dataStatus.isAdjoined()){ |
| | | //正好或粘包 |
| | | this.nextDeal(in, dataStatus.getDataLen(), out) ; |
| | | return true; |
| | | }else if(dataStatus.isRubbish()){ |
| | | //垃圾数据 |
| | | this.dealRubbishData(in, dataStatus.getDataLen()) ; |
| | | return true ; |
| | | }else{ |
| | | //断包 |
| | | return false ; |
| | | } |
| | | } |
| | | |
| | | |
| | | /** |
| | | * 针对网络连接成功后非第一包数据 |
| | | * @param session IO会话 |
| | | * @param in 输入Buffer |
| | | * @param out 协议输出编码 |
| | | * @param rtuAddr 控制器地址 |
| | | * @return 是否正好或粘包 |
| | | */ |
| | | @SuppressWarnings("unused") |
| | | private boolean doDecode_data(IoSession session, IoBuffer in, ProtocolDecoderOutput out, String rtuAddr) { |
| | | //非上线数据,可能会出现断包或粘包现象 |
| | | PrefixedDataAvailableStatus dataStatus = this.pdaHandle.forUpData(session, in) ; |
| | | if(dataStatus == null){ |
| | | //不可能发生 |
| | | log.error("严重错误,Rtu (RTU" + rtuAddr + ")上行数据完整性检查时,返回的对象为空。") ; |
| | | this.nextDeal(in, null, out) ; |
| | | return true; |
| | | }else{ |
| | | if(dataStatus.isBreaked()){ |
| | | //断包了 |
| | | return false ; |
| | | }else if(dataStatus.isCompleted() || dataStatus.isAdjoined()){ |
| | | //本包数据已经全部接收,或可能粘有下包数据 |
| | | this.nextDeal(in, dataStatus.getDataLen(), out) ; |
| | | if(dataStatus.isAdjoined()){ |
| | | //说明粘包了,还有数据,需要对这些数据再次执行doDecode_方法. |
| | | return this.doDecode_data(session, in, out, rtuAddr) ;//加上递归 |
| | | }else if(dataStatus.isCompleted()){ |
| | | //数据不断不粘 |
| | | return true; |
| | | }else{ |
| | | //不存在此种情况 |
| | | return true ; |
| | | } |
| | | }else if(dataStatus.isRubbish()){ |
| | | //垃圾数据 |
| | | this.dealRubbishData(in, dataStatus.getDataLen()) ; |
| | | return true ; |
| | | }else{ |
| | | //不存在此种情况 |
| | | return true ; |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 后续处理数据 |
| | | * @param in IObuffer |
| | | * @param length 长度 |
| | | * @param out 协议编码输出 |
| | | */ |
| | | private void nextDeal(IoBuffer in, Integer length, ProtocolDecoderOutput out){ |
| | | if(length == null){ |
| | | length = in.limit() ; |
| | | } |
| | | if(length > 0){ |
| | | byte[] data = new byte[length]; |
| | | in.get(data);//get一个字节,相应position向后移动一个字节 |
| | | out.write(data); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 垃圾数据处理 |
| | | * @param in IObuffer |
| | | * @param length 长度 |
| | | */ |
| | | @SuppressWarnings("unused") |
| | | private void dealRubbishData(IoBuffer in, Integer length){ |
| | | byte[] data = new byte[in.limit()]; |
| | | in.get(data);//get一个字节,相应position向后移动一个字节 |
| | | log.error("抛弃垃圾数据:" + ByteUtil.bytes2Hex(data, true)); |
| | | } |
| | | } |