From 10a0b0ca34824307aa7d23b0ad6679b36bd57842 Mon Sep 17 00:00:00 2001
From: zhubaomin <zhubaomin>
Date: 星期二, 10 六月 2025 19:59:49 +0800
Subject: [PATCH] Merge branch 'master' of http://8.140.179.55:20000/r/pipIrr-SV

---
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java         |   35 +
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttMsgParser.java                   |   31 
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttPubMsgSdV1.java            |    3 
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/forTcp/RtuLogDealer.java              |   18 
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/IrrStartVo.java        |   24 +
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/ManureVo.java            |   36 +
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatus.java                   |   23 +
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitConfigVo.java            |   10 
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/Vo4Up.java                           |   10 
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/comParam/ComCtrlVo.java        |    2 
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java                                  |   65 +
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevOnLineSt.java              |   16 
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/FaultClearVo.java      |   25 +
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttSubMsg.java                      |    4 
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/StateVo.java             |   36 +
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol/p206V202404/parse/Cd_84_Up.java           |   11 
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/说明.txt                         |    4 
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevRunSt.java                 |   18 
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol/p206V202404/parse/Cd_85_Up.java           |   11 
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/StirStartVo.java       |   24 +
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/softUpgrade/state/UpgradeRtu.java                     |   37 -
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/MqttSubMessageConstantTask.java |   13 
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttTopic.java                       |   21 
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/WeatherVo.java           |   70 +++
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatusDealer.java             |  186 ++++++++
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttNotify.java                      |   16 
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolConstantSdV1.java      |   13 
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.properties                                    |   32 
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/SoilVo.java              |   36 +
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttCom.java                         |    2 
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/Vo4Down.java                         |   10 
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/InjectStartVo.java     |   24 +
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.xml                                           |   17 
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/localProtocol/CodeLocal.java    |   31 +
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttNotifyInfo.java                  |    9 
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttCallback.java                    |   19 
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java                 |   44 +
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java        |  131 +++--
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgNode.java              |    3 
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java            |   20 
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/CommandInnerDeaLer.java         |  197 +++++++-
 41 files changed, 1,140 insertions(+), 197 deletions(-)

diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol/p206V202404/parse/Cd_84_Up.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol/p206V202404/parse/Cd_84_Up.java
index a3cf043..73dd128 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol/p206V202404/parse/Cd_84_Up.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol/p206V202404/parse/Cd_84_Up.java
@@ -139,5 +139,14 @@
 
 
     }
-
+    public static void main(String[] args) throws Exception {
+        String hex = "69426981371425010201130084020AF00200000000620902003004050077289364503912943449101006255833070000000000000000000000000000000000014000009000C616";
+        byte[] bs = ByteUtil.hex2Bytes(hex) ;
+        Cd_84_Up p = new Cd_84_Up() ;
+        Data d = new Data() ;
+        DataV202404 subd = new DataV202404() ;
+        d.subData = subd ;
+        p.doParse(bs, bs.length, "84", d);
+        System.out.println(d.toString());
+    }
 }
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol/p206V202404/parse/Cd_85_Up.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol/p206V202404/parse/Cd_85_Up.java
index a328e05..b269d95 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol/p206V202404/parse/Cd_85_Up.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol/p206V202404/parse/Cd_85_Up.java
@@ -99,4 +99,15 @@
         GlParse.parseCd85(bs, cdData) ;
     }
 
+    public static void main(String[] args) throws Exception {
+        String hex = "695569813714250102011300850200090000000062090200300405007728936450391294344910100625485510100625683907000000000000000000001000000000000000000000100600000000000000000140000090004916";
+        byte[] bs = ByteUtil.hex2Bytes(hex) ;
+        Cd_85_Up p = new Cd_85_Up() ;
+        Data d = new Data() ;
+        DataV202404 subd = new DataV202404() ;
+        d.subData = subd ;
+        p.doParse(bs, bs.length, "85", d);
+        System.out.println(d.toString());
+    }
+
 }
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttCallback.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttCallback.java
new file mode 100644
index 0000000..35ff0ef
--- /dev/null
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttCallback.java
@@ -0,0 +1,19 @@
+package com.dy.common.mw.protocol4Mqtt;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/10 15:39
+ * @Description
+ */
+public interface MqttCallback {
+    /**
+     * @param subMsg 璁㈤槄鐨勬秷鎭�
+     */
+    void callback(MqttSubMsg subMsg) ;
+
+    /**
+     * 鍙湁鍗忚瑙f瀽鍣ㄦ墠鐭ラ亾RTU鐪熷疄鐨勭姸鎬侊紝鎵�璁ゆ彁渚涙鎺ュ彛锛屽悜澶栭�氱煡璁惧鐨勪竴浜涚姸鎬�
+     * @param infos
+     */
+    void notify(String devId, MqttNotifyInfo...infos) ;
+}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/Com4Mqtt.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttCom.java
similarity index 95%
rename from pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/Com4Mqtt.java
rename to pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttCom.java
index 8663e05..c371a00 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/Com4Mqtt.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttCom.java
@@ -8,7 +8,7 @@
  * @Description 鍛戒护鍊煎璞�
  */
 @Data
-public class Com4Mqtt {
+public class MqttCom {
     public String commandId ;//鍛戒护ID
 
     public String deviceId ;//璁惧ID
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttMsgParser.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttMsgParser.java
index 15ec6b0..11cfbd6 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttMsgParser.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttMsgParser.java
@@ -11,25 +11,38 @@
  * @Description
  */
 public class MqttMsgParser {
-    public MqttSubMsg parseSubMsg(String topic, MqttMessage mqttMsg) throws Exception {
+    public static MqttTopic parseSubTopic(String topic) throws Exception {
         if(topic != null && topic.trim().length() != 0){
             String[] topicGrp = topic.split("/") ;
             if(topicGrp.length != 4){
                 throw new Exception("鎺ユ敹鐨刴qtt娑堟伅涓婚涓嶅彲璇嗗埆") ;
             }else{
-                if(topicGrp[1].equals("sd1")){
-                    //灞变笢璁惧(鍗忚)锛屼笖鐗堟湰鍙蜂负1
-                    return new ProtocolParserSdV1().parseSubMsg(topicGrp[2], topic, mqttMsg);
-                }else{
-                    throw new Exception("鎺ユ敹鐨刴qtt娑堟伅涓婚涓崗璁紙鍘傚鍙婄増鏈級涓嶅彲璇嗗埆") ;
-                }
+                MqttTopic vo = new MqttTopic() ;
+                vo.orgTag = topicGrp[0] ;
+                vo.protocol = topicGrp[1] ;
+                vo.devId = topicGrp[2] ;
+                vo.topic = topicGrp[3] ;
+                return vo ;
             }
         }else{
-            throw new Exception("鎺ユ敹鐨刴qtt娑堟伅涓婚涓虹┖") ;
+            throw new Exception("鎺ユ敹鐨刴qtt娑堟伅涓婚涓嶅悎娉�") ;
         }
     }
 
-    public MqttPubMsg createPubMsg(String orgTag, Command com) throws Exception {
+    public static String createPubTopic(MqttTopic tp) throws Exception {
+        return tp.orgTag + "/" + tp.protocol + "/" + tp.devId + "/" + tp.topic ;
+    }
+
+    public static MqttSubMsg parseSubMsg(MqttTopic subTopic, MqttMessage mqttMsg, MqttCallback callback) throws Exception {
+        if(subTopic.protocol.equals(ProtocolConstantSdV1.protocolName + ProtocolConstantSdV1.protocolVer)){
+            //灞变笢璁惧(鍗忚)锛屼笖鐗堟湰鍙蜂负1
+            return new ProtocolParserSdV1().parseSubMsg(subTopic, mqttMsg, callback);
+        }else{
+            throw new Exception("鎺ユ敹鐨刴qtt娑堟伅涓婚涓崗璁紙鍘傚鍙婄増鏈級涓嶅彲璇嗗埆") ;
+        }
+    }
+
+    public static MqttPubMsg createPubMsg(String orgTag, Command com) throws Exception {
         if(com.protocol == null && com.protocol.trim().length() != 0){
             throw new Exception("鎺ユ敹鍒癕QTT鍛戒护锛屼絾鏈彁渚涘崗璁�") ;
         }
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttNotify.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttNotify.java
new file mode 100644
index 0000000..ca81a61
--- /dev/null
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttNotify.java
@@ -0,0 +1,16 @@
+package com.dy.common.mw.protocol4Mqtt;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/10 15:56
+ * @Description
+ */
+public interface MqttNotify {
+    /**
+     * MQTT DEV 淇℃伅閫氱煡
+     * @param devId
+     * @param info
+     */
+    void notify(String devId,
+                MqttNotifyInfo...info) ;
+}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttNotifyInfo.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttNotifyInfo.java
new file mode 100644
index 0000000..2f0e405
--- /dev/null
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttNotifyInfo.java
@@ -0,0 +1,9 @@
+package com.dy.common.mw.protocol4Mqtt;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/10 15:42
+ * @Description
+ */
+public interface MqttNotifyInfo {
+}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttSubMsg.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttSubMsg.java
index 61b4f65..bc144f2 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttSubMsg.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttSubMsg.java
@@ -10,10 +10,10 @@
 
 public abstract class MqttSubMsg {
     public String commandId ;//鍛戒护ID
+    public String mqttResultSendWebUrl ;//Mtt杩斿洖鍛戒护缁撴灉 鍙戝悜鐩殑鍦皐eb URL
 
     public String deviceId ;//璁惧ID
-
-    public String mqttResultSendWebUrl ;//Mtt杩斿洖鍛戒护缁撴灉 鍙戝悜鐩殑鍦皐eb URL
+    public String protocol;//鍗忚
 
     public String topic ;//娑堟伅涓婚
     public String msg ;//娑堟伅
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttTopic.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttTopic.java
new file mode 100644
index 0000000..8403a73
--- /dev/null
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttTopic.java
@@ -0,0 +1,21 @@
+package com.dy.common.mw.protocol4Mqtt;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/10 9:47
+ * @Description
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class MqttTopic {
+    public String orgTag ;//缁勭粐鏍囪瘑
+    public String protocol ;//鍗忚鍚嶇О
+    public String devId ;//璁惧锛團Box锛塈D
+    public String topic ;//娑堟伅涓婚
+
+}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/Vo4Down.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/Vo4Down.java
new file mode 100644
index 0000000..2997873
--- /dev/null
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/Vo4Down.java
@@ -0,0 +1,10 @@
+package com.dy.common.mw.protocol4Mqtt;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/10 14:11
+ * @Description
+ */
+public interface Vo4Down {
+    String toString() ;
+}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/Vo4Up.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/Vo4Up.java
new file mode 100644
index 0000000..2fd7a81
--- /dev/null
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/Vo4Up.java
@@ -0,0 +1,10 @@
+package com.dy.common.mw.protocol4Mqtt;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/10 10:06
+ * @Description
+ */
+public interface Vo4Up {
+    String toString() ;
+}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttPubMsgSdV1.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttPubMsgSdV1.java
index 9817c1b..6193843 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttPubMsgSdV1.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttPubMsgSdV1.java
@@ -13,8 +13,7 @@
 @EqualsAndHashCode(callSuper=false)
 public class MqttPubMsgSdV1 extends MqttPubMsg {
 
-    public Integer address ;//瀵勫瓨鍣ㄥ湴鍧�
-    public String value ;//瀵勫瓨鍣ㄥ��
+    public String cd ;//鍔熻兘鐮�
 
     @Override
     public boolean valid() {
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java
index e26ff6d..001b8c3 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java
@@ -2,6 +2,9 @@
 
 import com.dy.common.mw.protocol4Mqtt.MqttPubMsg;
 import com.dy.common.mw.protocol4Mqtt.MqttSubMsg;
+import com.dy.common.mw.protocol4Mqtt.MqttTopic;
+import com.dy.common.mw.protocol4Mqtt.pSdV1.upVos.StateVo;
+import com.dy.common.mw.protocol4Mqtt.Vo4Up;
 import com.dy.common.util.Callback;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
@@ -14,14 +17,14 @@
 @Data
 @EqualsAndHashCode(callSuper=false)
 public class MqttSubMsgSdV1 extends MqttSubMsg {
-    public Integer address ;//瀵勫瓨鍣ㄥ湴鍧�
-    public String value ;//瀵勫瓨鍣ㄥ��
+    public Vo4Up vo4Up;//璁㈤槄鐨勬秷鎭暟鎹�煎璞�
 
     public MqttSubMsgSdV1(){}
 
-    public MqttSubMsgSdV1(String deviceId, String topic, String msg) {
-        this.deviceId = deviceId ;
-        this.topic = topic ;
+    public MqttSubMsgSdV1(MqttTopic subTopic, String msg) {
+        this.deviceId = subTopic.devId ;
+        this.protocol = subTopic.protocol ;
+        this.topic = subTopic.topic ;
         this.msg = msg ;
     }
     public String toString(){
@@ -37,6 +40,11 @@
         sb.append("娑堟伅:")
                 .append(msg)
                 .append("\n") ;
+        if(vo4Up != null){
+            sb.append("鏁版嵁:")
+                    .append(vo4Up.toString())
+                    .append("\n") ;
+        }
 
         return sb.toString() ;
     }
@@ -44,7 +52,7 @@
     public boolean subMsgMatchPubMsg(MqttPubMsg pubMsg){
         if (pubMsg instanceof MqttPubMsgSdV1) {
             MqttPubMsgSdV1 pubMsgSdV1 = (MqttPubMsgSdV1) pubMsg;
-            if(this.address.intValue() == pubMsgSdV1.getAddress().intValue()){
+            if(this.vo4Up != null && this.vo4Up instanceof StateVo){
                 return true ;
             }
         }
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolConstantSdV1.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolConstantSdV1.java
index 8d7e5cd..60dc354 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolConstantSdV1.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolConstantSdV1.java
@@ -8,4 +8,17 @@
 public class ProtocolConstantSdV1 {
     public static final String protocolName = "sd" ;
     public static final short protocolVer = 1 ;
+
+    //璁㈤槄鐨勪富棰�
+    public static final String SubTopicWeather = "weather" ;//姘旇薄
+    public static final String SubTopicSoil = "soil" ;//鍦熷¥澧掓儏
+    public static final String SubTopicManure = "manure" ;//姘磋偉
+    public static final String SubTopicState = "state" ;//鐘舵��
+
+    //鍙戝竷鐨勪富棰�
+    public static final String PubTopicFault = "ctrlFault" ;//鏁呴殰瑙i櫎
+    public static final String PubTopicStir = "ctrlStir" ;//鎼呮媽鍚仠
+    public static final String PubTopicInject = "ctrlInject" ;//娉ㄨ偉鍚仠
+    public static final String PubTopicIrr = "ctrlIrr" ;//鐏屾簤鍚仠
+
 }
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java
index 1ca98b2..df625b1 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java
@@ -3,9 +3,16 @@
 import com.alibaba.fastjson2.JSON;
 import com.alibaba.fastjson2.JSONObject;
 import com.dy.common.mw.protocol.Command;
-import com.dy.common.mw.protocol4Mqtt.pSdV1.downVos.ComCtrlVo;
+import com.dy.common.mw.protocol4Mqtt.MqttCallback;
+import com.dy.common.mw.protocol4Mqtt.MqttMsgParser;
+import com.dy.common.mw.protocol4Mqtt.MqttTopic;
+import com.dy.common.mw.protocol4Mqtt.Vo4Up;
+import com.dy.common.mw.protocol4Mqtt.pSdV1.comParam.ComCtrlVo;
+import com.dy.common.mw.protocol4Mqtt.pSdV1.downVos.FaultClearVo;
+import com.dy.common.mw.protocol4Mqtt.pSdV1.downVos.InjectStartVo;
+import com.dy.common.mw.protocol4Mqtt.pSdV1.downVos.StirStartVo;
+import com.dy.common.mw.protocol4Mqtt.pSdV1.upVos.*;
 import org.eclipse.paho.client.mqttv3.MqttMessage;
-
 
 /**
  * @Author: liurunyu
@@ -13,44 +20,74 @@
  * @Description
  */
 public class ProtocolParserSdV1 {
-    public MqttSubMsgSdV1 parseSubMsg(String deviceId, String topic, MqttMessage mqttMsg) throws Exception {
-        MqttSubMsgSdV1 ms = new MqttSubMsgSdV1(deviceId, topic, new String(mqttMsg.getPayload(), "UTF-8"));
+    public MqttSubMsgSdV1 parseSubMsg(MqttTopic subTopic, MqttMessage mqttMsg, MqttCallback callback) throws Exception {
+        String msg = new String(mqttMsg.getPayload(), "UTF-8");
+        if(JSON.isValid(msg)){
+            throw new Exception("鎺ユ敹鍒癕QTT娑堟伅锛屽崗璁�" + subTopic.protocol + "锛岃澶嘔D" + subTopic.devId + "锛屼富棰�" + subTopic.topic + "娑堟伅鏍煎紡闈瀓son鏁版嵁(" + msg + ")") ;
+        }
+        MqttSubMsgSdV1 ms = new MqttSubMsgSdV1(subTopic, msg);
+        Vo4Up vo ;
+        switch (subTopic.topic) {
+            case ProtocolConstantSdV1.SubTopicWeather -> {
+                vo = JSON.parseObject(msg, WeatherVo.class);
+                break;
+            }
+            case ProtocolConstantSdV1.SubTopicSoil -> {
+                vo = JSON.parseObject(msg, SoilVo.class);
+                break;
+            }
+            case ProtocolConstantSdV1.SubTopicManure -> {
+                vo = JSON.parseObject(msg, ManureVo.class);
+                break;
+            }
+            case ProtocolConstantSdV1.SubTopicState -> {
+                //姝ゅ鏈畬鎴愶紝搴旇浜х敓涓�浜涢�氫俊鐨刬nfo锛屼緵涓嬮潰callback.notify(objs)閫氱煡鍑哄幓
+                vo = JSON.parseObject(msg, StateVo.class);
+                break;
+            }
+            default -> {
+                throw new Exception("鎺ユ敹鍒癕QTT娑堟伅锛屽崗璁�" + subTopic.protocol + "锛岃澶嘔D" + subTopic.devId + "锛屼富棰�" + subTopic.topic + "娑堟伅瑙f瀽閫昏緫鏈疄鐜�");
+            }
+        }
+        ms.vo4Up = vo ;
+        callback.callback(ms);
+        callback.notify(null);//姝ゅ鏈畬鎴�
         return ms;
     }
 
     public MqttPubMsgSdV1 createPubMsg(String orgTag, Command com) throws Exception {
-        MqttPubMsgSdV1 msg = null ;
-        switch (com.code){
-            case CodeSdV1.cd_Fault:{
+        MqttPubMsgSdV1 msg ;
+        switch (com.code) {
+            case CodeSdV1.cd_Fault -> {
                 //鏁呴殰瑙i櫎鍛戒护
                 this.checkParam(com);
                 this.checkRtnWebUrl(com);
-                msg = this.createPubMsgOfFault(orgTag, com) ;
-                break ;
+                msg = this.createPubMsgOfFault(orgTag, com);
+                break;
             }
-            case CodeSdV1.cd_Stir:{
+            case CodeSdV1.cd_Stir -> {
                 //鎼呮媽鍚仠鍛戒护
                 this.checkParam(com);
                 this.checkRtnWebUrl(com);
-                msg = this.createPubMsgOfStir(orgTag, com) ;
-                break ;
+                msg = this.createPubMsgOfStir(orgTag, com);
+                break;
             }
-            case CodeSdV1.cd_Inject:{
+            case CodeSdV1.cd_Inject -> {
                 //娉ㄨ偉鍚仠鍛戒护
                 this.checkParam(com);
                 this.checkRtnWebUrl(com);
-                msg = this.createPubMsgOfInject(orgTag, com) ;
-                break ;
+                msg = this.createPubMsgOfInject(orgTag, com);
+                break;
             }
-            case CodeSdV1.cd_Irr:{
+            case CodeSdV1.cd_Irr -> {
                 //鐏屾簤鍚仠鍛戒护
                 this.checkParam(com);
                 this.checkRtnWebUrl(com);
-                msg = this.createPubMsgOfIrr(orgTag, com) ;
-                break ;
+                msg = this.createPubMsgOfIrr(orgTag, com);
+                break;
             }
-            default:{
-                throw new Exception("鎺ユ敹鍒癕QTT鍛戒护锛屽崗璁�" + com.protocol + "鐗堟湰" + com.protocolVersion + "鍔熻兘鐮�" + com.code + "鏋勯�犲櫒鏈疄鐜�") ;
+            default -> {
+                throw new Exception("鎺ユ敹鍒癕QTT鍛戒护锛屽崗璁�" + com.protocol + "鐗堟湰" + com.protocolVersion + "鍔熻兘鐮�" + com.code + "鏋勯�犲櫒鏈疄鐜�");
             }
         }
         return msg ;
@@ -76,10 +113,9 @@
         this.setPubMsgBase(com, msg);
         msg.isCacheForOffLine = false ;
         msg.hasResponse = true ;
-        msg.address = 123 ;
-        msg.value = "" + (cvo.isDo?1:0);
-        msg.topic = createTopic(orgTag, com) ;
-        msg.msg = "" ;
+        msg.cd = CodeSdV1.cd_Fault ;
+        msg.topic = MqttMsgParser.createPubTopic(new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicFault)) ;
+        msg.msg = JSON.toJSONString(new FaultClearVo(cvo.isDo)) ;
         return msg ;
     }
     private MqttPubMsgSdV1 createPubMsgOfStir(String orgTag, Command com) throws Exception {
@@ -93,10 +129,9 @@
         this.setPubMsgBase(com, msg);
         msg.isCacheForOffLine = false ;
         msg.hasResponse = true ;
-        msg.address = 123 ;
-        msg.value = "" + (cvo.isDo?1:0);
-        msg.topic = createTopic(orgTag, com) ;
-        msg.msg = "" ;
+        msg.cd = CodeSdV1.cd_Fault ;
+        msg.topic = MqttMsgParser.createPubTopic(new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicFault)) ;
+        msg.msg = JSON.toJSONString(new StirStartVo(cvo.isDo)) ;
         return msg ;
     }
     private MqttPubMsgSdV1 createPubMsgOfInject(String orgTag, Command com) throws Exception {
@@ -110,10 +145,9 @@
         this.setPubMsgBase(com, msg);
         msg.isCacheForOffLine = false ;
         msg.hasResponse = true ;
-        msg.address = 123 ;
-        msg.value = "" + (cvo.isDo?1:0);
-        msg.topic = createTopic(orgTag, com) ;
-        msg.msg = "" ;
+        msg.cd = CodeSdV1.cd_Fault ;
+        msg.topic = MqttMsgParser.createPubTopic(new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicFault)) ;
+        msg.msg = JSON.toJSONString(new InjectStartVo(cvo.isDo)) ;
         return msg ;
     }
     private MqttPubMsgSdV1 createPubMsgOfIrr(String orgTag, Command com) throws Exception {
@@ -127,10 +161,9 @@
         this.setPubMsgBase(com, msg);
         msg.isCacheForOffLine = false ;
         msg.hasResponse = true ;
-        msg.address = 123 ;
-        msg.value = "" + (cvo.isDo?1:0);
-        msg.topic = createTopic(orgTag, com) ;
-        msg.msg = "" ;
+        msg.cd = CodeSdV1.cd_Fault ;
+        msg.topic = MqttMsgParser.createPubTopic(new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicFault)) ;
+        msg.msg = JSON.toJSONString(new StirStartVo(cvo.isDo)) ;
         return msg ;
     }
 
@@ -140,31 +173,5 @@
         msg.mqttResultSendWebUrl = com.rtuResultSendWebUrl ;
     }
 
-    private String createTopic(String orgTag, Command com){
-        String topic = null ;
-        switch (com.code){
-            case CodeSdV1.cd_Fault:{
-                topic = orgTag + "/" + com.protocol + com.protocolVersion + "/" + com.rtuAddr + "/control/m4" ;
-                break ;
-            }
-            case CodeSdV1.cd_Stir:{
-                topic = orgTag + "/" + com.protocol + com.protocolVersion + "/" + com.rtuAddr + "/control/m80" ;
-                break ;
-            }
-            case CodeSdV1.cd_Inject:{
-                topic = orgTag + "/" + com.protocol + com.protocolVersion + "/" + com.rtuAddr + "/control/m1" ;
-                break ;
-            }
-            case CodeSdV1.cd_Irr:{
-                topic = orgTag + "/" + com.protocol + com.protocolVersion + "/" + com.rtuAddr + "/control/m2" ;
-                break ;
-            }
-            default:{
-                topic = null ;
-                break;
-            }
-        }
-        return topic ;
-    }
 
 }
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/ComCtrlVo.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/comParam/ComCtrlVo.java
similarity index 80%
rename from pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/ComCtrlVo.java
rename to pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/comParam/ComCtrlVo.java
index 4f90810..185e4f5 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/ComCtrlVo.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/comParam/ComCtrlVo.java
@@ -1,4 +1,4 @@
-package com.dy.common.mw.protocol4Mqtt.pSdV1.downVos;
+package com.dy.common.mw.protocol4Mqtt.pSdV1.comParam;
 
 /**
  * @Author: liurunyu
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/FaultClearVo.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/FaultClearVo.java
new file mode 100644
index 0000000..e9657ce
--- /dev/null
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/FaultClearVo.java
@@ -0,0 +1,25 @@
+package com.dy.common.mw.protocol4Mqtt.pSdV1.downVos;
+
+import com.alibaba.fastjson2.annotation.JSONField;
+import com.dy.common.mw.protocol4Mqtt.Vo4Down;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/10 14:07
+ * @Description
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class FaultClearVo implements Vo4Down {
+    @JSONField(name = "鏁呴殰瑙i櫎")
+    public boolean isDo ;
+
+    @Override
+    public String toString(){
+        return "鏁呴殰瑙i櫎锛�" + (isDo?"鏄�":"鍚�") ;
+    }
+}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/InjectStartVo.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/InjectStartVo.java
new file mode 100644
index 0000000..4311fd2
--- /dev/null
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/InjectStartVo.java
@@ -0,0 +1,24 @@
+package com.dy.common.mw.protocol4Mqtt.pSdV1.downVos;
+
+import com.alibaba.fastjson2.annotation.JSONField;
+import com.dy.common.mw.protocol4Mqtt.Vo4Down;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/10 14:13
+ * @Description
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class InjectStartVo implements Vo4Down {
+    @JSONField(name = "娉ㄨ偉鍚仠")
+    public boolean isDo ;//true涓哄惎锛宖alse涓哄仠
+    @Override
+    public String toString(){
+        return "娉ㄨ偉鍚仠锛�" + (isDo?"鍚�":"鍋�") ;
+    }
+}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/IrrStartVo.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/IrrStartVo.java
new file mode 100644
index 0000000..f05c154
--- /dev/null
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/IrrStartVo.java
@@ -0,0 +1,24 @@
+package com.dy.common.mw.protocol4Mqtt.pSdV1.downVos;
+
+import com.alibaba.fastjson2.annotation.JSONField;
+import com.dy.common.mw.protocol4Mqtt.Vo4Down;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/10 14:13
+ * @Description
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class IrrStartVo implements Vo4Down {
+    @JSONField(name = "鐏屾簤鍚仠")
+    public boolean isDo ;//true涓哄惎锛宖alse涓哄仠
+    @Override
+    public String toString(){
+        return "鐏屾簤鍚仠锛�" + (isDo?"鍚�":"鍋�") ;
+    }
+}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/StirStartVo.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/StirStartVo.java
new file mode 100644
index 0000000..44baf99
--- /dev/null
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/StirStartVo.java
@@ -0,0 +1,24 @@
+package com.dy.common.mw.protocol4Mqtt.pSdV1.downVos;
+
+import com.alibaba.fastjson2.annotation.JSONField;
+import com.dy.common.mw.protocol4Mqtt.Vo4Down;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/10 14:13
+ * @Description
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class StirStartVo implements Vo4Down {
+    @JSONField(name = "鎼呮媽鍚仠")
+    public boolean isDo ;//true涓哄惎锛宖alse涓哄仠
+    @Override
+    public String toString(){
+        return "鎼呮媽鍚仠锛�" + (isDo?"鍚�":"鍋�") ;
+    }
+}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/ManureVo.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/ManureVo.java
new file mode 100644
index 0000000..f038514
--- /dev/null
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/ManureVo.java
@@ -0,0 +1,36 @@
+package com.dy.common.mw.protocol4Mqtt.pSdV1.upVos;
+
+import com.alibaba.fastjson2.annotation.JSONField;
+import com.dy.common.mw.protocol4Mqtt.Vo4Up;
+import com.dy.common.util.DateTime;
+import lombok.Data;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/10 10:05
+ * @Description
+ */
+@Data
+public class ManureVo implements Vo4Up {
+    @JSONField(name = "flexem_timestamp")
+    public Long devDt ;//璁惧鏃堕棿
+
+    public String devDtStr ;//璁惧鏃堕棿
+    public String getDevDtStr() {
+        if(devDt == null){
+            return DateTime.yyyy_MM_dd_HH_mm_ss(DateTime.getDate(devDt)) ;
+        }else{
+            return "" ;
+        }
+    }
+
+    @Override
+    public String toString(){
+        StringBuilder sb = new StringBuilder();
+        sb.append("姘磋偉鏁版嵁锛�") ;
+        sb.append(" 璁惧鏃堕棿锛�"+devDt) ;
+        sb.append(" 璁惧鏃堕棿锛�"+ this.getDevDtStr()) ;
+        sb.append("\n") ;
+        return sb.toString() ;
+    }
+}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/SoilVo.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/SoilVo.java
new file mode 100644
index 0000000..a465609
--- /dev/null
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/SoilVo.java
@@ -0,0 +1,36 @@
+package com.dy.common.mw.protocol4Mqtt.pSdV1.upVos;
+
+import com.alibaba.fastjson2.annotation.JSONField;
+import com.dy.common.mw.protocol4Mqtt.Vo4Up;
+import com.dy.common.util.DateTime;
+import lombok.Data;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/10 10:05
+ * @Description
+ */
+@Data
+public class SoilVo implements Vo4Up {
+    @JSONField(name = "flexem_timestamp")
+    public Long devDt ;//璁惧鏃堕棿
+
+    public String devDtStr ;//璁惧鏃堕棿
+    public String getDevDtStr() {
+        if(devDt == null){
+            return DateTime.yyyy_MM_dd_HH_mm_ss(DateTime.getDate(devDt)) ;
+        }else{
+            return "" ;
+        }
+    }
+
+    @Override
+    public String toString(){
+        StringBuilder sb = new StringBuilder();
+        sb.append("澧掓儏鏁版嵁锛�") ;
+        sb.append(" 璁惧鏃堕棿锛�"+devDt) ;
+        sb.append(" 璁惧鏃堕棿锛�"+ this.getDevDtStr()) ;
+        sb.append("\n") ;
+        return sb.toString() ;
+    }
+}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/StateVo.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/StateVo.java
new file mode 100644
index 0000000..cae3afc
--- /dev/null
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/StateVo.java
@@ -0,0 +1,36 @@
+package com.dy.common.mw.protocol4Mqtt.pSdV1.upVos;
+
+import com.alibaba.fastjson2.annotation.JSONField;
+import com.dy.common.mw.protocol4Mqtt.Vo4Up;
+import com.dy.common.util.DateTime;
+import lombok.Data;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/10 10:05
+ * @Description
+ */
+@Data
+public class StateVo implements Vo4Up {
+    @JSONField(name = "flexem_timestamp")
+    public Long devDt ;//璁惧鏃堕棿
+
+    public String devDtStr ;//璁惧鏃堕棿
+    public String getDevDtStr() {
+        if(devDt == null){
+            return DateTime.yyyy_MM_dd_HH_mm_ss(DateTime.getDate(devDt)) ;
+        }else{
+            return "" ;
+        }
+    }
+
+    @Override
+    public String toString(){
+        StringBuilder sb = new StringBuilder();
+        sb.append("鐘舵�佹暟鎹細") ;
+        sb.append(" 璁惧鏃堕棿锛�"+devDt) ;
+        sb.append(" 璁惧鏃堕棿锛�"+ this.getDevDtStr()) ;
+        sb.append("\n") ;
+        return sb.toString() ;
+    }
+}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/WeatherVo.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/WeatherVo.java
new file mode 100644
index 0000000..6ebc8be
--- /dev/null
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/WeatherVo.java
@@ -0,0 +1,70 @@
+package com.dy.common.mw.protocol4Mqtt.pSdV1.upVos;
+
+import com.alibaba.fastjson2.annotation.JSONField;
+import com.dy.common.mw.protocol4Mqtt.Vo4Up;
+import com.dy.common.util.DateTime;
+import lombok.Data;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/10 10:04
+ * @Description
+ */
+@Data
+public class WeatherVo implements Vo4Up {
+    //{"PM10":10,"PM2.5":0,"flexem_message_id":1311,"flexem_timestamp":1749522958,"浜屾哀鍖栫⒊":10,"鍏夌収寮哄害":0,"澶ф皵鍘嬪姏":20,"绌烘皵娓╁害":0,"绌烘皵婀垮害":65}
+    @JSONField(name = "flexem_message_id")
+    public Integer messageId ;//娑堟伅ID
+
+    @JSONField(name = "浜屾哀鍖栫⒊")
+    public Integer carbonDioxide ;//浜屾哀鍖栫⒊
+
+    @JSONField(name = "鍏夌収寮哄害")
+    public Integer lightIntensity ;//鍏夌収寮哄害
+
+    @JSONField(name = "澶ф皵鍘嬪姏")
+    public Integer atmosphericPressure ;//澶ф皵鍘嬪姏
+
+    @JSONField(name = "绌烘皵娓╁害")
+    public Integer airTemperature ;//绌烘皵娓╁害
+
+    @JSONField(name = "绌烘皵婀垮害")
+    public Integer airHumidity ;//绌烘皵婀垮害
+
+    @JSONField(name = "PM2.5")
+    public Integer pm25 ;//PM2.5
+
+    @JSONField(name = "PM10")
+    public Integer pm10 ;//PM10
+
+
+    @JSONField(name = "flexem_timestamp")
+    public Long devDt ;//璁惧鏃堕棿
+
+    public String devDtStr ;//璁惧鏃堕棿
+    public String getDevDtStr() {
+        if(devDt == null){
+            return DateTime.yyyy_MM_dd_HH_mm_ss(DateTime.getDate(devDt)) ;
+        }else{
+            return "" ;
+        }
+    }
+
+    @Override
+    public String toString(){
+        StringBuilder sb = new StringBuilder();
+        sb.append("姘旇薄鏁版嵁锛�") ;
+        sb.append(" 娑堟伅ID锛�"+messageId) ;
+        sb.append(" 浜屾哀鍖栫⒊锛�"+carbonDioxide) ;
+        sb.append(" 鍏夌収寮哄害锛�"+lightIntensity) ;
+        sb.append(" 澶ф皵鍘嬪姏锛�"+atmosphericPressure) ;
+        sb.append(" 绌烘皵娓╁害锛�"+airTemperature) ;
+        sb.append(" 绌烘皵婀垮害锛�"+airHumidity) ;
+        sb.append(" PM2.5锛�"+pm25) ;
+        sb.append(" PM10锛�"+pm10) ;
+        sb.append(" 璁惧鏃堕棿锛�"+devDt) ;
+        sb.append(" 璁惧鏃堕棿锛�"+ this.getDevDtStr()) ;
+        sb.append("\n") ;
+        return sb.toString() ;
+    }
+}
diff --git "a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/\350\257\264\346\230\216.txt" "b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/\350\257\264\346\230\216.txt"
index 6f29fa0..7d55cba 100644
--- "a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/\350\257\264\346\230\216.txt"
+++ "b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/\350\257\264\346\230\216.txt"
@@ -1,6 +1,6 @@
 灞变笢娉板畨鍏徃鎻愪緵姘磋偉鏈恒�佸湡澹ゅ鎯呯珯銆佹皵璞$珯銆丗Box绯荤粺鍗忚
 
 寤鸿娑堟伅涓婚瑙勫垯锛�
-瀛愮郴缁燂紙鏈烘瀯锛�/鍗忚鍚嶇О锛堝巶瀹讹級+鐗堟湰鍙�/璁惧缂栧彿/鍔熻兘缁�/鍦板潃
+瀛愮郴缁燂紙鏈烘瀯锛�/鍗忚鍚嶇О锛堝巶瀹讹級+鐗堟湰鍙�/璁惧缂栧彿/鍔熻兘缁�
 渚嬪锛�
-ym/sd1/10000/control/m4  (鍏冭皨/灞变笢+鐗堟湰1/璁惧缂栧彿/璁惧鎺у埗/鍦板潃)
\ No newline at end of file
+ym/sd1/10000/weather  (鍏冭皨/灞变笢+鐗堟湰1/璁惧缂栧彿/姘旇薄)
\ No newline at end of file
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevOnLineSt.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevOnLineSt.java
new file mode 100644
index 0000000..1092b27
--- /dev/null
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevOnLineSt.java
@@ -0,0 +1,16 @@
+package com.dy.common.mw.protocol4Mqtt.status;
+
+import com.dy.common.mw.protocol4Mqtt.MqttNotifyInfo;
+import lombok.Data;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/10 15:50
+ * @Description
+ */
+@Data
+public class DevOnLineSt implements MqttNotifyInfo {
+    public String id ;
+    public String protocol ;
+    public Boolean onLine ;
+}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevRunSt.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevRunSt.java
new file mode 100644
index 0000000..0fe300a
--- /dev/null
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevRunSt.java
@@ -0,0 +1,18 @@
+package com.dy.common.mw.protocol4Mqtt.status;
+
+import com.dy.common.mw.protocol4Mqtt.MqttNotifyInfo;
+import lombok.Data;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/10 15:52
+ * @Description
+ */
+@Data
+public class DevRunSt implements MqttNotifyInfo {
+    public String id ;
+    public Boolean stirRunning ;//鎼呮媽杩愯 true鏄� false鍚�
+    public Boolean injectRunning ;//娉ㄨ偉杩愯 true鏄� false鍚�
+    public Boolean irrRunning ;//鐏屾簤杩愯 true鏄� false鍚�
+    public Boolean alarm ;//鎶ヨ true鏄� false鍚�
+}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/softUpgrade/state/UpgradeRtu.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/softUpgrade/state/UpgradeRtu.java
index 7c13c3a..0424c43 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/softUpgrade/state/UpgradeRtu.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/softUpgrade/state/UpgradeRtu.java
@@ -60,30 +60,19 @@
     }
 
     public static String getStateName(int state){
-        switch (state) {
-            case STATE_OPEN:
-                return "闃�寮�";
-            case STATE_OFFLINE:
-                return "绂荤嚎";
-            case STATE_UNSTART:
-                return "鏈紑濮�";
-            case STATE_RUNNING:
-                return "鍗囩骇涓�";
-            case STATE_SUCCESS:
-                return "鍗囩骇鎴愬姛";
-            case STATE_FAILONE:
-                return "涓�鍖呮";
-            case STATE_FAIL:
-                return "澶氬寘姝�";
-            case STATE_FAILOFFLINE:
-                return "绂荤嚎澶辫触";
-            case STATE_FAILOPEN:
-                return "闃�寮�澶辫触";
-            case STATE_FAILRTU:
-                return "RTU澶辫触";
-            default:
-                return "鏈煡";
-        }
+        return switch (state) {
+            case STATE_OPEN -> "闃�寮�";
+            case STATE_OFFLINE -> "绂荤嚎";
+            case STATE_UNSTART -> "鏈紑濮�";
+            case STATE_RUNNING -> "鍗囩骇涓�";
+            case STATE_SUCCESS -> "鍗囩骇鎴愬姛";
+            case STATE_FAILONE -> "涓�鍖呮";
+            case STATE_FAIL -> "澶氬寘姝�";
+            case STATE_FAILOFFLINE -> "绂荤嚎澶辫触";
+            case STATE_FAILOPEN -> "闃�寮�澶辫触";
+            case STATE_FAILRTU -> "RTU澶辫触";
+            default -> "鏈煡";
+        };
     }
 
     /**
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java
index b7917f5..0cf1c24 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java
@@ -111,12 +111,12 @@
             }catch(Exception e){
             	company = "" ;
             }
-            System.out.println("OOOOOOOOOO           OOOOOOOO       OOOOOOOO") ;
-            System.out.println("@@@@@@@@@@@@@@@@#O    $@@@@@@@@&    @@@@@@@@#") ;       
-            System.out.println("@@@@@@@@@@@@@@@@@@@#    @@@@@@@@# $@@@@@@@@&") ;        
-            System.out.println("@@@@@@@@@@@@@@@@@@@@@#   #@@@@@@@@@@@@@@@@O") ;    
-            System.out.println("@@@@@@@@@@@@@@@@@@@@@@@   &@@@@@@@@@@@@@@") ;           
-            System.out.println("@@@@@@$      $@@@@@@@@@&   O@@@@@@@@@@@#") ;        
+            System.out.println("$$$$$$$$$$$$         $$$$$$$$       $$$$$$$$") ;
+            System.out.println("@@@@@@@@@@@@@@@@#$    $@@@@@@@@&    @@@@@@@@#") ;
+            System.out.println("@@@@@@@@@@@@@@@@@@@#    @@@@@@@@# $@@@@@@@@&") ;
+            System.out.println("@@@@@@@@@@@@@@@@@@@@@#   #@@@@@@@@@@@@@@@@$") ;
+            System.out.println("@@@@@@@@@@@@@@@@@@@@@@@   &@@@@@@@@@@@@@@") ;
+            System.out.println("@@@@@@$      $@@@@@@@@@&   $@@@@@@@@@@@#") ;
             System.out.println("@@@@@@$        @@@@@@@@@     @@@@@@@@@&      " + this.orgTag + svName + "RtuMw 1.0.00" ) ;
 			if(this.HttpSvPath != null && this.HttpSvPort != null){
 				System.out.println("@@@@@@$       O@@@@@@@@@     &@@@@@@@@       HttpSv [ip]:" + this.HttpSvPort + this.HttpSvPath) ;
@@ -450,7 +450,7 @@
 			mqVo.enable = conf.getSetAttrBoolean(doc, "config.mqtt", "enable", null, null) ;
 			ServerProperties.mqttUnitEnable = mqVo.enable ;
 			if(mqVo.enable){
-				mqVo.svIp = conf.getSetAttrTxt(doc, "config.mqtt", "svIp", null, true, null) ;
+				mqVo.svIp = conf.getSetAttrTxt(doc, "config.mqtt", "svIp", null, false, null) ;
 				if(!IPUtils.ipValid(mqVo.svIp)){
 					throw new Exception("config.mqtt.svIp閰嶇疆鐨処P涓嶅悎娉�") ;
 				}
@@ -458,13 +458,13 @@
 				if(mqVo.svPort < 0 || mqVo.svPort > 65535){
 					throw new Exception("config.mqtt.svPort閰嶇疆鐨勭鍙d笉鍚堟硶") ;
 				}
-				mqVo.svUserName = conf.getSetAttrTxt(doc, "config.mqtt", "svUserName", null, true, null) ;
+				mqVo.svUserName = conf.getSetAttrTxt(doc, "config.mqtt", "svUserName", null, false, null) ;
 				if(mqVo.svUserName == null || mqVo.svUserName.trim().equals("")){
 					throw new Exception("config.mqtt.svUserName閰嶇疆鐨勭敤鎴峰悕涓嶅悎娉�") ;
 				}else{
 					mqVo.svUserName = mqVo.svUserName.trim() ;
 				}
-				mqVo.svUserPassword = conf.getSetAttrTxt(doc, "config.mqtt", "svUserPassword", null, true, null) ;
+				mqVo.svUserPassword = conf.getSetAttrTxt(doc, "config.mqtt", "svUserPassword", null, false, null) ;
 				if(mqVo.svUserPassword == null || mqVo.svUserPassword.trim().equals("")){
 					throw new Exception("config.mqtt.svUserName閰嶇疆鐨勭敤鎴峰瘑鐮佷笉鍚堟硶") ;
 				}else{
@@ -474,28 +474,51 @@
 				if(mqVo.poolMaxSize <= 1 || mqVo.poolMaxSize > 1000){
 					throw new Exception("config.mqtt.poolMaxSize閰嶇疆鐨勮繛鎺ユ睜杩炴帴鏈�澶ф暟閲忎笉鍚堟硶") ;
 				}
-				String topicAndQos = conf.getSetAttrTxt(doc, "config.mqtt", "topicAndQos", null, true, null) ;
-				if(topicAndQos == null || topicAndQos.trim().equals("")){
-					throw new Exception("config.mqtt.topicAndQos閰嶇疆鐨勪富棰樺強Qos涓嶅悎娉�") ;
+				String proAndDevIds = conf.getSetAttrTxt(doc, "config.mqtt", "protocolAndDeviceIds", null, false, null) ;
+				if(proAndDevIds == null || proAndDevIds.trim().equals("")){
+					throw new Exception("config.mqtt.protocolAndDeviceIds閰嶇疆涓嶅悎娉�") ;
 				}else{
-					topicAndQos = topicAndQos.trim() ;
-					topicAndQos = topicAndQos.replaceAll("锛�", ",");
-					topicAndQos = topicAndQos.replaceAll("锛�", ";");
-					String[] topicAndQosArr = topicAndQos.split(";") ;
+					proAndDevIds = proAndDevIds.trim() ;
+					proAndDevIds = proAndDevIds.replaceAll("锛�", ",");
+					proAndDevIds = proAndDevIds.replaceAll("锛�", ";");
+					proAndDevIds = proAndDevIds.replaceAll("\\\\", "/");
+					mqVo.protocolAndDeviceIds = proAndDevIds.split(",") ;
+					mqVo.deviceIds = new String[mqVo.protocolAndDeviceIds.length] ;
+					int index = 0 ;
+					for(String topicAndQosStr : mqVo.protocolAndDeviceIds){
+						String[] pd = topicAndQosStr.split("/") ;
+						mqVo.deviceIds[index] = pd[1].trim() ;
+						index++ ;
+					}
+				}
+
+
+				String subTopicAndQos = conf.getSetAttrTxt(doc, "config.mqtt", "subTopicAndQos", null, false, null) ;
+				if(subTopicAndQos == null || subTopicAndQos.trim().equals("")){
+					throw new Exception("config.mqtt.subTopicAndQos閰嶇疆鐨勪富棰樺強Qos涓嶅悎娉�") ;
+				}else{
+					subTopicAndQos = subTopicAndQos.trim() ;
+					subTopicAndQos = subTopicAndQos.replaceAll("锛�", ",");
+					subTopicAndQos = subTopicAndQos.replaceAll("锛�", ";");
+					String[] topicAndQosArr = subTopicAndQos.split(";") ;
 					mqVo.subTopics = new String[topicAndQosArr.length] ;
-					mqVo.topicsQos = new int[topicAndQosArr.length] ;
+					mqVo.subTopicsQos = new int[topicAndQosArr.length] ;
 					int index = 0 ;
 					for(String topicAndQosStr : topicAndQosArr){
 						String[] tq = topicAndQosStr.split(",") ;
 						mqVo.subTopics[index] = tq[0].trim() ;
-						mqVo.topicsQos[index] = Integer.parseInt(tq[1].trim()) ;
+						mqVo.subTopicsQos[index] = Integer.parseInt(tq[1].trim()) ;
 						index++ ;
 					}
 				}
-				mqVo.publishQos = conf.getSetAttrPlusInt(doc, "config.mqtt", "publishQos", null, 0, 3, null);
-				if(mqVo.publishQos < 0 || mqVo.publishQos > 3){
-					throw new Exception("config.mqtt.publishQos閰嶇疆涓嶅悎娉�") ;
+				mqVo.pubTopicQos = conf.getSetAttrPlusInt(doc, "config.mqtt", "pubTopicQos", null, 0, 3, null);
+				if(mqVo.pubTopicQos < 0 || mqVo.pubTopicQos > 3){
+					throw new Exception("config.mqtt.pubTopicQos閰嶇疆涓嶅悎娉�") ;
 				}
+
+				Integer intNoSubThenOff = conf.getSetAttrPlusInt(doc, "config.mqtt", "noSubThenOff", null, 1, 1440, null);
+				mqVo.noSubThenOff = intNoSubThenOff * 60 * 1000L ;
+
 				mqVo.showStartInfo = showStartInfo ;
 				AdapterImp_MqttUnit mqAdapt = new AdapterImp_MqttUnit();
 				mqAdapt.setConfig(mqVo);
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/forTcp/RtuLogDealer.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/forTcp/RtuLogDealer.java
index 0ffda6a..55b0b50 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/forTcp/RtuLogDealer.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/forTcp/RtuLogDealer.java
@@ -26,5 +26,23 @@
 		
 		ResourceUnit.getInstance().rtuLog(logNode);
 	}
+	/**
+	 * 璁板綍Rtu鏃ュ織
+	 * @param devId
+	 * @param content
+	 */
+	public static void log4Mqtt(String devId, String content){
+		if(devId == null || devId.trim().equals("")){
+			log.error("涓ラ噸閿欒锛岃褰昅qtt璁惧鏃ュ織鏃讹紝璁惧鍦板潃鏈彁渚涳紒") ;
+			return ;
+		}
+		if(content == null || content.equals("")){
+			log.error("涓ラ噸閿欒锛岃褰昅qtt璁惧鏃ュ織鏃讹紝鏃ュ織鍐呭鏈彁渚涳紒") ;
+			return ;
+		}
+		RtuLogNode logNode = new RtuLogNode(devId, content) ;
+
+		ResourceUnit.getInstance().rtuLog(logNode);
+	}
 
 }
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/CommandInnerDeaLer.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/CommandInnerDeaLer.java
index 9e21c81..6b4d081 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/CommandInnerDeaLer.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/CommandInnerDeaLer.java
@@ -5,6 +5,8 @@
 import com.dy.common.mw.protocol.Command;
 import com.dy.common.mw.protocol.rtuState.RtuStatus;
 import com.dy.rtuMw.server.local.localProtocol.*;
+import com.dy.rtuMw.server.mqtt.DevStatus;
+import com.dy.rtuMw.server.mqtt.DevStatusDealer;
 import com.dy.rtuMw.server.mqtt.MqttUnit;
 
 import java.util.HashMap;
@@ -23,37 +25,105 @@
      * @return
      */
     public Command deal(Command com) throws Exception{
+        Command rCom ;
         String code = com.getCode() ;
-        if(code.equals(CodeLocal.clock)){
-            return this.clock(com) ;
-        }else if(code.equals(CodeLocal.onAllLine)){
-            return this.onAllLine(com) ;
-        }else if(code.equals(CodeLocal.onPartLine)){
-            return this.onPartLine(com) ;
-        }else if(code.equals(CodeLocal.onLineStatistics)){
-            return this.onLineStateStatistics(com) ;
-        }else if(code.equals(CodeLocal.allRtuStates)){
-            return this.allRtuStates(com) ;
-        }else if(code.equals(CodeLocal.partRtuStates)){
-            return this.someRtuStates(com) ;
-        }else if(code.equals(CodeLocal.oneRtuStates)){
-            return this.oneRtuStates(com) ;
-        }else if(code.equals(CodeLocal.allProtocols)){
-            return this.allProtocols(com) ;
-        }else if(code.equals(CodeLocal.stopTcpSv)){
-            return this.stopTcpSv(com) ;
-        }else if(code.equals(CodeLocal.recoverTcpSv)){
-            return this.recoverTcpSv(com) ;
-        }else if(code.equals(CodeLocal.recoverMqttSv)){
-            return this.stopMqttSv(com) ;
-        }else if(code.equals(CodeLocal.mwState)){
-            return this.mwInfo(com) ;
+        switch (code) {
+            case CodeLocal.clock -> {
+                rCom = this.clock(com);
+                break;
+            }
+            case CodeLocal.mwState -> {
+                rCom = this.mwInfo(com);
+                break;
+            }
+
+            ////////////////////////////////////////////
+            //
+            // 浠ヤ笅鏄浉鍏冲熀浜嶵CP杩炴帴鐨凴TU璁惧鐨勫唴閮ㄥ懡浠�
+            //
+            ////////////////////////////////////////////
+            case CodeLocal.onAllLine -> {
+                rCom = this.onAllLine(com);
+                break;
+            }
+            case CodeLocal.onPartLine -> {
+                rCom = this.onPartLine(com);
+                break;
+            }
+            case CodeLocal.onLineStatistics -> {
+                rCom = this.onLineStateStatistics(com);
+                break;
+            }
+            case CodeLocal.allRtuStates -> {
+                rCom = this.allRtuStates(com);
+                break;
+            }
+            case CodeLocal.partRtuStates -> {
+                rCom = this.someRtuStates(com);
+                break;
+            }
+            case CodeLocal.oneRtuStates -> {
+                rCom = this.oneRtuStates(com);
+                break;
+            }
+            case CodeLocal.allProtocols -> {
+                rCom = this.allProtocols(com);
+                break;
+            }
+            case CodeLocal.stopTcpSv -> {
+                rCom = this.stopTcpSv(com);
+                break;
+            }
+            case CodeLocal.recoverTcpSv -> {
+                rCom = this.recoverTcpSv(com);
+                break;
+            }
+
+
+            ////////////////////////////////////////////
+            //
+            // 浠ヤ笅鏄浉鍏冲熀浜嶮QTT杩炴帴鐨勮澶囩殑鍐呴儴鍛戒护
+            //
+            ////////////////////////////////////////////
+            case CodeLocal.onAllLineMqtt -> {
+                rCom = this.onAllLineMqtt(com);
+                break;
+            }
+            case CodeLocal.onPartLineMqtt -> {
+                rCom = this.onPartLineMqtt(com);
+                break;
+            }
+            case CodeLocal.onLineStatisticsMqtt -> {
+                rCom = this.onLineStateStatisticsMqtt(com);
+                break;
+            }
+            case CodeLocal.allRtuStatesMqtt -> {
+                rCom = this.allRtuStatesMqtt(com);
+                break;
+            }
+            case CodeLocal.partRtuStatesMqtt -> {
+                rCom = this.someRtuStatesMqtt(com);
+                break;
+            }
+            case CodeLocal.oneRtuStatesMqtt -> {
+                rCom = this.oneRtuStatesMqtt(com);
+                break;
+            }
+            case CodeLocal.stopMqttSv -> {
+                rCom = this.stopMqttSv(com);
+                break;
+            }
+            default -> {
+                rCom = ReturnCommand.errored("鍑洪敊锛屾敹鍒板唴閮ㄥ懡浠ょ殑鍔熻兘鐮佷笉鑳借瘑鍒紒", com.getId(), com.getCode());
+                break;
+            }
         }
-        return ReturnCommand.errored("鍑洪敊锛屾敹鍒板唴閮ㄥ懡浠ょ殑鍔熻兘鐮佷笉鑳借瘑鍒紒", com.getId(), com.getCode()) ;
+        return rCom ;
     }
 
     /**
      * 鏌ヨ閫氫俊涓棿浠舵椂閽�
+     * @param  command
      * @throws Exception
      */
     private Command clock(Command command) throws Exception{
@@ -112,7 +182,7 @@
             Map<String, RtuStatus> map = new RtuStatusDeal().dealSome(rtuAddrGrp) ;
             return ReturnCommand.successed("鏌ヨ閮ㄥ垎RTU鐘舵�佺粨鏋�", command.getId(), command.getCode(), map) ;
         }else{
-            return ReturnCommand.errored("鍑洪敊锛屽懡浠ゅ弬鏁板簲璇ユ槸鎵�鏌ヨRTU鐨勫湴鍧�涓�",  command.getId(), command.getCode()) ;
+            return ReturnCommand.errored("鍑洪敊锛屽懡浠ゅ弬鏁板簲璇ユ湁鎵�鏌ヨRTU鐨勫湴鍧�涓�",  command.getId(), command.getCode()) ;
         }
     }
 
@@ -126,7 +196,7 @@
             RtuStatus rtuStatus = new RtuStatusDeal().dealOne(rtuAddr) ;
             return ReturnCommand.successed("鏌ヨ涓�涓猂TU鐘舵�佺粨鏋�", command.getId(), command.getCode(), rtuStatus) ;
         }else{
-            return ReturnCommand.errored("鍑洪敊锛屽懡浠ゅ弬鏁板簲璇ユ槸鎵�鏌ヨRTU鐨勫湴鍧�",  command.getId(), command.getCode()) ;
+            return ReturnCommand.errored("鍑洪敊锛屽懡浠ゅ弬鏁板簲璇ユ湁鎵�鏌ヨRTU鐨勫湴鍧�",  command.getId(), command.getCode()) ;
         }
     }
 
@@ -160,8 +230,77 @@
         return ReturnCommand.successed("宸茬粡鍚姩鎭㈠TCP鏈嶅姟", command.getId(), command.getCode(), null) ;
     }
 
+
     /**
-     * 鍋滄TCP鏈嶅姟锛屼笉鍐嶆帴鍏ユ柊鐨凾CP杩炴帴锛屽凡缁廡CP杩炴帴鐨勫叏閮ㄦ柇杩炴帴
+     * 鏌ヨ鎵�鏈塎QTT璁惧鍦ㄧ嚎鎯呭喌
+     * @throws Exception
+     */
+    private Command onAllLineMqtt(Command command) throws Exception{
+        HashMap<String, Boolean> map = DevStatusDealer.allOnLine() ;
+        return ReturnCommand.successed("鏌ヨ鎵�鏈塎qtt璁惧鍦ㄧ嚎鎯呭喌缁撴灉", command.getId(), command.getCode(), map) ;
+    }
+
+    /**
+     * 鏌ヨ閮ㄥ垎MQTT璁惧鍦ㄧ嚎鎯呭喌
+     * @throws Exception
+     */
+    private Command onPartLineMqtt(Command command) throws Exception{
+        if(command.param != null && command.param instanceof String && !command.param.equals("")){
+            String[] devIds = ((String)command.param).split(",");
+            HashMap<String, Boolean> map = DevStatusDealer.partOnLine(devIds) ;
+            return ReturnCommand.successed("鏌ヨ閮ㄥ垎Mqtt璁惧鍦ㄧ嚎鎯呭喌缁撴灉", command.getId(), command.getCode(), map) ;
+        }else{
+            return ReturnCommand.errored("鍑洪敊锛屽懡浠ゅ弬鏁板簲璇ユ湁鎵�鏌ヨMqtt璁惧鐨勫湴鍧�涓�",  command.getId(), command.getCode()) ;
+        }
+    }
+
+    /**
+     * 缁熻MQTT璁惧鍦ㄧ嚎涓庝笉鍦ㄧ嚎鎯呭喌
+     * @throws Exception
+     */
+    private Command onLineStateStatisticsMqtt(Command command) throws Exception{
+        RtuOnLineStateStatisticsVo vo = DevStatusDealer.statisticsOnLine() ;
+        return ReturnCommand.successed("鏌ヨ鎵�鏈塎qtt璁惧鍦ㄧ嚎鎯呭喌缁撴灉", command.getId(), command.getCode(), vo) ;
+    }
+
+    /**
+     * 鏌ヨ鎵�鏈塎QTT璁惧鐘舵��
+     * @throws Exception
+     */
+    private Command allRtuStatesMqtt(Command command) throws Exception{
+        Map<String, DevStatus> map =  DevStatusDealer.allStatus() ;
+        return ReturnCommand.successed("鏌ヨ鎵�鏈塎qtt璁惧鐘舵�佺粨鏋�", command.getId(), command.getCode(), map) ;
+    }
+
+    /**
+     * 鏌ヨ閮ㄥ垎MQTT璁惧鐘舵��
+     * @throws Exception
+     */
+    private Command someRtuStatesMqtt(Command command) throws Exception{
+        if(command.param != null && command.param instanceof String && !command.param.equals("")){
+            String[] devIds = ((String)command.param).split(",");
+            Map<String, DevStatus> map = DevStatusDealer.someStatus(devIds) ;
+            return ReturnCommand.successed("鏌ヨ閮ㄥ垎Mqtt璁惧鐘舵�佺粨鏋�", command.getId(), command.getCode(), map) ;
+        }else{
+            return ReturnCommand.errored("鍑洪敊锛屽懡浠ゅ弬鏁板簲璇ユ湁鎵�鏌ヨMqtt璁惧鐨勫湴鍧�涓�",  command.getId(), command.getCode()) ;
+        }
+    }
+
+    /**
+     * 鏌ヨ閮ㄥ垎MQTT璁惧鐘舵��
+     * @throws Exception
+     */
+    private Command oneRtuStatesMqtt(Command command) throws Exception{
+        if(command.param != null && command.param instanceof String && !command.param.equals("")){
+            String devId = (String)command.param;
+            DevStatus devStatus = DevStatusDealer.oneStatus(devId) ;
+            return ReturnCommand.successed("鏌ヨ涓�涓狹qtt璁惧鐘舵�佺粨鏋�", command.getId(), command.getCode(), devStatus) ;
+        }else{
+            return ReturnCommand.errored("鍑洪敊锛屽懡浠ゅ弬鏁板簲璇ユ湁鎵�鏌ヨMqtt璁惧鐨勫湴鍧�",  command.getId(), command.getCode()) ;
+        }
+    }
+    /**
+     * 鍋滄MQTT鏈嶅姟
      * @throws Exception
      */
     private Command stopMqttSv(Command command) throws Exception{
@@ -174,7 +313,7 @@
 
 
     /**
-     * 鎭㈠TCP鏈嶅姟锛屾帴鍏ユ柊鐨凾CP杩炴帴
+     * 鎭㈠MQTT鏈嶅姟
      * @throws Exception
      */
     private Command recoverMqttSv(Command command) throws Exception{
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/localProtocol/CodeLocal.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/localProtocol/CodeLocal.java
index 9edb9fb..e1a0417 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/localProtocol/CodeLocal.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/localProtocol/CodeLocal.java
@@ -1,12 +1,17 @@
 package com.dy.rtuMw.server.local.localProtocol;
 
 public class CodeLocal {
-
 	public static final String clock = "LCD0000" ;//鏌ヨ鐩戞帶涓棿浠舵椂閽�
+	public static final String mwState = "LCD0200" ;//寰楀埌閫氫俊涓棿浠惰繍琛屼俊鎭�
 
+	////////////////////////////////////////////
+	//
+	// 浠ヤ笅鏄浉鍏冲熀浜嶵CP杩炴帴鐨凴TU璁惧鐨勫唴閮ㄥ懡浠�
+	//
+	////////////////////////////////////////////
 	public static final String onAllLine = "LCD0001" ;//鏌ヨ鎵�鏈塕TU鍦ㄧ嚎鎯呭喌
 
-	public static final String onPartLine = "LCD0002" ;//鏌ヨ鎵�鏈塕TU鍦ㄧ嚎鎯呭喌
+	public static final String onPartLine = "LCD0002" ;//鏌ヨ閮ㄥ垎RTU鍦ㄧ嚎鎯呭喌
 
 	public static final String onLineStatistics = "LCD0003" ;//鏌ヨ鎵�鏈塕TU鐘舵�佺粺璁℃儏鍐�
 
@@ -22,10 +27,26 @@
 
 	public static final String recoverTcpSv = "LCD0112" ;//閲嶅惎TCP鏈嶅姟锛屾帴鍏ユ柊鐨凾CP杩炴帴
 
-	public static final String stopMqttSv = "LCD0114" ;//鍋滄Mqtt鏈嶅姟
 
-	public static final String recoverMqttSv = "LCD0116" ;//閲嶅惎Mqtt鏈嶅姟
 
-	public static final String mwState = "LCD0200" ;//寰楀埌閫氫俊涓棿浠惰繍琛屼俊鎭�
+	////////////////////////////////////////////
+	//
+	// 浠ヤ笅鏄浉鍏冲熀浜嶮QTT杩炴帴鐨勮澶囩殑鍐呴儴鍛戒护
+	//
+	////////////////////////////////////////////
+	public static final String onAllLineMqtt = "LMCD0001" ;//鏌ヨ鎵�鏈塎QTT璁惧鍦ㄧ嚎鎯呭喌
+
+	public static final String onPartLineMqtt = "LMCD0002" ;//鏌ヨ閮ㄥ垎MQTT璁惧鍦ㄧ嚎鎯呭喌
+
+	public static final String onLineStatisticsMqtt = "LMCD0003" ;//鏌ヨ鎵�鏈塎QTT璁惧鐘舵�佺粺璁℃儏鍐�
+
+	public static final String allRtuStatesMqtt = "LMCD0010" ;//鏌ヨ鎵�鏈塎QTT璁惧鐘舵��
+
+	public static final String partRtuStatesMqtt = "LMCD0011" ;//鏌ヨ閮ㄥ垎MQTT璁惧鐘舵��
+
+	public static final String oneRtuStatesMqtt = "LMCD0012" ;//鏌ヨ涓�涓狹QTT璁惧鐘舵��
+
+	public static final String stopMqttSv = "LMCD0110" ;//鍋滄Mqtt鏈嶅姟
+
 
 }
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatus.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatus.java
new file mode 100644
index 0000000..5f3976e
--- /dev/null
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatus.java
@@ -0,0 +1,23 @@
+package com.dy.rtuMw.server.mqtt;
+
+import lombok.Data;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/10 14:54
+ * @Description
+ */
+@Data
+public class DevStatus {
+    public String id ;//璁惧ID
+    public String protocol;//鍗忚
+    public Boolean onLine ;//鏄惁鍦ㄧ嚎 true鍦ㄧ嚎 false绂荤嚎
+    public Boolean stirRunning ;//鎼呮媽杩愯 true鏄� false鍚�
+    public Boolean injectRunning ;//娉ㄨ偉杩愯 true鏄� false鍚�
+    public Boolean irrRunning ;//鐏屾簤杩愯 true鏄� false鍚�
+    public Boolean alarm ;//鎶ヨ true鏄� false鍚�
+
+    public Long lastDownComTime ;//涓婃涓嬪彂鍛戒护鏃跺埢(姣鏃跺埢 System.currentTimeMillis())
+    public Long lastUpDataTime ;//涓婃鏀跺埌涓婅鏁版嵁鏃跺埢(姣鏃跺埢 System.currentTimeMillis())
+
+}
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatusDealer.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatusDealer.java
new file mode 100644
index 0000000..c164c0a
--- /dev/null
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatusDealer.java
@@ -0,0 +1,186 @@
+package com.dy.rtuMw.server.mqtt;
+
+import com.dy.common.mw.protocol4Mqtt.status.DevRunSt;
+import com.dy.rtuMw.server.forTcp.RtuLogDealer;
+import com.dy.rtuMw.server.local.localProtocol.RtuOnLineStateStatisticsVo;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/10 15:00
+ * @Description
+ */
+public class DevStatusDealer {
+
+    private static Map<String, DevStatus> map = new HashMap<String, DevStatus>() ;
+
+    public static HashMap<String, Boolean> allOnLine(){
+        synchronized (map){
+            HashMap<String, Boolean> rsMap = new HashMap<>();
+            Iterator<Map.Entry<String, DevStatus>> it = map.entrySet().iterator() ;
+            Map.Entry<String, DevStatus> entry = null ;
+            while(it.hasNext()){
+                entry = it.next() ;
+                rsMap.put(entry.getKey(), entry.getValue().onLine) ;
+            }
+            return rsMap ;
+        }
+    }
+
+    public static HashMap<String, Boolean> partOnLine(String[] devIds){
+        synchronized (map){
+            HashMap<String, Boolean> rsMap = new HashMap<String, Boolean>();
+            for(String devid : devIds){
+                DevStatus st = map.get(devid) ;
+                if(st != null){
+                    rsMap.put(devid, st.onLine) ;
+                }
+            }
+            return rsMap ;
+        }
+    }
+    /**
+     * 缁熻鍦ㄧ嚎涓庝笉鍦ㄧ嚎鎯呭喌
+     */
+    public static RtuOnLineStateStatisticsVo statisticsOnLine(){
+        RtuOnLineStateStatisticsVo vo = new RtuOnLineStateStatisticsVo() ;
+        vo.onLineNum = 0 ;
+        vo.offLineNum = 0 ;
+        synchronized (map){
+            Iterator<Map.Entry<String, DevStatus>> it = map.entrySet().iterator() ;
+            Map.Entry<String, DevStatus> entry = null ;
+            while(it.hasNext()){
+                entry = it.next() ;
+                if(((DevStatus)entry).onLine != null && ((DevStatus)entry).onLine.booleanValue()){
+                    vo.onLineNum++ ;
+                }else{
+                    vo.offLineNum++ ;
+                }
+            }
+        }
+        return vo ;
+    }
+
+    /**
+     * 寰楀埌鍏ㄩ儴鐘舵��
+     * @return
+     */
+    public static Map<String, DevStatus> allStatus(){
+        return map ;
+    }
+    /**
+     * 寰楀埌閮ㄥ垎鐘舵��
+     * @return
+     */
+    public static Map<String, DevStatus> someStatus(String[] devIdArrGrp){
+        synchronized (map){
+            Map<String, DevStatus> rsMap = new HashMap<>();
+            for(String devId : devIdArrGrp){
+                DevStatus status = map.get(devId) ;
+                if(status != null){
+                    rsMap.put(devId, status) ;
+                }
+            }
+            return rsMap ;
+        }
+    }
+    /**
+     * 寰楀埌涓�涓猂TU鐨勭姸鎬�
+     * @return
+     */
+    public static DevStatus oneStatus(String devId){
+        return map.get(devId) ;
+    }
+
+    public static void updateOnLineState() {
+        if (MqttUnit.confVo != null
+                && MqttUnit.confVo.noSubThenOff != null
+                && MqttUnit.confVo.noSubThenOff.longValue() > 0) {
+            Long now = System.currentTimeMillis() ;
+            synchronized (map){
+                Set<Map.Entry<String, DevStatus>> entrySet = map.entrySet() ;
+                Iterator<Map.Entry<String, DevStatus>> it = entrySet.iterator() ;
+                Map.Entry<String, DevStatus> entry ;
+                DevStatus st;
+                while(it.hasNext()){
+                    entry = it.next() ;
+                    st = entry.getValue();
+                    if(st.onLine != null && st.onLine.booleanValue() && st.lastUpDataTime != null){
+                        if(now - st.lastUpDataTime > MqttUnit.confVo.noSubThenOff.longValue()){
+                            st.onLine = false ;
+                            RtuLogDealer.log4Mqtt(entry.getKey(), "鍥犺緝闀挎椂闂存湭鏀朵笂琛屾暟鎹紝璁や负璁惧绂荤嚎");
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * 鍙戦�佹秷鎭悗
+     * @param devId
+     */
+    public static void afterSendPubMessage(String devId){
+        DevStatus st = map.get(devId);
+        if(st != null){
+            st.lastDownComTime = System.currentTimeMillis() ;
+        }
+    }
+
+    /**
+     * 鎺ユ敹娑堟伅鍚�
+     * @param devId
+     */
+    public static void afterReceiveSubMessage(String devId){
+        DevStatus st = map.get(devId);
+        if(st != null){
+            st.lastUpDataTime = System.currentTimeMillis() ;
+        }
+    }
+
+    public static void onLine(String devId, String protocol){
+        DevStatus vo = map.get(devId) ;
+        if(vo == null) {
+            vo = new DevStatus();
+            vo.id = devId ;
+            vo.protocol = protocol ;
+            vo.onLine = true ;
+            map.put(devId, vo);
+        }else {
+            vo.onLine = true ;
+        }
+    }
+
+    public static void offLine(String devId){
+        DevStatus vo = map.get(devId) ;
+        if(vo == null) {
+            vo = new DevStatus();
+            vo.onLine = false ;
+            map.put(devId, vo);
+        }else {
+            vo.onLine = false ;
+        }
+    }
+
+    public static void setStatus(String devId, DevRunSt st){
+        DevStatus vo = map.get(devId) ;
+        if(vo != null) {
+            if(st.stirRunning != null){
+                vo.stirRunning = st.stirRunning ;
+            }
+            if(st.injectRunning != null){
+                vo.injectRunning = st.injectRunning ;
+            }
+            if(st.irrRunning != null){
+                vo.irrRunning = st.irrRunning ;
+            }
+            if(st.alarm != null){
+                vo.alarm = st.alarm ;
+            }
+        }
+    }
+}
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java
index 3c775b5..ac3e211 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java
@@ -1,6 +1,11 @@
 package com.dy.rtuMw.server.mqtt;
 
 import com.dy.common.mw.channel.mqtt.MqttClientPool;
+import com.dy.common.mw.protocol4Mqtt.MqttNotify;
+import com.dy.common.mw.protocol4Mqtt.MqttNotifyInfo;
+import com.dy.common.mw.protocol4Mqtt.status.DevOnLineSt;
+import com.dy.common.mw.protocol4Mqtt.status.DevRunSt;
+import com.dy.rtuMw.server.ServerProperties;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.eclipse.paho.client.mqttv3.MqttClient;
@@ -33,13 +38,17 @@
         this.configVo = configVo;
     }
 
+    /**
+     * 鍒涘缓杩炴帴姹� + 璁㈤槄涓婚
+     * @throws Exception
+     */
     public void start()throws Exception{
         String URL = "tcp://" + this.configVo.svIp + ":" + this.configVo.svPort;
         this.pool = new MqttClientPool(URL, this.configVo.svUserName, this.configVo.svUserPassword, this.configVo.poolMaxSize);
         if(this.pool.isClose()){
             throw new Exception("Mqtt杩炴帴姹犲垵濮嬪寲澶辫触");
         }
-        MqttClient clientSub = null ;
+        MqttClient clientSub ;
         try {
             clientSub = pool.popClient();//鏂板垱寤轰竴涓狢lient鏃讹紝姝lient瀹為檯鍘昏繛鎺QTT鏈嶅姟鍣紝濡傛灉杩炴帴涓嶄笂锛屽氨浼氭姏鍑哄紓甯�
         }catch (Exception e){
@@ -48,8 +57,35 @@
         if(clientSub == null || !clientSub.isConnected()){
             throw new Exception("Mqtt杩炴帴姹犺幏寰楄闃呰繛鎺ヤ笉鍙敤");
         }
+        // 璁㈤槄涓婚
         for(int i = 0; i < this.configVo.subTopics.length; i++){
-            clientSub.subscribe(this.configVo.subTopics[i], this.configVo.topicsQos[i], new MqttMessageListener());
+            for(int j = 0 ; j < this.configVo.protocolAndDeviceIds.length; j++){
+                clientSub.subscribe(ServerProperties.orgTag + "/"
+                        + this.configVo.protocolAndDeviceIds[j] + "/"
+                        + this.configVo.subTopics[i],
+                        this.configVo.subTopicsQos[i],
+                        //姣忎竴涓闃呬富棰橀兘鏈変竴涓狹qttMessageListener瀹炰緥
+                        new MqttMessageListener(new MqttNotify(){
+                            @Override
+                            public void notify(String devId, MqttNotifyInfo... infos) {
+                                if(devId != null && infos != null && infos.length > 0){
+                                    for(MqttNotifyInfo info : infos){
+                                        if(info instanceof DevOnLineSt){
+                                            DevOnLineSt onLineSt = (DevOnLineSt)info;
+                                            if(onLineSt.onLine != null && onLineSt.onLine.booleanValue()){
+                                                DevStatusDealer.onLine(devId, ((DevOnLineSt)info).protocol);
+                                            }else{
+                                                DevStatusDealer.offLine(devId);
+                                            }
+                                        } else if(info instanceof DevRunSt){
+                                            DevStatusDealer.setStatus(devId, (DevRunSt)info);
+                                        }
+                                    }
+                                }
+                            }
+                        })
+                );
+            }
         }
     }
 
@@ -69,12 +105,12 @@
     }
 
     public void publishMsg(MqttClient client, String topic, byte[] msg) throws Exception{
-        client.publish(topic, msg, this.configVo.publishQos, false);
+        client.publish(topic, msg, this.configVo.pubTopicQos, false);
     }
 
     public void publishMsg(MqttClient client, String topic, String msg) throws Exception{
         byte[] bs = msg.getBytes("UTF-8") ;
-        client.publish(topic, bs, this.configVo.publishQos, false);
+        client.publish(topic, bs, this.configVo.pubTopicQos, false);
     }
 
     public boolean poolIsClose(){
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java
index 19bd3eb..7d8c6ea 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java
@@ -1,9 +1,11 @@
 package com.dy.rtuMw.server.mqtt;
 
-import com.dy.common.mw.protocol4Mqtt.MqttMsgParser;
-import com.dy.common.mw.protocol4Mqtt.MqttPubMsg;
-import com.dy.common.mw.protocol4Mqtt.MqttSubMsg;
+import com.dy.common.mw.protocol4Mqtt.*;
+import com.dy.common.mw.protocol4Mqtt.MqttCallback;
+import com.dy.common.mw.protocol4Mqtt.MqttTopic;
 import com.dy.common.util.Callback;
+import com.dy.rtuMw.server.forTcp.RtuLogDealer;
+import lombok.extern.slf4j.Slf4j;
 import org.eclipse.paho.client.mqttv3.* ;
 
 /**
@@ -11,11 +13,30 @@
  * @Date: 2025/6/4 15:52
  * @Description
  */
+@Slf4j
 public class MqttMessageListener implements IMqttMessageListener{
+    private MqttNotify notify ;
+    public MqttMessageListener(MqttNotify notify){
+        this.notify = notify ;
+    }
+
     @Override
     public void messageArrived(String topic, MqttMessage msg) throws Exception {
-        MqttMsgParser parser = new MqttMsgParser() ;
-        MqttSubMsg subMsg = parser.parseSubMsg(topic, msg) ;
+        MqttTopic subTopic = MqttMsgParser.parseSubTopic(topic) ;
+        MqttSubMsg subMsg = MqttMsgParser.parseSubMsg(subTopic, msg, new MqttCallback(){
+            @Override
+            public void callback(MqttSubMsg subMsg) {
+                DevStatusDealer.onLine(subMsg.deviceId, subMsg.protocol);
+                DevStatusDealer.afterReceiveSubMessage(subMsg.deviceId);
+                RtuLogDealer.log4Mqtt(subMsg.deviceId, "璁㈤槄娑堟伅    涓婚锛�" + subMsg.topic + "   娑堟伅锛�" + subMsg.msg);
+            }
+            @Override
+            public void notify(String devId, MqttNotifyInfo... infos) {
+                if(notify != null){
+                    notify.notify(devId, infos) ;
+                }
+            }
+        }) ;
         this.nextDeal(subMsg);
     }
     private void nextDeal(MqttSubMsg subMsg)throws Exception {
@@ -25,17 +46,19 @@
                 MqttSubMsg subMs = (MqttSubMsg) obj ;
                 MqttPubMsg pubMs = MqttPubMsgCache.matchFromTail(subMs) ;
                 if(pubMs != null){
+                    //鍖归厤鍒颁笅琛屾秷鎭紙鍛戒护锛�
                     subMs.mqttResultSendWebUrl = pubMs.mqttResultSendWebUrl ;
                     subMs.commandId = pubMs.commandId ;
                     try {
                         MqttComResultCache.getInstance().cacheMqttComResult(new MqttComResultNode(subMs));
                     } catch (Exception e) {
-                        e.printStackTrace();
+                        log.error("缂撳瓨鍙戝竷娑堟伅锛堝懡浠わ級缁撴灉鍙戠敓寮傚父", e);
                     }
                 }
                 try{
                     MqttSubMsgCache.getInstance().cacheMsg(new MqttSubMsgNode(subMsg));
                 }catch (Exception e){
+                    log.error("缂撳瓨璁㈤槄娑堟伅鏁版嵁鍙戠敓寮傚父", e);
                 }
             }
             @Override
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgNode.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgNode.java
index e345d59..7b827d5 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgNode.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgNode.java
@@ -3,6 +3,7 @@
 import com.dy.common.mw.protocol4Mqtt.MqttPubMsg;
 import com.dy.common.queue.NodeObj;
 import com.dy.rtuMw.server.ServerProperties;
+import com.dy.rtuMw.server.forTcp.RtuLogDealer;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.eclipse.paho.client.mqttv3.MqttClient;
@@ -65,6 +66,8 @@
             if(mqttClient != null && mqttClient.isConnected()){
                 try {
                     mqttManager.publishMsg(mqttClient, this.result.topic, this.result.msg);
+                    DevStatusDealer.afterSendPubMessage(this.result.deviceId);
+                    RtuLogDealer.log4Mqtt(this.result.deviceId, "鍙戝竷娑堟伅    涓婚锛�" + this.result.topic + "   娑堟伅锛�" + this.result.msg);
                     log.info("鍙戝竷MQTT娑堟伅锛堜富棰�=" + this.result.topic + "锛�" + this.result.msg);
                 }catch (Exception e){
                     log.error("MQTT鍙戝竷娑堟伅澶辫触锛堜富棰�=" + this.result.topic + "锛�" , e);
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitConfigVo.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitConfigVo.java
index c767276..f96f189 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitConfigVo.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitConfigVo.java
@@ -13,9 +13,12 @@
     public String svUserName ;//
     public String svUserPassword ;//
     public Integer poolMaxSize ;//
+    public String[] protocolAndDeviceIds ;//璁惧鍗忚涓嶪D锛團Box锛塱d
+    public String[] deviceIds ;//璁惧锛團Box锛塱d
     public String[] subTopics ;//璁㈤槄鐨勪富棰�
-    public int[] topicsQos ;////璁㈤槄涓婚鐨凲os
-    public int publishQos ;////鍙戝竷娑堟伅鐨凲os
+    public int[] subTopicsQos;//璁㈤槄涓婚鐨凲os
+    public int pubTopicQos;//鍙戝竷娑堟伅鐨凲os
+    public Long noSubThenOff; //MQtt璁惧鍦ㄤ竴瀹氭椂闂村悗鏈彂甯冩秷鎭紝璁や负璁惧绂荤嚎
 
     public MqttUnitConfigVo(){
         this.enable = false ;
@@ -24,6 +27,7 @@
         this.svUserName = "dyyjy" ;
         this.svUserPassword = "Dyyjy2025,;.abc!@#" ;
         this.poolMaxSize = 10 ;
-        this.publishQos = 1 ;
+        this.pubTopicQos = 1 ;
+        this.noSubThenOff = 10 * 60 * 10000L ;
     }
 }
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/MqttSubMessageConstantTask.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/MqttSubMessageConstantTask.java
index 80501d1..debb4ab 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/MqttSubMessageConstantTask.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/MqttSubMessageConstantTask.java
@@ -2,6 +2,7 @@
 
 import com.dy.common.mw.core.CoreTask;
 import com.dy.common.queue.Node;
+import com.dy.rtuMw.server.mqtt.DevStatusDealer;
 import com.dy.rtuMw.server.mqtt.MqttSubMsgCache;
 import com.dy.rtuMw.server.mqtt.MqttSubMsgNode;
 import org.apache.logging.log4j.LogManager;
@@ -21,16 +22,26 @@
     @Override
     public Integer execute() {
         try{
+            dealOneline() ;
+        }catch(Exception e){
+            log.error("鏇存柊RTU浼氳瘽涓婃姤鏁版嵁鏃跺埢鏃跺彂鐢熼泦鍚堟搷浣滃紓甯革紝姝ゅ紓甯稿苟涓嶅奖鍝嶇郴缁熸甯歌繍琛�", e);
+        }
+        try{
             dealMqMsg() ;
         }catch(Exception e){
             log.error(e);
         }
         return MqttSubMsgCache.size()>0?0:1 ;
     }
+
+    private void dealOneline(){
+        DevStatusDealer.updateOnLineState();
+    }
+
     /**
      * 澶勭悊MQTT璁㈤槄鐨勬秷鎭�
      */
-    public void dealMqMsg() {
+    private void dealMqMsg() {
         Node first = MqttSubMsgCache.getFirstQueueNode() ;
         if(first != null){
             Node last = MqttSubMsgCache.getLastQueueNode() ;
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.properties b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.properties
index 08e4d72..b692e83 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.properties
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.properties
@@ -10,7 +10,7 @@
 #   鐢樺窞锛� gz
 #   鍑夊窞锛� lz
 #   閲戝窛锛� jc
-base.orgTag=ym
+base.orgTag=mq
 
 # 233鏈嶅姟鍣細
 #   鍏冭皨锛� 60000
@@ -30,19 +30,23 @@
 base.upData.min.interval=6
 
 # MQTT鏈嶅姟閰嶇疆
+#   mqtt.enable 鏄惁鍚姩
+#   mqtt.protocolAndDeviceIds 鍦ㄥ瓙绯荤粺锛坥rgTag锛変腑鎺ュ叆鐨勮澶�(FBox)鎵�鐢ㄥ崗璁強璁惧id闆嗗悎,澶氫釜鐢ㄩ�楀彿闅斿紑锛屽崗璁笌ID鐢ㄦ鏂滄潬闅斿紑锛屼緥濡傦細sd1/338220031439,sd1/338220031440
+#   mqtt.subTopicAndQos 璁㈤槄涓婚涓嶲os锛屼富棰樺悕涓庡叾Qos鐢ㄩ�楀彿闅斿紑锛屽涓富棰樺強Qos鐢ㄥ垎鍙烽殧寮�锛屼緥濡傦細topic1,1;topic2,1;topic3,1
 # 233鏈嶅姟鍣細
-#   鍏冭皨锛� mqtt.enable=false
-#   娌欑洏锛� mqtt.enable=false
-#   娴嬭瘯锛� mqtt.enable=false
-#   姊呮睙锛� mqtt.enable=false
+#   鍏冭皨锛� mqtt.enable=false  mqtt.protocolAndDeviceIds= mqtt.topicAndQos=
+#   娌欑洏锛� mqtt.enable=false  mqtt.protocolAndDeviceIds= mqtt.topicAndQos=
+#   娴嬭瘯锛� mqtt.enable=false  mqtt.protocolAndDeviceIds= mqtt.topicAndQos=
+#   姊呮睙锛� mqtt.enable=false  mqtt.protocolAndDeviceIds= mqtt.topicAndQos=
 # 121鏈嶅姟鍣細
-#   姘戝嫟锛� mqtt.enable=true
-#   寤跺簡锛� mqtt.enable=false
-#   榛戦緳姹燂細 mqtt.enable=false
-#   鐢樺窞锛� mqtt.enable=false
-#   鍑夊窞锛� mqtt.enable=false
-#   閲戝窛锛� mqtt.enable=true
-# mq/sd1/338220031439/weather
+#   姘戝嫟锛� mqtt.enable=true  mqtt.protocolAndDeviceIds=? mqtt.topicAndQos=weather,1;soil,1;manure,1;state,1
+#   寤跺簡锛� mqtt.enable=false  mqtt.protocolAndDeviceIds= mqtt.topicAndQos=
+#   榛戦緳姹燂細 mqtt.enable=false  mqtt.protocolAndDeviceIds= mqtt.topicAndQos=
+#   鐢樺窞锛� mqtt.enable=false  mqtt.protocolAndDeviceIds= mqtt.topicAndQos=
+#   鍑夊窞锛� mqtt.enable=false  mqtt.protocolAndDeviceIds= mqtt.topicAndQos=
+#   閲戝窛锛� mqtt.enable=true  mqtt.protocolAndDeviceIds=? mqtt.topicAndQos=weather,1;soil,1;manure,1;state,1
 mqtt.enable=true
-mqtt.topicAndQos=ym/sd1/10000/control/m1,1;ym/sd1/10000/control/m2,1;ym/sd1/control/m4,1;ym/sd1/10000/control/m80,1
-
+mqtt.protocolAndDeviceIds=sd1/338220031439,sd1/338220031440
+mqtt.subTopicAndQos=weather,1;soil,1;manure,1;state,1
+#MQtt璁惧鍦ㄤ竴瀹氭椂闂达紙鍒嗛挓锛夊悗鏈彂甯冩秷鎭紝璁や负璁惧绂荤嚎
+mqtt.noSubThenOff=10
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.xml b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.xml
index 5f0b6b9..3d3e466 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.xml
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.xml
@@ -166,11 +166,18 @@
 
 
 	<!--
-	topicAndQos: 涓婚涓嶲os锛屼富棰樺悕涓庡叾Qos鐢ㄩ�楀彿闅斿紑锛屽涓富棰樺強Qos鐢ㄥ垎鍙烽殧寮�锛屼緥濡傦細ym/topic1,1;ym/topic2,1;ym/topic3,1锛屽鏋滄湁澶氫釜OrgTag锛屼富棰樺墠缂�鐢ㄥ叾OrgTag
-	publishQos: 鍙戝竷娑堟伅鐨凲os锛屽彇鍊艰寖鍥达細
+	enable 鏄惁鍚姩
+	svIp MQTT鏈嶅姟鍣↖P
+	svUserName MQTT鏈嶅姟鍣ㄧ敤鎴峰悕
+	svUserPassword MQTT鏈嶅姟鍣ㄧ敤鎴峰瘑鐮�
+	poolMaxSize 杩炴帴姹犳渶澶ц繛鎺ユ暟
+	protocolAndDeviceIds 鍦ㄥ瓙绯荤粺锛坥rgTag锛変腑鎺ュ叆鐨勮澶�(FBox)鎵�鐢ㄥ崗璁強璁惧id闆嗗悎,澶氫釜鐢ㄩ�楀彿闅斿紑锛屽崗璁笌ID鐢ㄦ鏂滄潬闅斿紑锛屼緥濡傦細sd1/338220031439,sd1/338220031440
+	subTopicAndQos: 璁㈤槄涓婚涓嶲os锛屼富棰樺悕涓庡叾Qos鐢ㄩ�楀彿闅斿紑锛屽涓富棰樺強Qos鐢ㄥ垎鍙烽殧寮�锛屼緥濡傦細ym/topic1,1;ym/topic2,1;ym/topic3,1锛屽鏋滄湁澶氫釜OrgTag锛屼富棰樺墠缂�鐢ㄥ叾OrgTag
+	pubTopicQos: 鍙戝竷涓婚鐨凲os锛屽彇鍊艰寖鍥达細
 		0	鑷冲涓�娆★紙At most once锛�	娑堟伅鍙戦�佸悗涓嶄繚璇佸埌杈撅紝鍙兘涓㈠け鎴栭噸澶嶏紝寮�閿�鏈�灏忥紝鍙潬鎬ф渶浣庛��
     	1	鑷冲皯涓�娆★紙At least once锛�	娑堟伅鑷冲皯浼氬埌杈句竴娆★紝鍙兘閲嶅锛屼絾涓嶄細涓㈠け锛屽彲闈犳�т腑绛夛紝閫傜敤浜庡鏁板満鏅��
     	2	鎭板ソ涓�娆★紙Exactly once锛�	娑堟伅浠呬細鍒拌揪涓�娆★紝涓嶉噸澶嶄笖涓嶄涪澶憋紝鍙潬鎬ф渶楂橈紝浣嗗紑閿�鏈�澶э紝瀹炵幇鏈�澶嶆潅銆�
+    noSubThenOff: MQtt璁惧鍦ㄤ竴瀹氭椂闂达紙鍒嗛挓锛夊悗鏈彂甯冩秷鎭紝璁や负璁惧绂荤嚎
      -->
 	<mqtt enable="${mqtt.enable}"
 		  svIp="121.199.41.121"
@@ -178,8 +185,10 @@
 		  svUserName="dyyjy"
 		  svUserPassword="Dyyjy2025,;.abc!@#"
 		  poolMaxSize="10"
-		  topicAndQos="${mqtt.topicAndQos}"
-		  publishQos="1"
+		  protocolAndDeviceIds="${mqtt.protocolAndDeviceIds}"
+		  subTopicAndQos="${mqtt.subTopicAndQos}"
+		  pubTopicQos="1"
+		  noSubThenOff="${mqtt.noSubThenOff}"
 	/>
 
 </config>
\ No newline at end of file

--
Gitblit v1.8.0