package com.dy.common.mw.channel.tcp; import com.dy.common.util.ByteUtil; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.mina.filter.codec.CumulativeProtocolDecoder; import org.apache.mina.filter.codec.ProtocolDecoderOutput; import org.apache.mina.core.buffer.* ; import org.apache.mina.core.session.IoSession; public class DataDecoder extends CumulativeProtocolDecoder { private static final Logger log = LogManager.getLogger(DataDecoder.class) ; protected PrefixedDataAvailableHandle pdaHandle ; public DataDecoder(PrefixedDataAvailableHandle pdaHandle) { this.pdaHandle = pdaHandle ; } /** * 解码 * (non-Javadoc) * @see org.apache.mina.filter.codec.CumulativeProtocolDecoder#doDecode( * org.apache.mina.core.session.IoSession, * org.apache.mina.core.buffer.IoBuffer, * org.apache.mina.filter.codec.ProtocolDecoderOutput) */ protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) { String meterNo = (String)session.getAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrIdKey) ; if(meterNo == null){ //会话未被管理 //会话管理器中不存在此会话,说明刚建立网络连接, return this.doDecode_onLineData(session, in, out) ; }else{ //会话已被管理 //会话管理器中存在此会话,说明已经上线了, return this.doDecode_data(session, in, out, meterNo) ; } } /** * 针对网络连接成功后第一包数据 * @param session IO会话 * @param in 输入Buffer * @param out 协议输出编码 * @return 是否正好或粘包 */ @SuppressWarnings("unused") private boolean doDecode_onLineData(IoSession session, IoBuffer in, ProtocolDecoderOutput out){ //有一些协议,负责发上线数据,但上线(或心跳)数据中无 Rtu地址,所以这样的数据只能放行,而且不能产生有身份(Rtu地址)的网络会话,即不能把session放入会话缓存中 //有一些协议,Rtu负责发上线数据,上线数据中有 Rtu地址,所以这样的数据放行后,能产生有身份(Rtu地址)的网络会话,即能把session放入会话缓存中 //不论何种情形,上线数据的数据量不会很大,一般不会产生断包,所以这里只进行简单断包检查。 PrefixedDataAvailableStatus dataStatus = this.pdaHandle.forOnLine(in) ; if(dataStatus.isCompleted() || dataStatus.isAdjoined()){ //正好或粘包 this.nextDeal(in, dataStatus.getDataLen(), out) ; return true; }else if(dataStatus.isRubbish()){ //垃圾数据 this.dealRubbishData(in, dataStatus.getDataLen()) ; return true ; }else{ //断包 return false ; } } /** * 针对网络连接成功后非第一包数据 * @param session IO会话 * @param in 输入Buffer * @param out 协议输出编码 * @param meterNo 控制器编号 * @return 是否正好或粘包 */ @SuppressWarnings("unused") private boolean doDecode_data(IoSession session, IoBuffer in, ProtocolDecoderOutput out, String meterNo) { //非上线数据,可能会出现断包或粘包现象 PrefixedDataAvailableStatus dataStatus = this.pdaHandle.forUpData(in) ; if(dataStatus == null){ //不可能发生 log.error("严重错误,Rtu (水表号为" + meterNo + ")上行数据完整性检查时,返回的对象为空。") ; this.nextDeal(in, null, out) ; return true; }else{ if(dataStatus.isBreaked()){ //断包了 return false ; }else if(dataStatus.isCompleted() || dataStatus.isAdjoined()){ //本包数据已经全部接收,并且可能粘有下包数据 this.nextDeal(in, dataStatus.getDataLen(), out) ; if(dataStatus.isAdjoined()){ //说明粘包了,还有数据,需要对这些数据再次执行doDecode_方法. return this.doDecode_data(session, in, out, meterNo) ;//加上递归 }else if(dataStatus.isCompleted()){ //数据不断不粘 return true; }else{ //不存在此种情况 return true ; } }else if(dataStatus.isRubbish()){ //垃圾数据 this.dealRubbishData(in, dataStatus.getDataLen()) ; return true ; }else{ //不存在此种情况 return true ; } } } /** * 后续处理数据 * @param in IObuffer * @param length 长度 * @param out 协议编码输出 */ private void nextDeal(IoBuffer in, Integer length, ProtocolDecoderOutput out){ if(length == null){ length = in.limit() ; } if(length > 0){ byte[] data = new byte[length]; in.get(data);//get一个字节,相应position向后移动一个字节 out.write(data); } } /** * 垃圾数据处理 * @param in IObuffer * @param length 长度 */ @SuppressWarnings("unused") private void dealRubbishData(IoBuffer in, Integer length){ byte[] data = new byte[in.limit()]; in.get(data);//get一个字节,相应position向后移动一个字节 log.error("抛弃垃圾数据:" + ByteUtil.bytes2Hex(data, true)); } }