From f8b2e59a82702a790c383a8ecd90c708c76e2488 Mon Sep 17 00:00:00 2001
From: liurunyu <lry9898@163.com>
Date: 星期四, 05 六月 2025 17:51:59 +0800
Subject: [PATCH] 增量开发MQTT协议、功能模块,上下行命令(消息)等

---
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java |  158 +++++++++++++++++++++++++++++++++++++++++++---------
 1 files changed, 129 insertions(+), 29 deletions(-)

diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java
index 72902e7..b7917f5 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java
@@ -3,11 +3,16 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import com.dy.common.util.ConfigProperties;
+import com.dy.common.util.IPUtils;
 import com.dy.rtuMw.server.*;
+import com.dy.rtuMw.server.mqtt.MqttUnit;
+import com.dy.rtuMw.server.mqtt.MqttUnitConfigVo;
+import com.dy.rtuMw.server.msCenter.MsCenterConfigVo;
+import com.dy.rtuMw.server.msCenter.MsCenterUnit;
 import com.dy.rtuMw.server.rtuData.RtuDataUnit;
 import com.dy.rtuMw.server.rtuData.RtuDataUnitConfigVo;
-import com.dy.rtuMw.server.tasks.FromRtuComResultConstantTask;
-import com.dy.rtuMw.server.tasks.FromRtuDataConstantTask;
+import com.dy.rtuMw.server.tasks.*;
 import com.dy.common.mw.UnitInterface;
 import com.dy.common.mw.channel.tcp.TcpConfigVo;
 import com.dy.common.mw.channel.tcp.TcpUnit;
@@ -17,8 +22,6 @@
 import com.dy.common.mw.protocol.ProtocolUnit;
 import com.dy.common.mw.support.SupportUnit;
 import com.dy.common.mw.support.SupportUnitConfigVo;
-import com.dy.rtuMw.server.tasks.SendMsConstantTask;
-import com.dy.rtuMw.server.tasks.RtuDownConstantTask;
 import com.dy.rtuMw.resource.ResourceUnit;
 import com.dy.rtuMw.resource.ResourceUnitConfigVo;
 import com.dy.common.springUtil.SpringContextUtil;
@@ -40,6 +43,7 @@
 	private Document doc = null ;
 	private boolean showStartInfo = false ;
 	
+	private String orgTag ;
 	private String RmiSvUrl ;
 	private String TcpSvUrl ;
 	@Value("${server.port}")
@@ -83,7 +87,7 @@
 		boolean running = false ;
 		long start = System.currentTimeMillis() ;
 		try {
-			//ConfigProperties.init(this.getClass().getResourceAsStream("/config/config.properties"), false);
+			ConfigProperties.init(this.getClass().getResourceAsStream("/config.properties"), false);
 			
 			this.conf = new ConfigXml4Springboot() ;
 			this.doc = this.conf.createDom(this.resourceLoader, "config.xml") ;
@@ -113,7 +117,7 @@
             System.out.println("@@@@@@@@@@@@@@@@@@@@@#   #@@@@@@@@@@@@@@@@O") ;    
             System.out.println("@@@@@@@@@@@@@@@@@@@@@@@   &@@@@@@@@@@@@@@") ;           
             System.out.println("@@@@@@$      $@@@@@@@@@&   O@@@@@@@@@@@#") ;        
-            System.out.println("@@@@@@$        @@@@@@@@@     @@@@@@@@@&      " + svName + "RtuMw 1.0.00" ) ;             
+            System.out.println("@@@@@@$        @@@@@@@@@     @@@@@@@@@&      " + this.orgTag + svName + "RtuMw 1.0.00" ) ;
 			if(this.HttpSvPath != null && this.HttpSvPort != null){
 				System.out.println("@@@@@@$       O@@@@@@@@@     &@@@@@@@@       HttpSv [ip]:" + this.HttpSvPort + this.HttpSvPath) ;
 			}else{
@@ -130,7 +134,7 @@
             }else{
                 System.out.println("@@@@@@$      #@@@@@@@@@$     &@@@@@@@@" ) ;        
             }
-            System.out.println("@@@@@@@@@@@@@@@@@@@@@@#      &@@@@@@@@       Runing in standalone mode" ) ;    
+            System.out.println("@@@@@@@@@@@@@@@@@@@@@@#      &@@@@@@@@       Running in standalone mode" ) ;
             System.out.println("@@@@@@@@@@@@@@@@@@@@@&       &@@@@@@@@       Startup in " + (System.currentTimeMillis() - start) + " MS" ) ;              
             System.out.println("@@@@@@@@@@@@@@@@@@@#         &@@@@@@@@       " + company) ;            
             System.out.println("@@@@@@@@@@@@@@@@#O           &@@@@@@@@") ;
@@ -147,9 +151,10 @@
 			///////////////
 			//鍩烘湰閰嶇疆
 			ServerProperties.orgTag = this.conf.getSetAttrTxt(this.doc, "config.base", "orgTag", null, false, null) ;
-			if(ServerProperties.orgTag==null || ServerProperties.orgTag.trim().equals("")){
+			if(ServerProperties.orgTag == null || ServerProperties.orgTag.trim().equals("")){
 				throw new Exception("鏈烘瀯tag涓嶈兘涓虹┖") ;
 			}
+			this.orgTag = ServerProperties.orgTag ;
 			ServerProperties.isLowPower = conf.getSetAttrBoolean(doc, "config.base", "isLowPower", null, null) ; 
 			if(ServerProperties.isLowPower == null){
 				ServerProperties.isLowPower = false ;
@@ -181,10 +186,12 @@
 			ServerProperties.downComandMaxResendTimes = conf.getSetAttrPlusInt(doc, "config.base", "downComandMaxResendTimes", null, 1, 5, null).byteValue() ;
 			//閽堝涓�涓猂TU锛屼笅鍙戝懡浠ょ殑鏃堕棿闂撮殧
 			ServerProperties.commandSendInterval = conf.getSetAttrPlusInt(doc, "config.base", "commandSendInterval", null, 1, 40, null) * 1000L ;
+			//閽堝涓�涓猂TU锛屼笅鍙戝揩閫熷懡浠ょ殑鏃堕棿闂撮殧
+			ServerProperties.fastCommandSendInterval = conf.getSetAttrPlusInt(doc, "config.base", "fastCommandSendInterval", null, 1, 40000, null) * 1L ;
 			//鍛戒护宸茬粡鍙戦�佽揪鏈�澶ф鏁帮紝浠嶆湭鏀跺埌鍛戒护缁撴灉锛岄渶瑕佸湪缂撳瓨缁х画绛夊緟锛屽叾绛夊緟鏈�澶ф椂闀�
-			ServerProperties.cachWaitResultTimeout = conf.getSetAttrPlusInt(doc, "config.base", "cachWaitResultTimeout", null, 10, 360, null) * 1000L ;
+			ServerProperties.cacheWaitResultTimeout = conf.getSetAttrPlusInt(doc, "config.base", "cacheWaitResultTimeout", null, 10, 360, null) * 1000L ;
 			//涓嶅湪绾跨紦瀛樼殑鍛戒护鏈�澶х紦瀛樻椂闀�
-			ServerProperties.offLineCachTimeout = conf.getSetAttrPlusInt(doc, "config.base", "offLineCachTimeout", null, 15, 172800, null) * 1000L ;
+			ServerProperties.offLineCacheTimeout = conf.getSetAttrPlusInt(doc, "config.base", "offLineCacheTimeout", null, 15, 172800, null) * 1000L ;
 			//TCP涓婅鏁版嵁鏃跺埢缂撳瓨鏃堕暱锛屽綋杈惧埌鏃堕暱鏃讹紝TCP涓婅鏁版嵁鏃跺埢琚竻绌猴紝閲囩敤TCP涓婅鏁版嵁鏃跺埢鐩殑鏄紝闃绘涓婃暟鎹悓鏃朵笅鍙戞暟鎹紝鍥犱负RTU澶勭悊涓嶈繃鏉�
 			ServerProperties.lastUpDataTimeLive = conf.getSetAttrPlusInt(doc, "config.base", "lastUpDataTimeLive", null, 0, 5000, null) * 1L ;
 			//鏁版嵁搴撴暟鎹甶d鐢熸垚鍣ㄧ殑id鍚庣紑锛�0鏄粯璁ょ殑鍚庣紑锛屼竴鑸瑆eb绯荤粺搴旂敤锛屾暟鎹腑闂翠欢id鍚庣紑澶т簬绛変簬1
@@ -332,26 +339,51 @@
 			}
 			*/
 
+
+			/////////////////
+			//娑堟伅涓績妯″潡
+			MsCenterConfigVo mscVo = new MsCenterConfigVo();
+			mscVo.enable = conf.getSetAttrBoolean(doc, "config.msCenter", "enable", null, null) ;
+			mscVo.notifyMsInterval = conf.getSetAttrPlusInt(doc, "config.msCenter", "notifyInterval", null, 1, 600, null) * 1000L ;
+			mscVo.showStartInfo = showStartInfo ;
+			AdapterImp_MsCenterUnit mscAdapt = new AdapterImp_MsCenterUnit();
+			mscAdapt.setConfig(mscVo);
+			MsCenterUnit mscUnit = MsCenterUnit.getInstance();
+			mscUnit.setAdapter(mscAdapt);
+			mscUnit.start(obj -> {
+			});
+			units.add(mscUnit) ;
+
+
 			/////////////////
 			//RTU杩滅▼鍗囩骇妯″潡
 			UpgradeUnitConfigVo ugVo = new UpgradeUnitConfigVo();
 			ugVo.enable = conf.getSetAttrBoolean(doc, "config.upgrade", "enable", null, null) ;
-			ugVo.noOneRtuUpgradeMaxDuration = conf.getSetAttrPlusInt(doc, "config.upgrade", "noOneRtuUpgradeMaxDuration", null, 5, 3600, null);
-			ugVo.noOneRtuUpgradeMaxDuration = ugVo.noOneRtuUpgradeMaxDuration * 1000 ;//鍙樻垚姣
-			ugVo.failTryTimes = conf.getSetAttrPlusInt(doc, "config.upgrade", "failTryTimes", null, 0, 100, null);
-			ugVo.ugMaxRtuAtOnce = conf.getSetAttrPlusInt(doc, "config.upgrade", "ugMaxRtuAtOnce", null, 0, 1000000, null);
-			ugVo.rtuOffLineWaitDuration = conf.getSetAttrPlusInt(doc, "config.upgrade", "rtuOffLineWaitDuration", null, 1, 36000, null);
-			ugVo.rtuOffLineWaitDuration = ugVo.rtuOffLineWaitDuration * 1000;//鍙樻垚姣
-			ugVo.notifyStateInterval = conf.getSetAttrPlusInt(doc, "config.upgrade", "notifyStateInterval", null, 1, 300, null);
-			ugVo.notifyStateInterval = ugVo.notifyStateInterval * 1000;//鍙樻垚姣
-			ugVo.showStartInfo = showStartInfo ;
-			AdapterImp_UpgradeUnit ugAdap = new AdapterImp_UpgradeUnit();
-			ugAdap.setConfig(ugVo);
-			UpgradeUnit ugUnit = UpgradeUnit.getInstance();
-			ugUnit.setAdapter(ugAdap);
-			ugUnit.start(obj -> {
-			});
-			units.add(ugUnit) ;
+			if(ugVo.enable){
+				ugVo.openNoUpgrade = conf.getSetAttrBoolean(doc, "config.upgrade", "openNoUpgrade", null, null) ;
+				ugVo.lastOpenMaxGoOn = conf.getSetAttrPlusInt(doc, "config.upgrade", "lastOpenMaxGoOn", null, 5, 360000, null);
+				ugVo.lastOpenMaxGoOn = ugVo.lastOpenMaxGoOn * 1000 ;//鍙樻垚姣
+				ugVo.noOneRtuUpgradeMaxDuration = conf.getSetAttrPlusInt(doc, "config.upgrade", "noOneRtuUpgradeMaxDuration", null, 5, 360000, null);
+				ugVo.noOneRtuUpgradeMaxDuration = ugVo.noOneRtuUpgradeMaxDuration * 1000 ;//鍙樻垚姣
+				ugVo.runningAndIdleDuration = conf.getSetAttrPlusInt(doc, "config.upgrade", "runningAndIdleDuration", null, 5, 360000, null);
+				ugVo.runningAndIdleDuration = ugVo.runningAndIdleDuration * 1000 ;//鍙樻垚姣
+				ugVo.failTryTimes = conf.getSetAttrPlusInt(doc, "config.upgrade", "failTryTimes", null, 0, 100, null);
+				ugVo.ugMaxRtuAtOnce = conf.getSetAttrPlusInt(doc, "config.upgrade", "ugMaxRtuAtOnce", null, 0, 1000000, null);
+				ugVo.rtuOffLineWaitDuration = conf.getSetAttrPlusInt(doc, "config.upgrade", "rtuOffLineWaitDuration", null, 1, 3600000, null);
+				ugVo.rtuOffLineWaitDuration = ugVo.rtuOffLineWaitDuration * 1000;//鍙樻垚姣
+				ugVo.notifyStateInterval = conf.getSetAttrPlusInt(doc, "config.upgrade", "notifyStateInterval", null, 1, 300, null);
+				ugVo.notifyStateInterval = ugVo.notifyStateInterval * 1000;//鍙樻垚姣
+				ugVo.notifyTimesAfterOver = conf.getSetAttrPlusInt(doc, "config.upgrade", "notifyTimesAfterOver", null, 0, null, null);
+				ugVo.showStartInfo = showStartInfo ;
+				AdapterImp_UpgradeUnit ugAdap = new AdapterImp_UpgradeUnit();
+				ugAdap.setConfig(ugVo);
+				UpgradeUnit ugUnit = UpgradeUnit.getInstance();
+				ugUnit.setAdapter(ugAdap);
+				ugUnit.start(obj -> {
+				});
+				units.add(ugUnit) ;
+			}
+
 
 			/////////////////
 			//RTU涓婅鏁版嵁澶勭悊妯″潡锛堜换鍔℃爲锛�
@@ -370,8 +402,7 @@
 			// ///////////////
 			// 鏍稿績
 			CoreUnitConfigVo coreConfVo = new CoreUnitConfigVo();
-			coreConfVo.sleepBigBusy = conf.getSetAttrPlusInt(doc, "config.core", "sleepBigBusy", null, 1, 200, null).longValue() ;
-			coreConfVo.sleepSmallBusy = conf.getSetAttrPlusInt(doc, "config.core", "sleepSmallBusy", null, 2, 1000, null).longValue();
+			coreConfVo.coreInterval = conf.getSetAttrPlusInt(doc, "config.core", "coreInterval", null, 1, 200, null).longValue() ;
 			coreConfVo.queueWarnSize = ServerProperties.cacheUpDownDataWarnCount ;
 			coreConfVo.queueMaxSize = ServerProperties.cacheUpDownDataMaxCount ;
 			coreConfVo.showStartInfo = showStartInfo ;
@@ -382,6 +413,12 @@
 			CoreUnit.addConstantTask(new RtuDownConstantTask());
 			CoreUnit.addConstantTask(new FromRtuDataConstantTask());
 			CoreUnit.addConstantTask(new FromRtuComResultConstantTask());
+			Boolean enableMq = conf.getSetAttrBoolean(doc, "config.mqtt", "enable", null, null) ;
+			if(enableMq != null && enableMq.booleanValue()){
+				CoreUnit.addConstantTask(new MqttSubMessageConstantTask());
+				CoreUnit.addConstantTask(new MqttPubMessageConstantTask());
+				CoreUnit.addConstantTask(new MqttComResultConstantTask());
+			}
 			CoreUnit.addConstantTask(new SendMsConstantTask());
 			coreUnit.start(obj -> {
 			});
@@ -405,7 +442,70 @@
 				});
 				TcpSvUrl = "[ip]:" + tcpVo.port ;
 				units.add(tcpUnit) ;
-			}	
+			}
+
+			/////////////////
+			//MQTT妯″潡
+			MqttUnitConfigVo mqVo = new MqttUnitConfigVo();
+			mqVo.enable = conf.getSetAttrBoolean(doc, "config.mqtt", "enable", null, null) ;
+			ServerProperties.mqttUnitEnable = mqVo.enable ;
+			if(mqVo.enable){
+				mqVo.svIp = conf.getSetAttrTxt(doc, "config.mqtt", "svIp", null, true, null) ;
+				if(!IPUtils.ipValid(mqVo.svIp)){
+					throw new Exception("config.mqtt.svIp閰嶇疆鐨処P涓嶅悎娉�") ;
+				}
+				mqVo.svPort = conf.getSetAttrPlusInt(doc, "config.mqtt", "svPort", null, 5, 360000, null);
+				if(mqVo.svPort < 0 || mqVo.svPort > 65535){
+					throw new Exception("config.mqtt.svPort閰嶇疆鐨勭鍙d笉鍚堟硶") ;
+				}
+				mqVo.svUserName = conf.getSetAttrTxt(doc, "config.mqtt", "svUserName", null, true, null) ;
+				if(mqVo.svUserName == null || mqVo.svUserName.trim().equals("")){
+					throw new Exception("config.mqtt.svUserName閰嶇疆鐨勭敤鎴峰悕涓嶅悎娉�") ;
+				}else{
+					mqVo.svUserName = mqVo.svUserName.trim() ;
+				}
+				mqVo.svUserPassword = conf.getSetAttrTxt(doc, "config.mqtt", "svUserPassword", null, true, null) ;
+				if(mqVo.svUserPassword == null || mqVo.svUserPassword.trim().equals("")){
+					throw new Exception("config.mqtt.svUserName閰嶇疆鐨勭敤鎴峰瘑鐮佷笉鍚堟硶") ;
+				}else{
+					mqVo.svUserPassword = mqVo.svUserPassword.trim() ;
+				}
+				mqVo.poolMaxSize = conf.getSetAttrPlusInt(doc, "config.mqtt", "poolMaxSize", null, 5, 360000, null);
+				if(mqVo.poolMaxSize <= 1 || mqVo.poolMaxSize > 1000){
+					throw new Exception("config.mqtt.poolMaxSize閰嶇疆鐨勮繛鎺ユ睜杩炴帴鏈�澶ф暟閲忎笉鍚堟硶") ;
+				}
+				String topicAndQos = conf.getSetAttrTxt(doc, "config.mqtt", "topicAndQos", null, true, null) ;
+				if(topicAndQos == null || topicAndQos.trim().equals("")){
+					throw new Exception("config.mqtt.topicAndQos閰嶇疆鐨勪富棰樺強Qos涓嶅悎娉�") ;
+				}else{
+					topicAndQos = topicAndQos.trim() ;
+					topicAndQos = topicAndQos.replaceAll("锛�", ",");
+					topicAndQos = topicAndQos.replaceAll("锛�", ";");
+					String[] topicAndQosArr = topicAndQos.split(";") ;
+					mqVo.subTopics = new String[topicAndQosArr.length] ;
+					mqVo.topicsQos = new int[topicAndQosArr.length] ;
+					int index = 0 ;
+					for(String topicAndQosStr : topicAndQosArr){
+						String[] tq = topicAndQosStr.split(",") ;
+						mqVo.subTopics[index] = tq[0].trim() ;
+						mqVo.topicsQos[index] = Integer.parseInt(tq[1].trim()) ;
+						index++ ;
+					}
+				}
+				mqVo.publishQos = conf.getSetAttrPlusInt(doc, "config.mqtt", "publishQos", null, 0, 3, null);
+				if(mqVo.publishQos < 0 || mqVo.publishQos > 3){
+					throw new Exception("config.mqtt.publishQos閰嶇疆涓嶅悎娉�") ;
+				}
+				mqVo.showStartInfo = showStartInfo ;
+				AdapterImp_MqttUnit mqAdapt = new AdapterImp_MqttUnit();
+				mqAdapt.setConfig(mqVo);
+				MqttUnit mqUnit = MqttUnit.getInstance();
+				mqUnit.setAdapter(mqAdapt);
+				mqUnit.start(obj -> {
+				});
+				units.add(mqUnit) ;
+			}
+
 		} catch (Exception e) {
 			e.printStackTrace();
 		}		

--
Gitblit v1.8.0