package com.dy.common.mw.protocol;
|
|
import java.util.Collection;
|
import java.util.HashMap;
|
|
import com.dy.common.mw.channel.tcp.TcpIoSessionAttrIdIsRtuAddr;
|
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.Logger;
|
import org.apache.mina.core.buffer.IoBuffer;
|
|
import com.dy.common.util.ByteUtil;
|
import com.dy.common.mw.channel.tcp.PrefixedDataAvailableHandle;
|
import com.dy.common.mw.channel.tcp.PrefixedDataAvailableStatus;
|
import org.apache.mina.core.session.IoSession;
|
|
public class PrefixedDataAvailableHandleImp implements PrefixedDataAvailableHandle {
|
|
private static final Logger log = LogManager.getLogger(PrefixedDataAvailableHandleImp.class) ;
|
|
|
/**
|
* 在多线程环境中运行
|
* 分析上线数据(网络连接后第一包数据)是否可获得
|
* @param in IoBuffer
|
* @return PrefixedDataAvailableStatus
|
*/
|
public PrefixedDataAvailableStatus forOnLine(IoSession ioSession, IoBuffer in){
|
int remain = in.remaining() ;
|
if(remain == 0){
|
return new PrefixedDataAvailableStatus().breaked() ;
|
}else{
|
try{
|
PrefixedDataAvailableStatus pds = null ;
|
PrefixedDataAvailable pda = null ;
|
|
//从会话缓存得到上次上行数据确定的协议
|
String protocolName = (String) ioSession.getAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolName) ;
|
Short protocolVersion = (Short) ioSession.getAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolVersion) ;
|
//由会话中的协议得到处理类
|
//利用缓存的协议,进而对不符合协议的上行数据也能用该协议处理了,例如RTU采用不同数据协议的嵌入式软件升级协议
|
//即RTU第一包数据确定下来协议,后面数据都用这个协议处理,那么数据可以变化了,可以不符合第一包数据遵守的协议了
|
Object[] objs = this.getClassObjAndAnnotationVo(protocolName, protocolVersion) ;
|
if(objs != null && objs[0] != null && objs[1] != null){
|
pda = (PrefixedDataAvailable)objs[0] ;
|
//处理完整性检查
|
pds = pda.forOnLine(ioSession, in, remain, ((AnnotationPrefixedDataAvailableVo)objs[1]).onLineDataMinLength, ((AnnotationPrefixedDataAvailableVo)objs[1]).errorMaxLength) ;
|
//用完对象后,放回池中
|
PrefixedDataAvailablePool.freeInstance(objs[0].getClass(), pda);
|
}
|
if(pds == null){
|
//如果会话中缓存的协议未能正确处理
|
HashMap<String, AnnotationPrefixedDataAvailableVo> prefixedDataAvailableMap = ProtocolCache.getPrefixedDataAvailableMap() ;
|
Collection<AnnotationPrefixedDataAvailableVo> set = prefixedDataAvailableMap.values() ;
|
if(set.size() == 0){
|
throw new Exception("上线数据完整性检查时,得到的协议完整性检查类集合为空。") ;
|
}
|
int priority = ProtocolConstant.firstPriority ;
|
|
while(true){
|
objs = this.getClassObjAndAnnotationVo(priority, set) ;
|
pda = (PrefixedDataAvailable)objs[0] ;
|
if(pda == null && priority == ProtocolConstant.firstPriority){
|
throw new Exception("上线数据完整性检查时,未得到优先级为" + priority + "上线数据完整性检查类!") ;
|
}else if(pda == null){
|
//说明上线数据完整性检查类集合已经遍历完了。
|
break ;
|
}
|
//处理完整性检查
|
pds = pda.forOnLine(ioSession, in, remain, ((AnnotationPrefixedDataAvailableVo)objs[1]).onLineDataMinLength, ((AnnotationPrefixedDataAvailableVo)objs[1]).errorMaxLength) ;
|
//用完对象后,放回池中
|
//2024-11-25下面一行进行了修改
|
//PrefixedDataAvailablePool.freeInstance(((AnnotationPrefixedDataAvailableVo)objs[1]).clazz, pda);
|
PrefixedDataAvailablePool.freeInstance(objs[0].getClass(), pda);
|
if(pds == null){
|
//说明不是对应的协议数据,需要另外的协议来处理上线
|
//循环继续
|
priority++ ;
|
}else{
|
//停止循环,返回结果
|
break ;
|
}
|
}
|
}
|
if(pds == null){
|
//说明数据不属于任何协议,一般为Rtu数据出错,或网络攻击数据
|
byte[] preByte = new byte[remain];
|
in.get(preByte) ;
|
in.position(0) ;
|
log.error("上线第一包数据未找到对应的解析协议。数据是:" + ByteUtil.bytes2Hex(preByte, true)) ;
|
//认为是完整包数据,后续读出,防止占用内存及死循环IO读数据
|
pds = new PrefixedDataAvailableStatus().rubbish(remain) ;
|
}
|
return pds ;
|
}catch(Exception e){
|
//处理过程中发生异常,上行数据就是垃圾数据了,后续以垃圾数据处理,即把数据从IO中取出来,以防止死循环IO读数据
|
return new PrefixedDataAvailableStatus().rubbish(remain) ;
|
}
|
}
|
}
|
|
/**
|
* 在多线程环境中运行
|
* 分析上行数据(网络连接后第二(包含)包以后数据)是否可获得
|
* @param in IoBuffer
|
* @return PrefixedDataAvailableStatus
|
*/
|
public PrefixedDataAvailableStatus forUpData(IoSession ioSession, IoBuffer in){
|
int remain = in.remaining() ;
|
if(remain == 0){
|
return new PrefixedDataAvailableStatus().breaked() ;
|
}else{
|
try{
|
PrefixedDataAvailableStatus pds = null ;
|
PrefixedDataAvailable pda = null ;
|
|
//从会话缓存得到上次上行数据确定的协议
|
String protocolName = (String) ioSession.getAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolName) ;
|
Short protocolVersion = (Short) ioSession.getAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolVersion) ;
|
//由会话中的协议得到处理类
|
Object[] objs = this.getClassObjAndAnnotationVo(protocolName, protocolVersion) ;
|
if(objs != null && objs[0] != null && objs[1] != null){
|
pda = (PrefixedDataAvailable)objs[0] ;
|
//处理完整性检查
|
pds = pda.forOnLine(ioSession, in, remain, ((AnnotationPrefixedDataAvailableVo)objs[1]).onLineDataMinLength, ((AnnotationPrefixedDataAvailableVo)objs[1]).errorMaxLength) ;
|
//用完对象后,放回池中
|
PrefixedDataAvailablePool.freeInstance(objs[0].getClass(), pda);
|
}
|
if(pds == null) {
|
//如果会话中缓存的协议未能正确处理
|
HashMap<String, AnnotationPrefixedDataAvailableVo> prefixedDataAvailableMap = ProtocolCache.getPrefixedDataAvailableMap() ;
|
Collection<AnnotationPrefixedDataAvailableVo> set = prefixedDataAvailableMap.values() ;
|
if(set.size() == 0){
|
throw new Exception("上线数据完整性检查时,得到的协议集合为空。") ;
|
}
|
int priority = ProtocolConstant.firstPriority ;
|
|
while(true){
|
objs = this.getClassObjAndAnnotationVo(priority, set) ;
|
pda = (PrefixedDataAvailable)objs[0] ;
|
if(pda == null && priority == ProtocolConstant.firstPriority){
|
throw new Exception("上线数据完整性检查时,未得到优先级为" + priority + "上线数据完整性检查类!") ;
|
}else if(pda == null){
|
//说明上线数据完整性检查类集合已经遍历完了。
|
break ;
|
}
|
//处理完整性检查
|
pds = pda.forUpData(ioSession, in, remain, ((AnnotationPrefixedDataAvailableVo)objs[1]).headMinLength, ((AnnotationPrefixedDataAvailableVo)objs[1]).errorMaxLength) ;
|
//用完对象后,放回池中
|
PrefixedDataAvailablePool.freeInstance(objs[0].getClass(), pda);
|
if(pds == null){
|
//说明不是对应的协议数据,需要另外的协议来处理上线
|
//循环继续
|
priority++ ;
|
}else{
|
//停止循环,返回结果
|
break ;
|
}
|
}
|
}
|
if(pds == null){
|
//说明数据不属于任何协议,一般为Rtu数据出错,或网络攻击数据
|
byte[] preByte = new byte[remain];
|
in.get(preByte) ;
|
in.position(0) ;
|
log.error("上行数据未找到对应的解析协议。数据是:" + ByteUtil.bytes2Hex(preByte, true)) ;
|
//认为是完整包数据,后续读出,防止占用内存及死循环IO读数据
|
pds = new PrefixedDataAvailableStatus().rubbish(remain) ;
|
}
|
return pds ;
|
}catch(Exception e){
|
//处理过程中发生异常,上行数据就是垃圾数据了,后续以垃圾数据处理,即把数据从IO中取出来,以防止死循环IO读数据
|
return new PrefixedDataAvailableStatus().rubbish(remain) ;
|
}
|
}
|
}
|
|
|
|
/**
|
* 得到处理类对象
|
* @param prority 优先级
|
* @param set 集合
|
* @return Object[]
|
* @throws Exception 异常
|
*/
|
private Object[] getClassObjAndAnnotationVo(int prority, Collection<AnnotationPrefixedDataAvailableVo> set) throws Exception{
|
PrefixedDataAvailable obj = null ;
|
AnnotationPrefixedDataAvailableVo rVo = null ;
|
for(AnnotationPrefixedDataAvailableVo vo : set){
|
if(prority == vo.priority){
|
obj = PrefixedDataAvailablePool.getInstance(vo.clazz) ;
|
rVo = vo ;
|
break ;
|
}
|
}
|
return new Object[]{obj, rVo} ;
|
}
|
/**
|
* 得到处理类对象
|
* @param protocolName 通信协议名称
|
* @param protocolVersion 通信协议版本号
|
* @return Object[]
|
* @throws Exception 异常
|
*/
|
private Object[] getClassObjAndAnnotationVo(String protocolName, Short protocolVersion) throws Exception{
|
PrefixedDataAvailable obj = null ;
|
AnnotationPrefixedDataAvailableVo rVo = null ;
|
if(protocolName != null && !protocolName.trim().equals("") && protocolVersion != null){
|
rVo = ProtocolCache.getAnnotationPrefixedDataAvailable(protocolName, protocolVersion) ;
|
obj = PrefixedDataAvailablePool.getInstance(rVo.clazz) ;
|
}
|
return new Object[]{obj, rVo} ;
|
}
|
|
}
|