package com.dy.common.mw.channel.tcp;
|
|
import com.dy.common.util.ByteUtil;
|
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.Logger;
|
import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
|
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
|
import org.apache.mina.core.buffer.* ;
|
import org.apache.mina.core.session.IoSession;
|
|
|
public class DataDecoder extends CumulativeProtocolDecoder {
|
|
private static final Logger log = LogManager.getLogger(DataDecoder.class) ;
|
|
protected PrefixedDataAvailableHandle pdaHandle ;
|
|
public DataDecoder(PrefixedDataAvailableHandle pdaHandle) {
|
this.pdaHandle = pdaHandle ;
|
}
|
|
/**
|
* 解码
|
* (non-Javadoc)
|
* @see org.apache.mina.filter.codec.CumulativeProtocolDecoder#doDecode(
|
* org.apache.mina.core.session.IoSession,
|
* org.apache.mina.core.buffer.IoBuffer,
|
* org.apache.mina.filter.codec.ProtocolDecoderOutput)
|
*/
|
@Override
|
protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) {
|
String rtuAddr = (String)session.getAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrRtuAddr) ;
|
if(rtuAddr == null){
|
//会话未被管理
|
//会话管理器中不存在此会话,说明刚建立网络连接,
|
return this.doDecode_onLineData(session, in, out) ;
|
}else{
|
//会话已被管理
|
//会话管理器中存在此会话,说明已经上线了,
|
return this.doDecode_data(session, in, out, rtuAddr) ;
|
}
|
}
|
|
/**
|
* 针对网络连接成功后第一包数据
|
* @param session IO会话
|
* @param in 输入Buffer
|
* @param out 协议输出编码
|
* @return 是否正好或粘包
|
*/
|
@SuppressWarnings("unused")
|
private boolean doDecode_onLineData(IoSession session, IoBuffer in, ProtocolDecoderOutput out){
|
//有一些协议,负责发上线数据,但上线(或心跳)数据中无 Rtu地址,所以这样的数据只能放行,而且不能产生有身份(Rtu地址)的网络会话,即不能把session放入会话缓存中
|
//有一些协议,Rtu负责发上线数据,上线数据中有 Rtu地址,所以这样的数据放行后,能产生有身份(Rtu地址)的网络会话,即能把session放入会话缓存中
|
//不论何种情形,上线数据的数据量不会很大,一般不会产生断包,所以这里只进行简单断包检查。
|
PrefixedDataAvailableStatus dataStatus = this.pdaHandle.forOnLine(session, in) ;
|
|
if(dataStatus.protocolName != null && dataStatus.protocolVersion != null){
|
session.setAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolName, dataStatus.protocolName) ;
|
session.setAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolVersion, dataStatus.protocolVersion) ;
|
}
|
|
if(dataStatus.isCompleted() || dataStatus.isAdjoined()){
|
//正好或粘包
|
this.nextDeal(in, dataStatus.getDataLen(), out) ;
|
return true;
|
}else if(dataStatus.isRubbish()){
|
//垃圾数据
|
this.dealRubbishData(in, dataStatus.getDataLen()) ;
|
return true ;
|
}else{
|
//断包
|
return false ;
|
}
|
}
|
|
|
/**
|
* 针对网络连接成功后非第一包数据
|
* @param session IO会话
|
* @param in 输入Buffer
|
* @param out 协议输出编码
|
* @param rtuAddr 控制器地址
|
* @return 是否正好或粘包
|
*/
|
@SuppressWarnings("unused")
|
private boolean doDecode_data(IoSession session, IoBuffer in, ProtocolDecoderOutput out, String rtuAddr) {
|
//非上线数据,可能会出现断包或粘包现象
|
PrefixedDataAvailableStatus dataStatus = this.pdaHandle.forUpData(session, in) ;
|
if(dataStatus == null){
|
//不可能发生
|
log.error("严重错误,Rtu (RTU" + rtuAddr + ")上行数据完整性检查时,返回的对象为空。") ;
|
this.nextDeal(in, null, out) ;
|
return true;
|
}else{
|
if(dataStatus.isBreaked()){
|
//断包了
|
return false ;
|
}else if(dataStatus.isCompleted() || dataStatus.isAdjoined()){
|
//本包数据已经全部接收,或可能粘有下包数据
|
this.nextDeal(in, dataStatus.getDataLen(), out) ;
|
if(dataStatus.isAdjoined()){
|
//说明粘包了,还有数据,需要对这些数据再次执行doDecode_方法.
|
return this.doDecode_data(session, in, out, rtuAddr) ;//加上递归
|
}else if(dataStatus.isCompleted()){
|
//数据不断不粘
|
return true;
|
}else{
|
//不存在此种情况
|
return true ;
|
}
|
}else if(dataStatus.isRubbish()){
|
//垃圾数据
|
this.dealRubbishData(in, dataStatus.getDataLen()) ;
|
return true ;
|
}else{
|
//不存在此种情况
|
return true ;
|
}
|
}
|
}
|
|
/**
|
* 后续处理数据
|
* @param in IObuffer
|
* @param length 长度
|
* @param out 协议编码输出
|
*/
|
private void nextDeal(IoBuffer in, Integer length, ProtocolDecoderOutput out){
|
if(length == null){
|
length = in.limit() ;
|
}
|
if(length > 0){
|
byte[] data = new byte[length];
|
in.get(data);//get一个字节,相应position向后移动一个字节
|
out.write(data);
|
}
|
}
|
|
/**
|
* 垃圾数据处理
|
* @param in IObuffer
|
* @param length 长度
|
*/
|
@SuppressWarnings("unused")
|
private void dealRubbishData(IoBuffer in, Integer length){
|
byte[] data = new byte[in.limit()];
|
in.get(data);//get一个字节,相应position向后移动一个字节
|
log.error("抛弃垃圾数据:" + ByteUtil.bytes2Hex(data, true));
|
}
|
}
|