From 21c080aa9da3acd53e014e8f917b50a48cb791cb Mon Sep 17 00:00:00 2001
From: liurunyu <lry9898@163.com>
Date: 星期三, 11 六月 2025 13:44:04 +0800
Subject: [PATCH] 进行ApiFox发送内部命令测试,MQTTX发布气象数据测试,修改测试中发现的bug,修改不完善的地方。

---
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java      |   35 +++++---
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/ReturnCommand.java           |    3 
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttTopic.java                    |   12 +++
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/WeatherVo.java        |   24 +++---
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatusDealer.java          |    2 
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPooledObjectFactory.java |   15 +++
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitConfigVo.java         |    2 
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java                               |    1 
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.xml                                        |    2 
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/localProtocol/CodeLocal.java |    4 
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPool.java                |    4 
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttSubMsg.java                   |    4 
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/ServerShutDownHook.java                   |    2 
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java              |   21 +++++
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/Test.java                          |    2 
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java     |   30 ++++--
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java         |   16 ++--
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/CommandInnerDeaLer.java      |    4 +
 18 files changed, 122 insertions(+), 61 deletions(-)

diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPool.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPool.java
index 1d6d60f..23ec867 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPool.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPool.java
@@ -13,8 +13,8 @@
 
     private final GenericObjectPool<MqttClient> pool;
 
-    public MqttClientPool(String broker, String username, String password, int maxConnections) {
-        MqttClientPooledObjectFactory factory = new MqttClientPooledObjectFactory(broker, username, password);
+    public MqttClientPool(String broker, String username, String password, int maxConnections, boolean useMemoryPersistence) {
+        MqttClientPooledObjectFactory factory = new MqttClientPooledObjectFactory(broker, username, password, useMemoryPersistence);
         GenericObjectPoolConfig<MqttClient> config = new GenericObjectPoolConfig<>();
         config.setMaxTotal(maxConnections);
         config.setMaxIdle(maxConnections);
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPooledObjectFactory.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPooledObjectFactory.java
index bd2eb0f..86fc682 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPooledObjectFactory.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPooledObjectFactory.java
@@ -5,6 +5,7 @@
 import org.apache.commons.pool2.impl.DefaultPooledObject;
 import org.eclipse.paho.client.mqttv3.MqttClient;
 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 
 /**
  * @Author: liurunyu
@@ -16,18 +17,26 @@
     private final String broker;
     private final String username;
     private final String password;
+    private final Boolean useMemoryPersistence;
 
-    public MqttClientPooledObjectFactory(String broker, String username, String password) {
+    public MqttClientPooledObjectFactory(String broker, String username, String password, boolean useMemoryPersistence) {
         this.broker = broker;
         this.username = username;
         this.password = password;
+        this.useMemoryPersistence = useMemoryPersistence;
     }
 
     @Override
     public MqttClient create() throws Exception {
         String clientId = MqttClient.generateClientId();
-        MqttClient client = new MqttClient(broker, clientId);
-
+        MqttClient client = null ;
+        // 浣跨敤鍐呭瓨鎸佷箙鍖栬�岄潪榛樿鐨勬枃浠舵寔涔呭寲
+        if (useMemoryPersistence) {
+            MemoryPersistence persistence = new MemoryPersistence();
+            client = new MqttClient(broker, clientId, persistence);
+        }else{
+            client = new MqttClient(broker, clientId);
+        }
         MqttConnectOptions options = new MqttConnectOptions();
         options.setUserName(username);
         options.setPassword(password.toCharArray());
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/Test.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/Test.java
index 2611e24..560067f 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/Test.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/Test.java
@@ -34,7 +34,7 @@
     public static void main(String[] args) {
         try{
             // 鍒濆鍖栬繛鎺ユ睜
-            pool = new MqttClientPool("tcp://" + mqSvIp + ":" + mqSvPort, mqSvUserName, mqSvUserPassword, maxConnections);
+            pool = new MqttClientPool("tcp://" + mqSvIp + ":" + mqSvPort, mqSvUserName, mqSvUserPassword, maxConnections, true);
             MqttClient clientSub = pool.popClient() ;
             testSubscribe(clientSub, topic1);
             testSubscribe(clientSub, topic2);
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttSubMsg.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttSubMsg.java
index bc144f2..8226de7 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttSubMsg.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttSubMsg.java
@@ -15,8 +15,8 @@
     public String deviceId ;//璁惧ID
     public String protocol;//鍗忚
 
-    public String topic ;//娑堟伅涓婚
-    public String msg ;//娑堟伅
+    public MqttTopic topic ;//娑堟伅涓婚
+    public String metaData;//MQTT鎺ㄩ�佹潵鐨勫厓鏁版嵁
 
     public abstract boolean valid();
 
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttTopic.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttTopic.java
index 8403a73..67b1245 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttTopic.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttTopic.java
@@ -18,4 +18,16 @@
     public String devId ;//璁惧锛團Box锛塈D
     public String topic ;//娑堟伅涓婚
 
+    public boolean isEmpty(){
+        return orgTag == null || protocol == null || devId == null || topic == null
+                || orgTag.trim().length() == 0 || protocol.trim().length() == 0 || devId.trim().length() == 0 || topic.trim().length() == 0 ;
+    }
+
+    public String shortName(){
+        return topic ;
+    }
+
+    public String longName(){
+        return orgTag + "/" + protocol + "/" + devId + "/" + topic ;
+    }
 }
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java
index 001b8c3..9615da3 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java
@@ -24,8 +24,8 @@
     public MqttSubMsgSdV1(MqttTopic subTopic, String msg) {
         this.deviceId = subTopic.devId ;
         this.protocol = subTopic.protocol ;
-        this.topic = subTopic.topic ;
-        this.msg = msg ;
+        this.topic = subTopic ;
+        this.metaData = msg ;
     }
     public String toString(){
         StringBuilder sb = new StringBuilder();
@@ -35,17 +35,17 @@
                     .append("\n") ;
         }
         sb.append("涓婚:")
-                .append(topic)
-                .append("\n") ;
-        sb.append("娑堟伅:")
-                .append(msg)
+                .append(topic.longName())
                 .append("\n") ;
         if(vo4Up != null){
             sb.append("鏁版嵁:")
                     .append(vo4Up.toString())
                     .append("\n") ;
+        }else{
+            sb.append("鍏冩暟鎹�:")
+                    .append(metaData)
+                    .append("\n") ;
         }
-
         return sb.toString() ;
     }
 
@@ -64,7 +64,7 @@
         if (topic == null || topic.isEmpty()) {
             return false;
         }
-        if (msg == null || msg.isEmpty()) {
+        if (metaData == null || metaData.isEmpty()) {
             return false;
         }
         return true;
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java
index df625b1..527e6bc 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java
@@ -12,6 +12,7 @@
 import com.dy.common.mw.protocol4Mqtt.pSdV1.downVos.InjectStartVo;
 import com.dy.common.mw.protocol4Mqtt.pSdV1.downVos.StirStartVo;
 import com.dy.common.mw.protocol4Mqtt.pSdV1.upVos.*;
+import com.dy.common.mw.protocol4Mqtt.status.DevRunSt;
 import org.eclipse.paho.client.mqttv3.MqttMessage;
 
 /**
@@ -21,38 +22,45 @@
  */
 public class ProtocolParserSdV1 {
     public MqttSubMsgSdV1 parseSubMsg(MqttTopic subTopic, MqttMessage mqttMsg, MqttCallback callback) throws Exception {
-        String msg = new String(mqttMsg.getPayload(), "UTF-8");
-        if(JSON.isValid(msg)){
-            throw new Exception("鎺ユ敹鍒癕QTT娑堟伅锛屽崗璁�" + subTopic.protocol + "锛岃澶嘔D" + subTopic.devId + "锛屼富棰�" + subTopic.topic + "娑堟伅鏍煎紡闈瀓son鏁版嵁(" + msg + ")") ;
+        String strTxt = new String(mqttMsg.getPayload(), "UTF-8");
+        if(!JSON.isValid(strTxt)){
+            throw new Exception("鎺ユ敹鍒癕QTT娑堟伅锛屽崗璁�" + subTopic.protocol + "锛岃澶嘔D" + subTopic.devId + "锛屼富棰�" + subTopic.longName() + "娑堟伅鏍煎紡闈瀓son鏁版嵁(" + strTxt + ")") ;
         }
-        MqttSubMsgSdV1 ms = new MqttSubMsgSdV1(subTopic, msg);
+        MqttSubMsgSdV1 msg = new MqttSubMsgSdV1(subTopic, strTxt);
         Vo4Up vo ;
+        DevRunSt stVo ;
         switch (subTopic.topic) {
             case ProtocolConstantSdV1.SubTopicWeather -> {
-                vo = JSON.parseObject(msg, WeatherVo.class);
+                vo = JSON.parseObject(strTxt, WeatherVo.class);
                 break;
             }
             case ProtocolConstantSdV1.SubTopicSoil -> {
-                vo = JSON.parseObject(msg, SoilVo.class);
+                vo = JSON.parseObject(strTxt, SoilVo.class);
                 break;
             }
             case ProtocolConstantSdV1.SubTopicManure -> {
-                vo = JSON.parseObject(msg, ManureVo.class);
+                vo = JSON.parseObject(strTxt, ManureVo.class);
                 break;
             }
             case ProtocolConstantSdV1.SubTopicState -> {
                 //姝ゅ鏈畬鎴愶紝搴旇浜х敓涓�浜涢�氫俊鐨刬nfo锛屼緵涓嬮潰callback.notify(objs)閫氱煡鍑哄幓
-                vo = JSON.parseObject(msg, StateVo.class);
+                vo = JSON.parseObject(strTxt, StateVo.class);
+                stVo = new DevRunSt() ;
+                stVo.id = msg.deviceId ;
+                //stVo.stirRunning = true ; //鎼呮媽杩愯 true鏄� false鍚�
+                //stVo.injectRunning = true ; //娉ㄨ偉杩愯 true鏄� false鍚�
+                //stVo.irrRunning = true ; //鐏屾簤杩愯 true鏄� false鍚�
+                //stVo.alarm = true ; //鎶ヨ true鏄� false鍚�
                 break;
             }
             default -> {
                 throw new Exception("鎺ユ敹鍒癕QTT娑堟伅锛屽崗璁�" + subTopic.protocol + "锛岃澶嘔D" + subTopic.devId + "锛屼富棰�" + subTopic.topic + "娑堟伅瑙f瀽閫昏緫鏈疄鐜�");
             }
         }
-        ms.vo4Up = vo ;
-        callback.callback(ms);
+        msg.vo4Up = vo ;
+        callback.callback(msg);
         callback.notify(null);//姝ゅ鏈畬鎴�
-        return ms;
+        return msg;
     }
 
     public MqttPubMsgSdV1 createPubMsg(String orgTag, Command com) throws Exception {
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/WeatherVo.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/WeatherVo.java
index 6ebc8be..62ba401 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/WeatherVo.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/WeatherVo.java
@@ -43,7 +43,7 @@
 
     public String devDtStr ;//璁惧鏃堕棿
     public String getDevDtStr() {
-        if(devDt == null){
+        if(devDt != null){
             return DateTime.yyyy_MM_dd_HH_mm_ss(DateTime.getDate(devDt)) ;
         }else{
             return "" ;
@@ -53,17 +53,17 @@
     @Override
     public String toString(){
         StringBuilder sb = new StringBuilder();
-        sb.append("姘旇薄鏁版嵁锛�") ;
-        sb.append(" 娑堟伅ID锛�"+messageId) ;
-        sb.append(" 浜屾哀鍖栫⒊锛�"+carbonDioxide) ;
-        sb.append(" 鍏夌収寮哄害锛�"+lightIntensity) ;
-        sb.append(" 澶ф皵鍘嬪姏锛�"+atmosphericPressure) ;
-        sb.append(" 绌烘皵娓╁害锛�"+airTemperature) ;
-        sb.append(" 绌烘皵婀垮害锛�"+airHumidity) ;
-        sb.append(" PM2.5锛�"+pm25) ;
-        sb.append(" PM10锛�"+pm10) ;
-        sb.append(" 璁惧鏃堕棿锛�"+devDt) ;
-        sb.append(" 璁惧鏃堕棿锛�"+ this.getDevDtStr()) ;
+        sb.append("姘旇薄鏁版嵁=>") ;
+        sb.append(" 娑堟伅ID锛�"+messageId + ", ") ;
+        sb.append(" 浜屾哀鍖栫⒊锛�"+carbonDioxide + ", ") ;
+        sb.append(" 鍏夌収寮哄害锛�"+lightIntensity + ", ") ;
+        sb.append(" 澶ф皵鍘嬪姏锛�"+atmosphericPressure + ", ") ;
+        sb.append(" 绌烘皵娓╁害锛�"+airTemperature + ", ") ;
+        sb.append(" 绌烘皵婀垮害锛�"+airHumidity + ", ") ;
+        sb.append(" PM2.5锛�"+pm25 + ", ") ;
+        sb.append(" PM10锛�"+pm10 + ", ") ;
+        sb.append(" 璁惧鏃堕棿锛�"+devDt + ", ") ;
+        sb.append(" 璁惧鏃堕棿锛�"+ this.getDevDtStr() + ", ") ;
         sb.append("\n") ;
         return sb.toString() ;
     }
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java
index 0cf1c24..e823bbd 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java
@@ -474,6 +474,7 @@
 				if(mqVo.poolMaxSize <= 1 || mqVo.poolMaxSize > 1000){
 					throw new Exception("config.mqtt.poolMaxSize閰嶇疆鐨勮繛鎺ユ睜杩炴帴鏈�澶ф暟閲忎笉鍚堟硶") ;
 				}
+				mqVo.useMemoryPersistence = conf.getSetAttrBoolean(doc, "config.mqtt", "useMemoryPersistence", null, null) ;
 				String proAndDevIds = conf.getSetAttrTxt(doc, "config.mqtt", "protocolAndDeviceIds", null, false, null) ;
 				if(proAndDevIds == null || proAndDevIds.trim().equals("")){
 					throw new Exception("config.mqtt.protocolAndDeviceIds閰嶇疆涓嶅悎娉�") ;
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/ServerShutDownHook.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/ServerShutDownHook.java
index 5171b77..95a6248 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/ServerShutDownHook.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/ServerShutDownHook.java
@@ -32,7 +32,7 @@
                 com2.code = CodeLocal.stopMqttSv ;
                 com2.type = CommandType.innerCommand ;
                 new CommandInnerDeaLer().deal(com2) ;
-
+                log.info("鍏抽棴绋嬪簭鍓嶏紝鍏抽棴浜哅QTT鏈嶅姟");
             }catch (Exception e){
                 log.error("绋嬪簭锛堟帶鍒跺彴锛夊叧闂挬瀛愬彂鐢熷紓甯�", e);
             }
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/CommandInnerDeaLer.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/CommandInnerDeaLer.java
index 6b4d081..0e2375b 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/CommandInnerDeaLer.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/CommandInnerDeaLer.java
@@ -113,6 +113,10 @@
                 rCom = this.stopMqttSv(com);
                 break;
             }
+            case CodeLocal.recoverMqttSv -> {
+                rCom = this.recoverMqttSv(com);
+                break;
+            }
             default -> {
                 rCom = ReturnCommand.errored("鍑洪敊锛屾敹鍒板唴閮ㄥ懡浠ょ殑鍔熻兘鐮佷笉鑳借瘑鍒紒", com.getId(), com.getCode());
                 break;
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/ReturnCommand.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/ReturnCommand.java
index 3a80412..676852f 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/ReturnCommand.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/ReturnCommand.java
@@ -19,7 +19,6 @@
      * @param message
      */
     public static Command successed(String message, String commandId, String code, Object attachment) {
-        log.info(message);
         Command command = new Command().createReturnSuccessCommand(message, commandId, code);
         command.setAttachment(attachment);
         return command;
@@ -29,7 +28,6 @@
      * @param message
      */
     public static Command successed(String message, String commandId, String code) {
-        log.info(message);
         return new Command().createReturnSuccessCommand(message, commandId, code);
     }
     /**
@@ -37,7 +35,6 @@
      * @param message
      */
     public static Command errored(String message, String commandId, String code) {
-        log.error(message);
         return new Command().createReturnErrorCommand(message, commandId, code);
     }
 }
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/localProtocol/CodeLocal.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/localProtocol/CodeLocal.java
index e1a0417..56096ba 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/localProtocol/CodeLocal.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/localProtocol/CodeLocal.java
@@ -38,7 +38,7 @@
 
 	public static final String onPartLineMqtt = "LMCD0002" ;//鏌ヨ閮ㄥ垎MQTT璁惧鍦ㄧ嚎鎯呭喌
 
-	public static final String onLineStatisticsMqtt = "LMCD0003" ;//鏌ヨ鎵�鏈塎QTT璁惧鐘舵�佺粺璁℃儏鍐�
+	public static final String onLineStatisticsMqtt = "LMCD0003" ;//鏌ヨ鎵�鏈塎QTT璁惧鍦ㄧ嚎鐘舵�佺粺璁℃儏鍐�
 
 	public static final String allRtuStatesMqtt = "LMCD0010" ;//鏌ヨ鎵�鏈塎QTT璁惧鐘舵��
 
@@ -48,5 +48,7 @@
 
 	public static final String stopMqttSv = "LMCD0110" ;//鍋滄Mqtt鏈嶅姟
 
+	public static final String recoverMqttSv = "LMCD0112" ;//閲嶅惎MQTT鏈嶅姟锛屾帴鍏ユ柊鐨凪QTT杩炴帴
+
 
 }
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatusDealer.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatusDealer.java
index c164c0a..099cc9e 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatusDealer.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatusDealer.java
@@ -55,7 +55,7 @@
             Map.Entry<String, DevStatus> entry = null ;
             while(it.hasNext()){
                 entry = it.next() ;
-                if(((DevStatus)entry).onLine != null && ((DevStatus)entry).onLine.booleanValue()){
+                if((entry.getValue()).onLine != null && (entry.getValue()).onLine.booleanValue()){
                     vo.onLineNum++ ;
                 }else{
                     vo.offLineNum++ ;
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java
index ac3e211..661696d 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java
@@ -10,6 +10,9 @@
 import org.apache.logging.log4j.Logger;
 import org.eclipse.paho.client.mqttv3.MqttClient;
 
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * @Author: liurunyu
  * @Date: 2025/6/4 14:54
@@ -24,6 +27,8 @@
     private MqttUnitConfigVo configVo ;
 
     private MqttClientPool pool;
+
+    private List<MqttClient> subClients ;
 
     private MqttManager(){
     }
@@ -43,8 +48,9 @@
      * @throws Exception
      */
     public void start()throws Exception{
+        subClients = new ArrayList<>();
         String URL = "tcp://" + this.configVo.svIp + ":" + this.configVo.svPort;
-        this.pool = new MqttClientPool(URL, this.configVo.svUserName, this.configVo.svUserPassword, this.configVo.poolMaxSize);
+        this.pool = new MqttClientPool(URL, this.configVo.svUserName, this.configVo.svUserPassword, this.configVo.poolMaxSize, this.configVo.useMemoryPersistence);
         if(this.pool.isClose()){
             throw new Exception("Mqtt杩炴帴姹犲垵濮嬪寲澶辫触");
         }
@@ -57,6 +63,7 @@
         if(clientSub == null || !clientSub.isConnected()){
             throw new Exception("Mqtt杩炴帴姹犺幏寰楄闃呰繛鎺ヤ笉鍙敤");
         }
+        subClients.add(clientSub) ;
         // 璁㈤槄涓婚
         for(int i = 0; i < this.configVo.subTopics.length; i++){
             for(int j = 0 ; j < this.configVo.protocolAndDeviceIds.length; j++){
@@ -90,6 +97,18 @@
     }
 
     public void stop()throws Exception{
+        if(subClients != null && subClients.size() > 0){
+            for (MqttClient client : subClients) {
+                if(client != null && client.isConnected()){
+                    try{
+                        client.disconnect();
+                        client.close();
+                    }catch (Exception e){
+                        e.printStackTrace();
+                    }
+                }
+            }
+        }
         if(this.pool != null){
             // 鍏抽棴杩炴帴姹�
             this.pool.close();
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java
index 7d8c6ea..0414628 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java
@@ -22,22 +22,27 @@
 
     @Override
     public void messageArrived(String topic, MqttMessage msg) throws Exception {
-        MqttTopic subTopic = MqttMsgParser.parseSubTopic(topic) ;
-        MqttSubMsg subMsg = MqttMsgParser.parseSubMsg(subTopic, msg, new MqttCallback(){
-            @Override
-            public void callback(MqttSubMsg subMsg) {
-                DevStatusDealer.onLine(subMsg.deviceId, subMsg.protocol);
-                DevStatusDealer.afterReceiveSubMessage(subMsg.deviceId);
-                RtuLogDealer.log4Mqtt(subMsg.deviceId, "璁㈤槄娑堟伅    涓婚锛�" + subMsg.topic + "   娑堟伅锛�" + subMsg.msg);
-            }
-            @Override
-            public void notify(String devId, MqttNotifyInfo... infos) {
-                if(notify != null){
-                    notify.notify(devId, infos) ;
+        try {
+            MqttTopic subTopic = MqttMsgParser.parseSubTopic(topic);
+            MqttSubMsg subMsg = MqttMsgParser.parseSubMsg(subTopic, msg, new MqttCallback() {
+                @Override
+                public void callback(MqttSubMsg subMsg) {
+                    DevStatusDealer.onLine(subMsg.deviceId, subMsg.protocol);
+                    DevStatusDealer.afterReceiveSubMessage(subMsg.deviceId);
+                    RtuLogDealer.log4Mqtt(subMsg.deviceId, "璁㈤槄娑堟伅    涓婚锛�" + subMsg.topic.longName() + "   鍏冩暟鎹細" + subMsg.metaData);
                 }
-            }
-        }) ;
-        this.nextDeal(subMsg);
+
+                @Override
+                public void notify(String devId, MqttNotifyInfo... infos) {
+                    if (notify != null) {
+                        notify.notify(devId, infos);
+                    }
+                }
+            });
+            this.nextDeal(subMsg);
+        }catch(Exception e){
+            log.error("澶勭悊MQTT璁㈤槄娑堟伅鍙戠敓寮傚父", e);
+        }
     }
     private void nextDeal(MqttSubMsg subMsg)throws Exception {
         subMsg.action(new Callback() {
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitConfigVo.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitConfigVo.java
index f96f189..e2cad34 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitConfigVo.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitConfigVo.java
@@ -13,6 +13,7 @@
     public String svUserName ;//
     public String svUserPassword ;//
     public Integer poolMaxSize ;//
+    public Boolean useMemoryPersistence ;
     public String[] protocolAndDeviceIds ;//璁惧鍗忚涓嶪D锛團Box锛塱d
     public String[] deviceIds ;//璁惧锛團Box锛塱d
     public String[] subTopics ;//璁㈤槄鐨勪富棰�
@@ -27,6 +28,7 @@
         this.svUserName = "dyyjy" ;
         this.svUserPassword = "Dyyjy2025,;.abc!@#" ;
         this.poolMaxSize = 10 ;
+        useMemoryPersistence = true ;
         this.pubTopicQos = 1 ;
         this.noSubThenOff = 10 * 60 * 10000L ;
     }
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.xml b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.xml
index 3d3e466..50e5d2c 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.xml
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.xml
@@ -171,6 +171,7 @@
 	svUserName MQTT鏈嶅姟鍣ㄧ敤鎴峰悕
 	svUserPassword MQTT鏈嶅姟鍣ㄧ敤鎴峰瘑鐮�
 	poolMaxSize 杩炴帴姹犳渶澶ц繛鎺ユ暟
+	useMemoryPersistence 浣跨敤鍐呭瓨鎸佷箙鍖栬�岄潪榛樿鐨勬枃浠舵寔涔呭寲(true鏄� false鍚�)
 	protocolAndDeviceIds 鍦ㄥ瓙绯荤粺锛坥rgTag锛変腑鎺ュ叆鐨勮澶�(FBox)鎵�鐢ㄥ崗璁強璁惧id闆嗗悎,澶氫釜鐢ㄩ�楀彿闅斿紑锛屽崗璁笌ID鐢ㄦ鏂滄潬闅斿紑锛屼緥濡傦細sd1/338220031439,sd1/338220031440
 	subTopicAndQos: 璁㈤槄涓婚涓嶲os锛屼富棰樺悕涓庡叾Qos鐢ㄩ�楀彿闅斿紑锛屽涓富棰樺強Qos鐢ㄥ垎鍙烽殧寮�锛屼緥濡傦細ym/topic1,1;ym/topic2,1;ym/topic3,1锛屽鏋滄湁澶氫釜OrgTag锛屼富棰樺墠缂�鐢ㄥ叾OrgTag
 	pubTopicQos: 鍙戝竷涓婚鐨凲os锛屽彇鍊艰寖鍥达細
@@ -185,6 +186,7 @@
 		  svUserName="dyyjy"
 		  svUserPassword="Dyyjy2025,;.abc!@#"
 		  poolMaxSize="10"
+		  useMemoryPersistence="true"
 		  protocolAndDeviceIds="${mqtt.protocolAndDeviceIds}"
 		  subTopicAndQos="${mqtt.subTopicAndQos}"
 		  pubTopicQos="1"

--
Gitblit v1.8.0