From 1135c89deb50a080152f9086fc7b741c415ecd54 Mon Sep 17 00:00:00 2001 From: liurunyu <lry9898@163.com> Date: 星期三, 12 二月 2025 17:00:14 +0800 Subject: [PATCH] 通信中间件增加功能: 1、实现消息心中; 2、开阀报、关阀报、报警数据都会在消息中间件存入消息; 3、在消息中心注册消息接收者,消息中心周期性向消息接收者推送消息。 --- pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol/PrefixedDataAvailableHandleImp.java | 176 ++++++++++++++++++++++++++++++++++++++-------------------- 1 files changed, 115 insertions(+), 61 deletions(-) diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol/PrefixedDataAvailableHandleImp.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol/PrefixedDataAvailableHandleImp.java index 56d32fc..3b99830 100644 --- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol/PrefixedDataAvailableHandleImp.java +++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol/PrefixedDataAvailableHandleImp.java @@ -3,6 +3,7 @@ import java.util.Collection; import java.util.HashMap; +import com.dy.common.mw.channel.tcp.TcpIoSessionAttrIdIsRtuAddr; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.mina.core.buffer.IoBuffer; @@ -10,6 +11,7 @@ import com.dy.common.util.ByteUtil; import com.dy.common.mw.channel.tcp.PrefixedDataAvailableHandle; import com.dy.common.mw.channel.tcp.PrefixedDataAvailableStatus; +import org.apache.mina.core.session.IoSession; public class PrefixedDataAvailableHandleImp implements PrefixedDataAvailableHandle { @@ -22,43 +24,63 @@ * @param in IoBuffer * @return PrefixedDataAvailableStatus */ - public PrefixedDataAvailableStatus forOnLine(IoBuffer in){ + public PrefixedDataAvailableStatus forOnLine(IoSession ioSession, IoBuffer in){ int remain = in.remaining() ; if(remain == 0){ return new PrefixedDataAvailableStatus().breaked() ; }else{ try{ - PrefixedDataAvailableStatus pds = null ; - - HashMap<String, AnnotationPrefixedDataAvailableVo> prefixedDataAvailableMap = ProtocolCach.getPrefixedDataAvailableMap() ; - Collection<AnnotationPrefixedDataAvailableVo> set = prefixedDataAvailableMap.values() ; - if(set.size() == 0){ - throw new Exception("涓婄嚎鏁版嵁瀹屾暣鎬ф鏌ユ椂锛屽緱鍒扮殑鍗忚瀹屾暣鎬ф鏌ョ被闆嗗悎涓虹┖銆�") ; - } - int prority = ProtocolConstant.firstPriority ; + PrefixedDataAvailableStatus pds = null ; + PrefixedDataAvailable pda = null ; - while(true){ - Object[] objs = this.getClassObjAndAnnotationVo(prority, set) ; - PrefixedDataAvailable pda = (PrefixedDataAvailable)objs[0] ; - if(pda == null && prority == ProtocolConstant.firstPriority){ - throw new Exception("涓婄嚎鏁版嵁瀹屾暣鎬ф鏌ユ椂锛屾湭寰楀埌浼樺厛绾т负" + prority + "涓婄嚎鏁版嵁瀹屾暣鎬ф鏌ョ被锛�") ; - }else if(pda == null){ - //璇存槑涓婄嚎鏁版嵁瀹屾暣鎬ф鏌ョ被闆嗗悎宸茬粡閬嶅巻瀹屼簡銆� - break ; - } - //澶勭悊瀹屾暣鎬ф鏌� - pds = pda.forOnLine(in, remain, ((AnnotationPrefixedDataAvailableVo)objs[1]).onLineDataMinLength, ((AnnotationPrefixedDataAvailableVo)objs[1]).errorMaxLength) ; - //鐢ㄥ畬瀵硅薄鍚庯紝鏀惧洖姹犱腑 - PrefixedDataAvailablePool.freeInstance(((AnnotationPrefixedDataAvailableVo)objs[1]).clazz, pda); - if(pds == null){ - //璇存槑涓嶆槸瀵瑰簲鐨勫崗璁暟鎹紝闇�瑕佸彟澶栫殑鍗忚鏉ュ鐞嗕笂绾� - //寰幆缁х画 - prority++ ; - }else{ - //鍋滄寰幆锛岃繑鍥炵粨鏋� - break ; + //浠庝細璇濈紦瀛樺緱鍒颁笂娆′笂琛屾暟鎹‘瀹氱殑鍗忚 + String protocolName = (String) ioSession.getAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolName) ; + Short protocolVersion = (Short) ioSession.getAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolVersion) ; + //鐢变細璇濅腑鐨勫崗璁緱鍒板鐞嗙被 + //鍒╃敤缂撳瓨鐨勫崗璁紝杩涜�屽涓嶇鍚堝崗璁殑涓婅鏁版嵁涔熻兘鐢ㄨ鍗忚澶勭悊浜嗭紝渚嬪RTU閲囩敤涓嶅悓鏁版嵁鍗忚鐨勫祵鍏ュ紡杞欢鍗囩骇鍗忚 + //鍗砇TU绗竴鍖呮暟鎹‘瀹氫笅鏉ュ崗璁紝鍚庨潰鏁版嵁閮界敤杩欎釜鍗忚澶勭悊锛岄偅涔堟暟鎹彲浠ュ彉鍖栦簡锛屽彲浠ヤ笉绗﹀悎绗竴鍖呮暟鎹伒瀹堢殑鍗忚浜� + Object[] objs = this.getClassObjAndAnnotationVo(protocolName, protocolVersion) ; + if(objs != null && objs[0] != null && objs[1] != null){ + pda = (PrefixedDataAvailable)objs[0] ; + //澶勭悊瀹屾暣鎬ф鏌� + pds = pda.forOnLine(ioSession, in, remain, ((AnnotationPrefixedDataAvailableVo)objs[1]).onLineDataMinLength, ((AnnotationPrefixedDataAvailableVo)objs[1]).errorMaxLength) ; + //鐢ㄥ畬瀵硅薄鍚庯紝鏀惧洖姹犱腑 + PrefixedDataAvailablePool.freeInstance(objs[0].getClass(), pda); + } + if(pds == null){ + //濡傛灉浼氳瘽涓紦瀛樼殑鍗忚鏈兘姝g‘澶勭悊 + HashMap<String, AnnotationPrefixedDataAvailableVo> prefixedDataAvailableMap = ProtocolCache.getPrefixedDataAvailableMap() ; + Collection<AnnotationPrefixedDataAvailableVo> set = prefixedDataAvailableMap.values() ; + if(set.size() == 0){ + throw new Exception("涓婄嚎鏁版嵁瀹屾暣鎬ф鏌ユ椂锛屽緱鍒扮殑鍗忚瀹屾暣鎬ф鏌ョ被闆嗗悎涓虹┖銆�") ; } - } + int priority = ProtocolConstant.firstPriority ; + + while(true){ + objs = this.getClassObjAndAnnotationVo(priority, set) ; + pda = (PrefixedDataAvailable)objs[0] ; + if(pda == null && priority == ProtocolConstant.firstPriority){ + throw new Exception("涓婄嚎鏁版嵁瀹屾暣鎬ф鏌ユ椂锛屾湭寰楀埌浼樺厛绾т负" + priority + "涓婄嚎鏁版嵁瀹屾暣鎬ф鏌ョ被锛�") ; + }else if(pda == null){ + //璇存槑涓婄嚎鏁版嵁瀹屾暣鎬ф鏌ョ被闆嗗悎宸茬粡閬嶅巻瀹屼簡銆� + break ; + } + //澶勭悊瀹屾暣鎬ф鏌� + pds = pda.forOnLine(ioSession, in, remain, ((AnnotationPrefixedDataAvailableVo)objs[1]).onLineDataMinLength, ((AnnotationPrefixedDataAvailableVo)objs[1]).errorMaxLength) ; + //鐢ㄥ畬瀵硅薄鍚庯紝鏀惧洖姹犱腑 + //2024-11-25涓嬮潰涓�琛岃繘琛屼簡淇敼 + //PrefixedDataAvailablePool.freeInstance(((AnnotationPrefixedDataAvailableVo)objs[1]).clazz, pda); + PrefixedDataAvailablePool.freeInstance(objs[0].getClass(), pda); + if(pds == null){ + //璇存槑涓嶆槸瀵瑰簲鐨勫崗璁暟鎹紝闇�瑕佸彟澶栫殑鍗忚鏉ュ鐞嗕笂绾� + //寰幆缁х画 + priority++ ; + }else{ + //鍋滄寰幆锛岃繑鍥炵粨鏋� + break ; + } + } + } if(pds == null){ //璇存槑鏁版嵁涓嶅睘浜庝换浣曞崗璁紝涓�鑸负Rtu鏁版嵁鍑洪敊锛屾垨缃戠粶鏀诲嚮鏁版嵁 byte[] preByte = new byte[remain]; @@ -82,43 +104,59 @@ * @param in IoBuffer * @return PrefixedDataAvailableStatus */ - public PrefixedDataAvailableStatus forUpData(IoBuffer in){ + public PrefixedDataAvailableStatus forUpData(IoSession ioSession, IoBuffer in){ int remain = in.remaining() ; if(remain == 0){ return new PrefixedDataAvailableStatus().breaked() ; }else{ try{ - PrefixedDataAvailableStatus pds = null ; - - HashMap<String, AnnotationPrefixedDataAvailableVo> prefixedDataAvailableMap = ProtocolCach.getPrefixedDataAvailableMap() ; - Collection<AnnotationPrefixedDataAvailableVo> set = prefixedDataAvailableMap.values() ; - if(set.size() == 0){ - throw new Exception("涓婄嚎鏁版嵁瀹屾暣鎬ф鏌ユ椂锛屽緱鍒扮殑鍗忚闆嗗悎涓虹┖銆�") ; - } - int prority = ProtocolConstant.firstPriority ; + PrefixedDataAvailableStatus pds = null ; + PrefixedDataAvailable pda = null ; - while(true){ - Object[] objs = this.getClassObjAndAnnotationVo(prority, set) ; - PrefixedDataAvailable pda = (PrefixedDataAvailable)objs[0] ; - if(pda == null && prority == ProtocolConstant.firstPriority){ - throw new Exception("涓婄嚎鏁版嵁瀹屾暣鎬ф鏌ユ椂锛屾湭寰楀埌浼樺厛绾т负" + prority + "涓婄嚎鏁版嵁瀹屾暣鎬ф鏌ョ被锛�") ; - }else if(pda == null){ - //璇存槑涓婄嚎鏁版嵁瀹屾暣鎬ф鏌ョ被闆嗗悎宸茬粡閬嶅巻瀹屼簡銆� - break ; - } - //澶勭悊瀹屾暣鎬ф鏌� - pds = pda.forUpData(in, remain, ((AnnotationPrefixedDataAvailableVo)objs[1]).headMinLength, ((AnnotationPrefixedDataAvailableVo)objs[1]).errorMaxLength) ; - //鐢ㄥ畬瀵硅薄鍚庯紝鏀惧洖姹犱腑 - PrefixedDataAvailablePool.freeInstance(objs[0].getClass(), pda); - if(pds == null){ - //璇存槑涓嶆槸瀵瑰簲鐨勫崗璁暟鎹紝闇�瑕佸彟澶栫殑鍗忚鏉ュ鐞嗕笂绾� - //寰幆缁х画 - prority++ ; - }else{ - //鍋滄寰幆锛岃繑鍥炵粨鏋� - break ; - } - } + //浠庝細璇濈紦瀛樺緱鍒颁笂娆′笂琛屾暟鎹‘瀹氱殑鍗忚 + String protocolName = (String) ioSession.getAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolName) ; + Short protocolVersion = (Short) ioSession.getAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolVersion) ; + //鐢变細璇濅腑鐨勫崗璁緱鍒板鐞嗙被 + Object[] objs = this.getClassObjAndAnnotationVo(protocolName, protocolVersion) ; + if(objs != null && objs[0] != null && objs[1] != null){ + pda = (PrefixedDataAvailable)objs[0] ; + //澶勭悊瀹屾暣鎬ф鏌� + pds = pda.forOnLine(ioSession, in, remain, ((AnnotationPrefixedDataAvailableVo)objs[1]).onLineDataMinLength, ((AnnotationPrefixedDataAvailableVo)objs[1]).errorMaxLength) ; + //鐢ㄥ畬瀵硅薄鍚庯紝鏀惧洖姹犱腑 + PrefixedDataAvailablePool.freeInstance(objs[0].getClass(), pda); + } + if(pds == null) { + //濡傛灉浼氳瘽涓紦瀛樼殑鍗忚鏈兘姝g‘澶勭悊 + HashMap<String, AnnotationPrefixedDataAvailableVo> prefixedDataAvailableMap = ProtocolCache.getPrefixedDataAvailableMap() ; + Collection<AnnotationPrefixedDataAvailableVo> set = prefixedDataAvailableMap.values() ; + if(set.size() == 0){ + throw new Exception("涓婄嚎鏁版嵁瀹屾暣鎬ф鏌ユ椂锛屽緱鍒扮殑鍗忚闆嗗悎涓虹┖銆�") ; + } + int priority = ProtocolConstant.firstPriority ; + + while(true){ + objs = this.getClassObjAndAnnotationVo(priority, set) ; + pda = (PrefixedDataAvailable)objs[0] ; + if(pda == null && priority == ProtocolConstant.firstPriority){ + throw new Exception("涓婄嚎鏁版嵁瀹屾暣鎬ф鏌ユ椂锛屾湭寰楀埌浼樺厛绾т负" + priority + "涓婄嚎鏁版嵁瀹屾暣鎬ф鏌ョ被锛�") ; + }else if(pda == null){ + //璇存槑涓婄嚎鏁版嵁瀹屾暣鎬ф鏌ョ被闆嗗悎宸茬粡閬嶅巻瀹屼簡銆� + break ; + } + //澶勭悊瀹屾暣鎬ф鏌� + pds = pda.forUpData(ioSession, in, remain, ((AnnotationPrefixedDataAvailableVo)objs[1]).headMinLength, ((AnnotationPrefixedDataAvailableVo)objs[1]).errorMaxLength) ; + //鐢ㄥ畬瀵硅薄鍚庯紝鏀惧洖姹犱腑 + PrefixedDataAvailablePool.freeInstance(objs[0].getClass(), pda); + if(pds == null){ + //璇存槑涓嶆槸瀵瑰簲鐨勫崗璁暟鎹紝闇�瑕佸彟澶栫殑鍗忚鏉ュ鐞嗕笂绾� + //寰幆缁х画 + priority++ ; + }else{ + //鍋滄寰幆锛岃繑鍥炵粨鏋� + break ; + } + } + } if(pds == null){ //璇存槑鏁版嵁涓嶅睘浜庝换浣曞崗璁紝涓�鑸负Rtu鏁版嵁鍑洪敊锛屾垨缃戠粶鏀诲嚮鏁版嵁 byte[] preByte = new byte[remain]; @@ -157,5 +195,21 @@ } return new Object[]{obj, rVo} ; } + /** + * 寰楀埌澶勭悊绫诲璞� + * @param protocolName 閫氫俊鍗忚鍚嶇О + * @param protocolVersion 閫氫俊鍗忚鐗堟湰鍙� + * @return Object[] + * @throws Exception 寮傚父 + */ + private Object[] getClassObjAndAnnotationVo(String protocolName, Short protocolVersion) throws Exception{ + PrefixedDataAvailable obj = null ; + AnnotationPrefixedDataAvailableVo rVo = null ; + if(protocolName != null && !protocolName.trim().equals("") && protocolVersion != null){ + rVo = ProtocolCache.getAnnotationPrefixedDataAvailable(protocolName, protocolVersion) ; + obj = PrefixedDataAvailablePool.getInstance(rVo.clazz) ; + } + return new Object[]{obj, rVo} ; + } } -- Gitblit v1.8.0