From 1135c89deb50a080152f9086fc7b741c415ecd54 Mon Sep 17 00:00:00 2001
From: liurunyu <lry9898@163.com>
Date: 星期三, 12 二月 2025 17:00:14 +0800
Subject: [PATCH] 通信中间件增加功能: 1、实现消息心中; 2、开阀报、关阀报、报警数据都会在消息中间件存入消息; 3、在消息中心注册消息接收者,消息中心周期性向消息接收者推送消息。

---
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol/PrefixedDataAvailableHandleImp.java |  176 ++++++++++++++++++++++++++++++++++++++--------------------
 1 files changed, 115 insertions(+), 61 deletions(-)

diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol/PrefixedDataAvailableHandleImp.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol/PrefixedDataAvailableHandleImp.java
index 56d32fc..3b99830 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol/PrefixedDataAvailableHandleImp.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol/PrefixedDataAvailableHandleImp.java
@@ -3,6 +3,7 @@
 import java.util.Collection;
 import java.util.HashMap;
 
+import com.dy.common.mw.channel.tcp.TcpIoSessionAttrIdIsRtuAddr;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.mina.core.buffer.IoBuffer;
@@ -10,6 +11,7 @@
 import com.dy.common.util.ByteUtil;
 import com.dy.common.mw.channel.tcp.PrefixedDataAvailableHandle;
 import com.dy.common.mw.channel.tcp.PrefixedDataAvailableStatus;
+import org.apache.mina.core.session.IoSession;
 
 public class PrefixedDataAvailableHandleImp implements PrefixedDataAvailableHandle {
 	
@@ -22,43 +24,63 @@
 	 * @param in IoBuffer
 	 * @return PrefixedDataAvailableStatus
 	 */
-	public PrefixedDataAvailableStatus forOnLine(IoBuffer in){
+	public PrefixedDataAvailableStatus forOnLine(IoSession ioSession, IoBuffer in){
     	int remain = in.remaining() ;
     	if(remain == 0){
     		return new PrefixedDataAvailableStatus().breaked() ;
     	}else{
     		try{
-    			PrefixedDataAvailableStatus pds = null ;
-    			
-    			HashMap<String, AnnotationPrefixedDataAvailableVo> prefixedDataAvailableMap = ProtocolCach.getPrefixedDataAvailableMap() ;
-    			Collection<AnnotationPrefixedDataAvailableVo> set = prefixedDataAvailableMap.values() ;
-    			if(set.size() == 0){
-    				throw new Exception("涓婄嚎鏁版嵁瀹屾暣鎬ф鏌ユ椂锛屽緱鍒扮殑鍗忚瀹屾暣鎬ф鏌ョ被闆嗗悎涓虹┖銆�") ;
-    			}
-    			int prority = ProtocolConstant.firstPriority ;
+				PrefixedDataAvailableStatus pds = null ;
+				PrefixedDataAvailable pda = null ;
 
-	    		while(true){
-	    			Object[] objs = this.getClassObjAndAnnotationVo(prority, set) ;
-	    			PrefixedDataAvailable pda = (PrefixedDataAvailable)objs[0] ;
-	    			if(pda == null && prority == ProtocolConstant.firstPriority){
-	    				throw new Exception("涓婄嚎鏁版嵁瀹屾暣鎬ф鏌ユ椂锛屾湭寰楀埌浼樺厛绾т负" + prority + "涓婄嚎鏁版嵁瀹屾暣鎬ф鏌ョ被锛�") ;
-	    			}else if(pda == null){
-	    				//璇存槑涓婄嚎鏁版嵁瀹屾暣鎬ф鏌ョ被闆嗗悎宸茬粡閬嶅巻瀹屼簡銆�
-	    				break ;
-	    			}
-	    			//澶勭悊瀹屾暣鎬ф鏌�
-	    			pds = pda.forOnLine(in, remain, ((AnnotationPrefixedDataAvailableVo)objs[1]).onLineDataMinLength, ((AnnotationPrefixedDataAvailableVo)objs[1]).errorMaxLength) ;
-	    			//鐢ㄥ畬瀵硅薄鍚庯紝鏀惧洖姹犱腑
-	    			PrefixedDataAvailablePool.freeInstance(((AnnotationPrefixedDataAvailableVo)objs[1]).clazz, pda);
-	    			if(pds == null){
-						//璇存槑涓嶆槸瀵瑰簲鐨勫崗璁暟鎹紝闇�瑕佸彟澶栫殑鍗忚鏉ュ鐞嗕笂绾�
-						//寰幆缁х画
-						prority++ ;
-					}else{
-						//鍋滄寰幆锛岃繑鍥炵粨鏋�
-						break ;
+				//浠庝細璇濈紦瀛樺緱鍒颁笂娆′笂琛屾暟鎹‘瀹氱殑鍗忚
+				String protocolName = (String) ioSession.getAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolName) ;
+    			Short protocolVersion = (Short) ioSession.getAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolVersion) ;
+    			//鐢变細璇濅腑鐨勫崗璁緱鍒板鐞嗙被
+				//鍒╃敤缂撳瓨鐨勫崗璁紝杩涜�屽涓嶇鍚堝崗璁殑涓婅鏁版嵁涔熻兘鐢ㄨ鍗忚澶勭悊浜嗭紝渚嬪RTU閲囩敤涓嶅悓鏁版嵁鍗忚鐨勫祵鍏ュ紡杞欢鍗囩骇鍗忚
+				//鍗砇TU绗竴鍖呮暟鎹‘瀹氫笅鏉ュ崗璁紝鍚庨潰鏁版嵁閮界敤杩欎釜鍗忚澶勭悊锛岄偅涔堟暟鎹彲浠ュ彉鍖栦簡锛屽彲浠ヤ笉绗﹀悎绗竴鍖呮暟鎹伒瀹堢殑鍗忚浜�
+				Object[] objs =  this.getClassObjAndAnnotationVo(protocolName, protocolVersion) ;
+				if(objs != null && objs[0] != null && objs[1] != null){
+					pda = (PrefixedDataAvailable)objs[0] ;
+					//澶勭悊瀹屾暣鎬ф鏌�
+					pds = pda.forOnLine(ioSession, in, remain, ((AnnotationPrefixedDataAvailableVo)objs[1]).onLineDataMinLength, ((AnnotationPrefixedDataAvailableVo)objs[1]).errorMaxLength) ;
+					//鐢ㄥ畬瀵硅薄鍚庯紝鏀惧洖姹犱腑
+					PrefixedDataAvailablePool.freeInstance(objs[0].getClass(), pda);
+				}
+				if(pds == null){
+					//濡傛灉浼氳瘽涓紦瀛樼殑鍗忚鏈兘姝g‘澶勭悊
+					HashMap<String, AnnotationPrefixedDataAvailableVo> prefixedDataAvailableMap = ProtocolCache.getPrefixedDataAvailableMap() ;
+					Collection<AnnotationPrefixedDataAvailableVo> set = prefixedDataAvailableMap.values() ;
+					if(set.size() == 0){
+						throw new Exception("涓婄嚎鏁版嵁瀹屾暣鎬ф鏌ユ椂锛屽緱鍒扮殑鍗忚瀹屾暣鎬ф鏌ョ被闆嗗悎涓虹┖銆�") ;
 					}
-	    		}
+					int priority = ProtocolConstant.firstPriority ;
+
+					while(true){
+						objs = this.getClassObjAndAnnotationVo(priority, set) ;
+						pda = (PrefixedDataAvailable)objs[0] ;
+						if(pda == null && priority == ProtocolConstant.firstPriority){
+							throw new Exception("涓婄嚎鏁版嵁瀹屾暣鎬ф鏌ユ椂锛屾湭寰楀埌浼樺厛绾т负" + priority + "涓婄嚎鏁版嵁瀹屾暣鎬ф鏌ョ被锛�") ;
+						}else if(pda == null){
+							//璇存槑涓婄嚎鏁版嵁瀹屾暣鎬ф鏌ョ被闆嗗悎宸茬粡閬嶅巻瀹屼簡銆�
+							break ;
+						}
+						//澶勭悊瀹屾暣鎬ф鏌�
+						pds = pda.forOnLine(ioSession, in, remain, ((AnnotationPrefixedDataAvailableVo)objs[1]).onLineDataMinLength, ((AnnotationPrefixedDataAvailableVo)objs[1]).errorMaxLength) ;
+						//鐢ㄥ畬瀵硅薄鍚庯紝鏀惧洖姹犱腑
+						//2024-11-25涓嬮潰涓�琛岃繘琛屼簡淇敼
+						//PrefixedDataAvailablePool.freeInstance(((AnnotationPrefixedDataAvailableVo)objs[1]).clazz, pda);
+						PrefixedDataAvailablePool.freeInstance(objs[0].getClass(), pda);
+						if(pds == null){
+							//璇存槑涓嶆槸瀵瑰簲鐨勫崗璁暟鎹紝闇�瑕佸彟澶栫殑鍗忚鏉ュ鐞嗕笂绾�
+							//寰幆缁х画
+							priority++ ;
+						}else{
+							//鍋滄寰幆锛岃繑鍥炵粨鏋�
+							break ;
+						}
+					}
+				}
 	    		if(pds == null){
 	    			//璇存槑鏁版嵁涓嶅睘浜庝换浣曞崗璁紝涓�鑸负Rtu鏁版嵁鍑洪敊锛屾垨缃戠粶鏀诲嚮鏁版嵁
 	    			byte[] preByte = new byte[remain];
@@ -82,43 +104,59 @@
 	 * @param in IoBuffer
 	 * @return PrefixedDataAvailableStatus
 	 */
-	public PrefixedDataAvailableStatus forUpData(IoBuffer in){
+	public PrefixedDataAvailableStatus forUpData(IoSession ioSession, IoBuffer in){
 		int remain = in.remaining() ;
     	if(remain == 0){
     		return new PrefixedDataAvailableStatus().breaked() ;
     	}else{
     		try{
-   			PrefixedDataAvailableStatus pds = null ;
-    			
-    			HashMap<String, AnnotationPrefixedDataAvailableVo> prefixedDataAvailableMap = ProtocolCach.getPrefixedDataAvailableMap() ;
-    			Collection<AnnotationPrefixedDataAvailableVo> set = prefixedDataAvailableMap.values() ;
-    			if(set.size() == 0){
-    				throw new Exception("涓婄嚎鏁版嵁瀹屾暣鎬ф鏌ユ椂锛屽緱鍒扮殑鍗忚闆嗗悎涓虹┖銆�") ;
-    			}
-    			int prority = ProtocolConstant.firstPriority ;
+				PrefixedDataAvailableStatus pds = null ;
+				PrefixedDataAvailable pda = null ;
 
-	    		while(true){
-	    			Object[] objs = this.getClassObjAndAnnotationVo(prority, set) ;
-	    			PrefixedDataAvailable pda = (PrefixedDataAvailable)objs[0] ;
-	    			if(pda == null && prority == ProtocolConstant.firstPriority){
-	    				throw new Exception("涓婄嚎鏁版嵁瀹屾暣鎬ф鏌ユ椂锛屾湭寰楀埌浼樺厛绾т负" + prority + "涓婄嚎鏁版嵁瀹屾暣鎬ф鏌ョ被锛�") ;
-	    			}else if(pda == null){
-	    				//璇存槑涓婄嚎鏁版嵁瀹屾暣鎬ф鏌ョ被闆嗗悎宸茬粡閬嶅巻瀹屼簡銆�
-	    				break ;
-	    			}
-	    			//澶勭悊瀹屾暣鎬ф鏌�
-        			pds = pda.forUpData(in, remain, ((AnnotationPrefixedDataAvailableVo)objs[1]).headMinLength, ((AnnotationPrefixedDataAvailableVo)objs[1]).errorMaxLength) ;
-        			//鐢ㄥ畬瀵硅薄鍚庯紝鏀惧洖姹犱腑
-        			PrefixedDataAvailablePool.freeInstance(objs[0].getClass(), pda);
-        			if(pds == null){
-    					//璇存槑涓嶆槸瀵瑰簲鐨勫崗璁暟鎹紝闇�瑕佸彟澶栫殑鍗忚鏉ュ鐞嗕笂绾�
-    					//寰幆缁х画
-    					prority++ ;
-    				}else{
-    					//鍋滄寰幆锛岃繑鍥炵粨鏋�
-    					break ;
-    				}
-        		}
+				//浠庝細璇濈紦瀛樺緱鍒颁笂娆′笂琛屾暟鎹‘瀹氱殑鍗忚
+				String protocolName = (String) ioSession.getAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolName) ;
+				Short protocolVersion = (Short) ioSession.getAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrProtocolVersion) ;
+				//鐢变細璇濅腑鐨勫崗璁緱鍒板鐞嗙被
+				Object[] objs =  this.getClassObjAndAnnotationVo(protocolName, protocolVersion) ;
+				if(objs != null && objs[0] != null && objs[1] != null){
+					pda = (PrefixedDataAvailable)objs[0] ;
+					//澶勭悊瀹屾暣鎬ф鏌�
+					pds = pda.forOnLine(ioSession, in, remain, ((AnnotationPrefixedDataAvailableVo)objs[1]).onLineDataMinLength, ((AnnotationPrefixedDataAvailableVo)objs[1]).errorMaxLength) ;
+					//鐢ㄥ畬瀵硅薄鍚庯紝鏀惧洖姹犱腑
+					PrefixedDataAvailablePool.freeInstance(objs[0].getClass(), pda);
+				}
+				if(pds == null) {
+					//濡傛灉浼氳瘽涓紦瀛樼殑鍗忚鏈兘姝g‘澶勭悊
+					HashMap<String, AnnotationPrefixedDataAvailableVo> prefixedDataAvailableMap = ProtocolCache.getPrefixedDataAvailableMap() ;
+					Collection<AnnotationPrefixedDataAvailableVo> set = prefixedDataAvailableMap.values() ;
+					if(set.size() == 0){
+						throw new Exception("涓婄嚎鏁版嵁瀹屾暣鎬ф鏌ユ椂锛屽緱鍒扮殑鍗忚闆嗗悎涓虹┖銆�") ;
+					}
+					int priority = ProtocolConstant.firstPriority ;
+
+					while(true){
+						objs = this.getClassObjAndAnnotationVo(priority, set) ;
+						pda = (PrefixedDataAvailable)objs[0] ;
+						if(pda == null && priority == ProtocolConstant.firstPriority){
+							throw new Exception("涓婄嚎鏁版嵁瀹屾暣鎬ф鏌ユ椂锛屾湭寰楀埌浼樺厛绾т负" + priority + "涓婄嚎鏁版嵁瀹屾暣鎬ф鏌ョ被锛�") ;
+						}else if(pda == null){
+							//璇存槑涓婄嚎鏁版嵁瀹屾暣鎬ф鏌ョ被闆嗗悎宸茬粡閬嶅巻瀹屼簡銆�
+							break ;
+						}
+						//澶勭悊瀹屾暣鎬ф鏌�
+						pds = pda.forUpData(ioSession, in, remain, ((AnnotationPrefixedDataAvailableVo)objs[1]).headMinLength, ((AnnotationPrefixedDataAvailableVo)objs[1]).errorMaxLength) ;
+						//鐢ㄥ畬瀵硅薄鍚庯紝鏀惧洖姹犱腑
+						PrefixedDataAvailablePool.freeInstance(objs[0].getClass(), pda);
+						if(pds == null){
+							//璇存槑涓嶆槸瀵瑰簲鐨勫崗璁暟鎹紝闇�瑕佸彟澶栫殑鍗忚鏉ュ鐞嗕笂绾�
+							//寰幆缁х画
+							priority++ ;
+						}else{
+							//鍋滄寰幆锛岃繑鍥炵粨鏋�
+							break ;
+						}
+					}
+				}
         		if(pds == null){
         			//璇存槑鏁版嵁涓嶅睘浜庝换浣曞崗璁紝涓�鑸负Rtu鏁版嵁鍑洪敊锛屾垨缃戠粶鏀诲嚮鏁版嵁
         			byte[] preByte = new byte[remain];
@@ -157,5 +195,21 @@
 		}
 		return new Object[]{obj, rVo} ;
 	}
+	/**
+	 * 寰楀埌澶勭悊绫诲璞�
+	 * @param protocolName 閫氫俊鍗忚鍚嶇О
+	 * @param protocolVersion 閫氫俊鍗忚鐗堟湰鍙�
+	 * @return Object[]
+	 * @throws Exception 寮傚父
+	 */
+	private Object[] getClassObjAndAnnotationVo(String protocolName, Short protocolVersion) throws Exception{
+		PrefixedDataAvailable obj = null ;
+		AnnotationPrefixedDataAvailableVo rVo = null ;
+		if(protocolName != null && !protocolName.trim().equals("") && protocolVersion != null){
+			rVo =  ProtocolCache.getAnnotationPrefixedDataAvailable(protocolName, protocolVersion) ;
+			obj = PrefixedDataAvailablePool.getInstance(rVo.clazz) ;
+		}
+		return new Object[]{obj, rVo} ;
+	}
 
 }

--
Gitblit v1.8.0