From f8b2e59a82702a790c383a8ecd90c708c76e2488 Mon Sep 17 00:00:00 2001
From: liurunyu <lry9898@163.com>
Date: 星期四, 05 六月 2025 17:51:59 +0800
Subject: [PATCH] 增量开发MQTT协议、功能模块,上下行命令(消息)等

---
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java |  126 +++++++++++++++++++++++++++++++++---------
 1 files changed, 99 insertions(+), 27 deletions(-)

diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java
index 2a405c4..b7917f5 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java
@@ -4,13 +4,15 @@
 import java.util.List;
 
 import com.dy.common.util.ConfigProperties;
+import com.dy.common.util.IPUtils;
 import com.dy.rtuMw.server.*;
+import com.dy.rtuMw.server.mqtt.MqttUnit;
+import com.dy.rtuMw.server.mqtt.MqttUnitConfigVo;
 import com.dy.rtuMw.server.msCenter.MsCenterConfigVo;
 import com.dy.rtuMw.server.msCenter.MsCenterUnit;
 import com.dy.rtuMw.server.rtuData.RtuDataUnit;
 import com.dy.rtuMw.server.rtuData.RtuDataUnitConfigVo;
-import com.dy.rtuMw.server.tasks.FromRtuComResultConstantTask;
-import com.dy.rtuMw.server.tasks.FromRtuDataConstantTask;
+import com.dy.rtuMw.server.tasks.*;
 import com.dy.common.mw.UnitInterface;
 import com.dy.common.mw.channel.tcp.TcpConfigVo;
 import com.dy.common.mw.channel.tcp.TcpUnit;
@@ -20,8 +22,6 @@
 import com.dy.common.mw.protocol.ProtocolUnit;
 import com.dy.common.mw.support.SupportUnit;
 import com.dy.common.mw.support.SupportUnitConfigVo;
-import com.dy.rtuMw.server.tasks.SendMsConstantTask;
-import com.dy.rtuMw.server.tasks.RtuDownConstantTask;
 import com.dy.rtuMw.resource.ResourceUnit;
 import com.dy.rtuMw.resource.ResourceUnitConfigVo;
 import com.dy.common.springUtil.SpringContextUtil;
@@ -359,28 +359,31 @@
 			//RTU杩滅▼鍗囩骇妯″潡
 			UpgradeUnitConfigVo ugVo = new UpgradeUnitConfigVo();
 			ugVo.enable = conf.getSetAttrBoolean(doc, "config.upgrade", "enable", null, null) ;
-			ugVo.openNoUpgrade = conf.getSetAttrBoolean(doc, "config.upgrade", "openNoUpgrade", null, null) ;
-			ugVo.lastOpenMaxGoOn = conf.getSetAttrPlusInt(doc, "config.upgrade", "lastOpenMaxGoOn", null, 5, 360000, null);
-			ugVo.lastOpenMaxGoOn = ugVo.lastOpenMaxGoOn * 1000 ;//鍙樻垚姣
-			ugVo.noOneRtuUpgradeMaxDuration = conf.getSetAttrPlusInt(doc, "config.upgrade", "noOneRtuUpgradeMaxDuration", null, 5, 360000, null);
-			ugVo.noOneRtuUpgradeMaxDuration = ugVo.noOneRtuUpgradeMaxDuration * 1000 ;//鍙樻垚姣
-			ugVo.runningAndIdleDuration = conf.getSetAttrPlusInt(doc, "config.upgrade", "runningAndIdleDuration", null, 5, 360000, null);
-			ugVo.runningAndIdleDuration = ugVo.runningAndIdleDuration * 1000 ;//鍙樻垚姣
-			ugVo.failTryTimes = conf.getSetAttrPlusInt(doc, "config.upgrade", "failTryTimes", null, 0, 100, null);
-			ugVo.ugMaxRtuAtOnce = conf.getSetAttrPlusInt(doc, "config.upgrade", "ugMaxRtuAtOnce", null, 0, 1000000, null);
-			ugVo.rtuOffLineWaitDuration = conf.getSetAttrPlusInt(doc, "config.upgrade", "rtuOffLineWaitDuration", null, 1, 3600000, null);
-			ugVo.rtuOffLineWaitDuration = ugVo.rtuOffLineWaitDuration * 1000;//鍙樻垚姣
-			ugVo.notifyStateInterval = conf.getSetAttrPlusInt(doc, "config.upgrade", "notifyStateInterval", null, 1, 300, null);
-			ugVo.notifyStateInterval = ugVo.notifyStateInterval * 1000;//鍙樻垚姣
-			ugVo.notifyTimesAfterOver = conf.getSetAttrPlusInt(doc, "config.upgrade", "notifyTimesAfterOver", null, 0, null, null);
-			ugVo.showStartInfo = showStartInfo ;
-			AdapterImp_UpgradeUnit ugAdap = new AdapterImp_UpgradeUnit();
-			ugAdap.setConfig(ugVo);
-			UpgradeUnit ugUnit = UpgradeUnit.getInstance();
-			ugUnit.setAdapter(ugAdap);
-			ugUnit.start(obj -> {
-			});
-			units.add(ugUnit) ;
+			if(ugVo.enable){
+				ugVo.openNoUpgrade = conf.getSetAttrBoolean(doc, "config.upgrade", "openNoUpgrade", null, null) ;
+				ugVo.lastOpenMaxGoOn = conf.getSetAttrPlusInt(doc, "config.upgrade", "lastOpenMaxGoOn", null, 5, 360000, null);
+				ugVo.lastOpenMaxGoOn = ugVo.lastOpenMaxGoOn * 1000 ;//鍙樻垚姣
+				ugVo.noOneRtuUpgradeMaxDuration = conf.getSetAttrPlusInt(doc, "config.upgrade", "noOneRtuUpgradeMaxDuration", null, 5, 360000, null);
+				ugVo.noOneRtuUpgradeMaxDuration = ugVo.noOneRtuUpgradeMaxDuration * 1000 ;//鍙樻垚姣
+				ugVo.runningAndIdleDuration = conf.getSetAttrPlusInt(doc, "config.upgrade", "runningAndIdleDuration", null, 5, 360000, null);
+				ugVo.runningAndIdleDuration = ugVo.runningAndIdleDuration * 1000 ;//鍙樻垚姣
+				ugVo.failTryTimes = conf.getSetAttrPlusInt(doc, "config.upgrade", "failTryTimes", null, 0, 100, null);
+				ugVo.ugMaxRtuAtOnce = conf.getSetAttrPlusInt(doc, "config.upgrade", "ugMaxRtuAtOnce", null, 0, 1000000, null);
+				ugVo.rtuOffLineWaitDuration = conf.getSetAttrPlusInt(doc, "config.upgrade", "rtuOffLineWaitDuration", null, 1, 3600000, null);
+				ugVo.rtuOffLineWaitDuration = ugVo.rtuOffLineWaitDuration * 1000;//鍙樻垚姣
+				ugVo.notifyStateInterval = conf.getSetAttrPlusInt(doc, "config.upgrade", "notifyStateInterval", null, 1, 300, null);
+				ugVo.notifyStateInterval = ugVo.notifyStateInterval * 1000;//鍙樻垚姣
+				ugVo.notifyTimesAfterOver = conf.getSetAttrPlusInt(doc, "config.upgrade", "notifyTimesAfterOver", null, 0, null, null);
+				ugVo.showStartInfo = showStartInfo ;
+				AdapterImp_UpgradeUnit ugAdap = new AdapterImp_UpgradeUnit();
+				ugAdap.setConfig(ugVo);
+				UpgradeUnit ugUnit = UpgradeUnit.getInstance();
+				ugUnit.setAdapter(ugAdap);
+				ugUnit.start(obj -> {
+				});
+				units.add(ugUnit) ;
+			}
+
 
 			/////////////////
 			//RTU涓婅鏁版嵁澶勭悊妯″潡锛堜换鍔℃爲锛�
@@ -410,6 +413,12 @@
 			CoreUnit.addConstantTask(new RtuDownConstantTask());
 			CoreUnit.addConstantTask(new FromRtuDataConstantTask());
 			CoreUnit.addConstantTask(new FromRtuComResultConstantTask());
+			Boolean enableMq = conf.getSetAttrBoolean(doc, "config.mqtt", "enable", null, null) ;
+			if(enableMq != null && enableMq.booleanValue()){
+				CoreUnit.addConstantTask(new MqttSubMessageConstantTask());
+				CoreUnit.addConstantTask(new MqttPubMessageConstantTask());
+				CoreUnit.addConstantTask(new MqttComResultConstantTask());
+			}
 			CoreUnit.addConstantTask(new SendMsConstantTask());
 			coreUnit.start(obj -> {
 			});
@@ -433,7 +442,70 @@
 				});
 				TcpSvUrl = "[ip]:" + tcpVo.port ;
 				units.add(tcpUnit) ;
-			}	
+			}
+
+			/////////////////
+			//MQTT妯″潡
+			MqttUnitConfigVo mqVo = new MqttUnitConfigVo();
+			mqVo.enable = conf.getSetAttrBoolean(doc, "config.mqtt", "enable", null, null) ;
+			ServerProperties.mqttUnitEnable = mqVo.enable ;
+			if(mqVo.enable){
+				mqVo.svIp = conf.getSetAttrTxt(doc, "config.mqtt", "svIp", null, true, null) ;
+				if(!IPUtils.ipValid(mqVo.svIp)){
+					throw new Exception("config.mqtt.svIp閰嶇疆鐨処P涓嶅悎娉�") ;
+				}
+				mqVo.svPort = conf.getSetAttrPlusInt(doc, "config.mqtt", "svPort", null, 5, 360000, null);
+				if(mqVo.svPort < 0 || mqVo.svPort > 65535){
+					throw new Exception("config.mqtt.svPort閰嶇疆鐨勭鍙d笉鍚堟硶") ;
+				}
+				mqVo.svUserName = conf.getSetAttrTxt(doc, "config.mqtt", "svUserName", null, true, null) ;
+				if(mqVo.svUserName == null || mqVo.svUserName.trim().equals("")){
+					throw new Exception("config.mqtt.svUserName閰嶇疆鐨勭敤鎴峰悕涓嶅悎娉�") ;
+				}else{
+					mqVo.svUserName = mqVo.svUserName.trim() ;
+				}
+				mqVo.svUserPassword = conf.getSetAttrTxt(doc, "config.mqtt", "svUserPassword", null, true, null) ;
+				if(mqVo.svUserPassword == null || mqVo.svUserPassword.trim().equals("")){
+					throw new Exception("config.mqtt.svUserName閰嶇疆鐨勭敤鎴峰瘑鐮佷笉鍚堟硶") ;
+				}else{
+					mqVo.svUserPassword = mqVo.svUserPassword.trim() ;
+				}
+				mqVo.poolMaxSize = conf.getSetAttrPlusInt(doc, "config.mqtt", "poolMaxSize", null, 5, 360000, null);
+				if(mqVo.poolMaxSize <= 1 || mqVo.poolMaxSize > 1000){
+					throw new Exception("config.mqtt.poolMaxSize閰嶇疆鐨勮繛鎺ユ睜杩炴帴鏈�澶ф暟閲忎笉鍚堟硶") ;
+				}
+				String topicAndQos = conf.getSetAttrTxt(doc, "config.mqtt", "topicAndQos", null, true, null) ;
+				if(topicAndQos == null || topicAndQos.trim().equals("")){
+					throw new Exception("config.mqtt.topicAndQos閰嶇疆鐨勪富棰樺強Qos涓嶅悎娉�") ;
+				}else{
+					topicAndQos = topicAndQos.trim() ;
+					topicAndQos = topicAndQos.replaceAll("锛�", ",");
+					topicAndQos = topicAndQos.replaceAll("锛�", ";");
+					String[] topicAndQosArr = topicAndQos.split(";") ;
+					mqVo.subTopics = new String[topicAndQosArr.length] ;
+					mqVo.topicsQos = new int[topicAndQosArr.length] ;
+					int index = 0 ;
+					for(String topicAndQosStr : topicAndQosArr){
+						String[] tq = topicAndQosStr.split(",") ;
+						mqVo.subTopics[index] = tq[0].trim() ;
+						mqVo.topicsQos[index] = Integer.parseInt(tq[1].trim()) ;
+						index++ ;
+					}
+				}
+				mqVo.publishQos = conf.getSetAttrPlusInt(doc, "config.mqtt", "publishQos", null, 0, 3, null);
+				if(mqVo.publishQos < 0 || mqVo.publishQos > 3){
+					throw new Exception("config.mqtt.publishQos閰嶇疆涓嶅悎娉�") ;
+				}
+				mqVo.showStartInfo = showStartInfo ;
+				AdapterImp_MqttUnit mqAdapt = new AdapterImp_MqttUnit();
+				mqAdapt.setConfig(mqVo);
+				MqttUnit mqUnit = MqttUnit.getInstance();
+				mqUnit.setAdapter(mqAdapt);
+				mqUnit.start(obj -> {
+				});
+				units.add(mqUnit) ;
+			}
+
 		} catch (Exception e) {
 			e.printStackTrace();
 		}		

--
Gitblit v1.8.0