From f8b2e59a82702a790c383a8ecd90c708c76e2488 Mon Sep 17 00:00:00 2001
From: liurunyu <lry9898@163.com>
Date: 星期四, 05 六月 2025 17:51:59 +0800
Subject: [PATCH] 增量开发MQTT协议、功能模块,上下行命令(消息)等

---
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java         |   49 +
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/TkReceive.java                |    5 
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/Com4Mqtt.java                        |   23 
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/AdapterImp_MqttUnit.java              |   19 
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/ServerProperties.java                 |    3 
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttMsgParser.java                   |   61 +
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttPubMsgSdV1.java            |   27 
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/web/comResult/CommandResultDeal.java         |   35 +
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol/CommandType.java                          |    8 
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitConfigVo.java            |   29 
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java                                  |  126 ++
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/MqttPubMessageConstantTask.java |   81 ++
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPool.java                   |   46 +
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnit.java                    |   68 +
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttSubMsg.java                      |   26 
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/ComCtrlVo.java         |   12 
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/MqttComResultConstantTask.java  |   75 ++
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/web/com/CommandCtrl.java                     |   40 +
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/说明.txt                         |    6 
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitAdapter.java             |   14 
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/TkMqttData.java               |   37 +
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/MqttSubMessageConstantTask.java |   77 ++
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttSubMsgNode.java              |   69 +
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPooledObjectFactory.java    |   60 +
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolConstantSdV1.java      |   11 
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.properties                                    |   16 
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttSubMsgCache.java             |   68 +
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/pom.xml                                                                 |   12 
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.xml                                           |   18 
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttComResultCache.java          |   69 +
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/localProtocol/CodeLocal.java    |    4 
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/CodeSdV1.java                  |   13 
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/RtuDataDealTree.xml                                  |    4 
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/pSdV1/TkFindPSdV1.java        |   31 
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttPubMsg.java                      |   23 
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgCache.java             |  132 +++
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/ServerShutDownHook.java                      |   16 
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/WebDownCom4MqttTask.java        |   51 +
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java                 |   86 ++
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/Test.java                             |  109 +++
 pipIrr-platform/pipIrr-common/pom.xml                                                                           |   13 
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java        |  170 ++++
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttComResultNode.java           |   58 +
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgNode.java              |   95 ++
 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java            |   68 +
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/CommandInnerDeaLer.java         |   27 
 46 files changed, 2,057 insertions(+), 33 deletions(-)

diff --git a/pipIrr-platform/pipIrr-common/pom.xml b/pipIrr-platform/pipIrr-common/pom.xml
index fa7c5a5..c0ea0e3 100644
--- a/pipIrr-platform/pipIrr-common/pom.xml
+++ b/pipIrr-platform/pipIrr-common/pom.xml
@@ -132,6 +132,18 @@
             <version>2.2.2</version>
         </dependency>
 
+        <!-- MQTT娑堟伅涓棿浠�,杩炴帴姹犲強瀹㈡埛绔�  -->
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-pool2</artifactId>
+            <version>2.9.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.paho</groupId>
+            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
+            <version>1.2.5</version>
+        </dependency>
+
         <!-- quartz -->
         <dependency>
             <groupId>org.quartz-scheduler</groupId>
@@ -173,6 +185,7 @@
             <version>2.17.2</version>
         </dependency>
 
+
     </dependencies>
 
     <build>
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPool.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPool.java
new file mode 100644
index 0000000..1d6d60f
--- /dev/null
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPool.java
@@ -0,0 +1,46 @@
+package com.dy.common.mw.channel.mqtt;
+
+import org.apache.commons.pool2.impl.GenericObjectPool;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/4 11:35
+ * @Description
+ */
+public class MqttClientPool {
+
+    private final GenericObjectPool<MqttClient> pool;
+
+    public MqttClientPool(String broker, String username, String password, int maxConnections) {
+        MqttClientPooledObjectFactory factory = new MqttClientPooledObjectFactory(broker, username, password);
+        GenericObjectPoolConfig<MqttClient> config = new GenericObjectPoolConfig<>();
+        config.setMaxTotal(maxConnections);
+        config.setMaxIdle(maxConnections);
+        config.setMinIdle(1);
+        config.setTestOnBorrow(true);
+        config.setTestOnReturn(true);
+        config.setTestWhileIdle(true);
+        this.pool = new GenericObjectPool<>(factory, config);
+    }
+
+    public MqttClient popClient() throws Exception {
+        return pool.borrowObject();
+    }
+
+    public void pushClient(MqttClient client) {
+        if (client != null) {
+            pool.returnObject(client);
+        }
+    }
+
+    public boolean isClose(){
+        return pool.isClosed();
+    }
+
+    public void close() {
+        pool.close();
+    }
+}
+
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPooledObjectFactory.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPooledObjectFactory.java
new file mode 100644
index 0000000..bd2eb0f
--- /dev/null
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPooledObjectFactory.java
@@ -0,0 +1,60 @@
+package com.dy.common.mw.channel.mqtt;
+
+import org.apache.commons.pool2.BasePooledObjectFactory;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/4 11:16
+ * @Description
+ */
+public class MqttClientPooledObjectFactory extends BasePooledObjectFactory<MqttClient> {
+
+    private final String broker;
+    private final String username;
+    private final String password;
+
+    public MqttClientPooledObjectFactory(String broker, String username, String password) {
+        this.broker = broker;
+        this.username = username;
+        this.password = password;
+    }
+
+    @Override
+    public MqttClient create() throws Exception {
+        String clientId = MqttClient.generateClientId();
+        MqttClient client = new MqttClient(broker, clientId);
+
+        MqttConnectOptions options = new MqttConnectOptions();
+        options.setUserName(username);
+        options.setPassword(password.toCharArray());
+        options.setAutomaticReconnect(true);
+        options.setCleanSession(true);
+
+        client.connect(options);
+        return client;
+    }
+
+    @Override
+    public PooledObject<MqttClient> wrap(MqttClient client) {
+        return new DefaultPooledObject<>(client);
+    }
+
+    @Override
+    public void destroyObject(PooledObject<MqttClient> p) throws Exception {
+        MqttClient client = p.getObject();
+        if (client.isConnected()) {
+            client.disconnect();
+        }
+        client.close();
+    }
+
+    @Override
+    public boolean validateObject(PooledObject<MqttClient> p) {
+        MqttClient client = p.getObject();
+        return client.isConnected();
+    }
+}
\ No newline at end of file
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/Test.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/Test.java
new file mode 100644
index 0000000..2611e24
--- /dev/null
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/Test.java
@@ -0,0 +1,109 @@
+package com.dy.common.mw.channel.mqtt;
+
+import com.dy.common.util.Callback;
+import com.dy.common.util.ThreadJob;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/4 11:35
+ * @Description
+ */
+public class Test {
+
+    private static String mqSvIp = "121.199.41.121";
+    private static Integer mqSvPort = 1883;
+    private static String mqSvUserName = "dyyjy";
+    private static String mqSvUserPassword = "Dyyjy2025,;.abc!@#";
+
+    private static String topic1 = "test/topic1" ;
+    private static String topic2 = "test/topic2" ;
+
+    private static int maxConnections = 10 ;
+
+    private static MqttClientPool pool;
+    /**
+     * QoS
+     * 绛夌骇	鍚嶇О	                    娑堟伅浼犻�掔壒鎬�
+     * 0	鑷冲涓�娆★紙At most once锛�	娑堟伅鍙戦�佸悗涓嶄繚璇佸埌杈撅紝鍙兘涓㈠け鎴栭噸澶嶏紝寮�閿�鏈�灏忥紝鍙潬鎬ф渶浣庛��
+     * 1	鑷冲皯涓�娆★紙At least once锛�	娑堟伅鑷冲皯浼氬埌杈句竴娆★紝鍙兘閲嶅锛屼絾涓嶄細涓㈠け锛屽彲闈犳�т腑绛夛紝閫傜敤浜庡鏁板満鏅��
+     * 2	鎭板ソ涓�娆★紙Exactly once锛�	娑堟伅浠呬細鍒拌揪涓�娆★紝涓嶉噸澶嶄笖涓嶄涪澶憋紝鍙潬鎬ф渶楂橈紝浣嗗紑閿�鏈�澶э紝瀹炵幇鏈�澶嶆潅銆�
+     */
+    private static int QoS = 1 ;
+
+    public static void main(String[] args) {
+        try{
+            // 鍒濆鍖栬繛鎺ユ睜
+            pool = new MqttClientPool("tcp://" + mqSvIp + ":" + mqSvPort, mqSvUserName, mqSvUserPassword, maxConnections);
+            MqttClient clientSub = pool.popClient() ;
+            testSubscribe(clientSub, topic1);
+            testSubscribe(clientSub, topic2);
+            MqttClient clientPub = pool.popClient() ;
+            testPublish(clientPub, topic1, "hello world", 1000);
+            testPublish(clientPub, topic2, "hello China", 1500);
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            // 鍏抽棴杩炴帴姹�
+            //pool.close();
+        }
+    }
+    private static void testSubscribe(MqttClient clientSub, String topic) throws Exception {
+        ThreadJob tjob = new ThreadJob() {
+            @Override
+            public Object execute() throws Exception {
+                clientSub.subscribe(topic, QoS, (topic, msg) -> {
+                    System.out.println("浠庝富棰�" + topic + "鏀跺埌娑堟伅: " + new String(msg.getPayload()));
+                });
+                while (true) {
+                    Thread.sleep(1000L);
+                }
+            }
+        };
+        tjob.start(new Callback() {
+            @Override
+            public void call(Object obj) {
+                System.out.println("鎵ц鎴愬姛");
+            }
+            @Override
+            public void call(Object... objs) {
+                System.out.println("鎵ц鎴愬姛");
+            }
+
+            @Override
+            public void exception(Exception e) {
+                e.printStackTrace();
+            }
+        });
+    }
+
+    private static void testPublish(MqttClient clientPub, String topic, String message, long sleep) throws Exception {
+        ThreadJob tjob = new ThreadJob() {
+            @Override
+            public Object execute() throws Exception {
+                int count = 0 ;
+                while (true) {
+                    // 鍙戝竷娑堟伅
+                    byte[] bs = (message + "  " + count++).getBytes() ;
+                    clientPub.publish(topic, bs, QoS, false);
+                    Thread.sleep(sleep);
+                }
+            }
+        };
+        tjob.start(new Callback() {
+            @Override
+            public void call(Object obj) {
+                System.out.println("鎵ц鎴愬姛");
+            }
+            @Override
+            public void call(Object... objs) {
+                System.out.println("鎵ц鎴愬姛");
+            }
+
+            @Override
+            public void exception(Exception e) {
+                e.printStackTrace();
+            }
+        });
+    }
+}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol/CommandType.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol/CommandType.java
index 345acb7..5cc3e43 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol/CommandType.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol/CommandType.java
@@ -20,6 +20,14 @@
 	 */
 	public static final String outerTransCommand = "outerTransCommand" ;
 
+
+	/**
+	 * 閽堝Mqtt澶栭儴鍛戒护
+	 * 鍙兘鏄紓姝ワ紝鍛戒护缁撴灉閫氳繃鐩稿叧鐨勪俊鎭彂甯冮�氶亾鍙戝竷鍑哄幓
+	 */
+	public static final String mqttCommand = "mqttCommand" ;
+
+
 	/**
 	 * 鏈懡浠ゆ槸涓�涓埆鐨勫懡浠ょ殑缁撴灉锛堢粨鏋滀互鍛戒护鐨勬柟寮忚〃绀猴級
 	 */
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/Com4Mqtt.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/Com4Mqtt.java
new file mode 100644
index 0000000..8663e05
--- /dev/null
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/Com4Mqtt.java
@@ -0,0 +1,23 @@
+package com.dy.common.mw.protocol4Mqtt;
+
+import lombok.Data;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/4 17:38
+ * @Description 鍛戒护鍊煎璞�
+ */
+@Data
+public class Com4Mqtt {
+    public String commandId ;//鍛戒护ID
+
+    public String deviceId ;//璁惧ID
+
+    public String protocol;//鍗忚鍚嶇О锛堝疄鐜板巶瀹剁紪鐮侊級
+
+    public Short protocolVersion;//鍗忚鐗堟湰鍙�
+
+    public String code ;//鍔熻兘鐮�
+    public String value ;//鍊硷紙鍙兘鏄暣鏁般�佹诞鐐规暟銆佸竷灏旀暟锛坱rue鎴杅alse锛夛級
+
+}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttMsgParser.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttMsgParser.java
new file mode 100644
index 0000000..15ec6b0
--- /dev/null
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttMsgParser.java
@@ -0,0 +1,61 @@
+package com.dy.common.mw.protocol4Mqtt;
+
+import com.dy.common.mw.protocol.Command;
+import com.dy.common.mw.protocol4Mqtt.pSdV1.ProtocolConstantSdV1;
+import com.dy.common.mw.protocol4Mqtt.pSdV1.ProtocolParserSdV1;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/5 10:00
+ * @Description
+ */
+public class MqttMsgParser {
+    public MqttSubMsg parseSubMsg(String topic, MqttMessage mqttMsg) throws Exception {
+        if(topic != null && topic.trim().length() != 0){
+            String[] topicGrp = topic.split("/") ;
+            if(topicGrp.length != 4){
+                throw new Exception("鎺ユ敹鐨刴qtt娑堟伅涓婚涓嶅彲璇嗗埆") ;
+            }else{
+                if(topicGrp[1].equals("sd1")){
+                    //灞变笢璁惧(鍗忚)锛屼笖鐗堟湰鍙蜂负1
+                    return new ProtocolParserSdV1().parseSubMsg(topicGrp[2], topic, mqttMsg);
+                }else{
+                    throw new Exception("鎺ユ敹鐨刴qtt娑堟伅涓婚涓崗璁紙鍘傚鍙婄増鏈級涓嶅彲璇嗗埆") ;
+                }
+            }
+        }else{
+            throw new Exception("鎺ユ敹鐨刴qtt娑堟伅涓婚涓虹┖") ;
+        }
+    }
+
+    public MqttPubMsg createPubMsg(String orgTag, Command com) throws Exception {
+        if(com.protocol == null && com.protocol.trim().length() != 0){
+            throw new Exception("鎺ユ敹鍒癕QTT鍛戒护锛屼絾鏈彁渚涘崗璁�") ;
+        }
+        if(com.protocolVersion == null){
+            throw new Exception("鎺ユ敹鍒癕QTT鍛戒护锛屼絾鏈彁渚涘崗璁増鏈彿") ;
+        }
+        if(com.code != null && com.code.trim().length() != 0){
+            throw new Exception("鎺ユ敹鍒癕QTT鍛戒护锛屼絾鏈彁渚涘姛鑳界爜") ;
+        }
+        if(com.protocol.equals(ProtocolConstantSdV1.protocolName)){
+            if(com.protocolVersion.shortValue() == ProtocolConstantSdV1.protocolVer){
+                return new ProtocolParserSdV1().createPubMsg(orgTag, com) ;
+            }else{
+                throw new Exception("鎺ユ敹鍒癕QTT鍛戒护锛屽崗璁�" + com.protocol + "鐗堟湰" + com.protocolVersion + "鏋勯�犲櫒鏈疄鐜�") ;
+            }
+        }else{
+            throw new Exception("鎺ユ敹鍒癕QTT鍛戒护锛屽崗璁�" + com.protocol + "鏋勯�犲櫒鏈疄鐜�") ;
+        }
+    }
+
+
+    public static void main(String[] args) {
+        String s = "ym/sd1/10000/control/m1" ;
+        String[] ss = s.split("/") ;
+        for (String s1 : ss) {
+            System.out.println(s1);
+        }
+    }
+}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttPubMsg.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttPubMsg.java
new file mode 100644
index 0000000..bd864d3
--- /dev/null
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttPubMsg.java
@@ -0,0 +1,23 @@
+package com.dy.common.mw.protocol4Mqtt;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/5 11:44
+ * @Description
+ */
+public abstract class MqttPubMsg {
+    public String commandId ;//鍛戒护ID
+
+    public String deviceId ;//璁惧ID
+
+    public String mqttResultSendWebUrl ;//Mqtt杩斿洖鍛戒护缁撴灉 鍙戝悜鐩殑鍦皐eb URL
+
+    public String topic ;//娑堟伅涓婚
+    public String msg ;//娑堟伅
+
+    public boolean isCacheForOffLine ;//涓嬭鍛戒护鎺у埗锛屾秷鎭腑闂翠欢涓嶅湪绾挎槸鍚︾紦瀛樺懡浠�
+    public boolean hasResponse ;//涓嬭鍛戒护鎺у埗锛屽懡浠ゆ槸鍚︽湁搴旂瓟
+
+    public abstract boolean valid();
+
+}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttSubMsg.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttSubMsg.java
new file mode 100644
index 0000000..61b4f65
--- /dev/null
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttSubMsg.java
@@ -0,0 +1,26 @@
+package com.dy.common.mw.protocol4Mqtt;
+
+import com.dy.common.util.Callback;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/5 11:44
+ * @Description
+ */
+
+public abstract class MqttSubMsg {
+    public String commandId ;//鍛戒护ID
+
+    public String deviceId ;//璁惧ID
+
+    public String mqttResultSendWebUrl ;//Mtt杩斿洖鍛戒护缁撴灉 鍙戝悜鐩殑鍦皐eb URL
+
+    public String topic ;//娑堟伅涓婚
+    public String msg ;//娑堟伅
+
+    public abstract boolean valid();
+
+    public abstract boolean subMsgMatchPubMsg(MqttPubMsg pubMsg);
+
+    public abstract void action(Callback callback);
+}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/CodeSdV1.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/CodeSdV1.java
new file mode 100644
index 0000000..4d0cd99
--- /dev/null
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/CodeSdV1.java
@@ -0,0 +1,13 @@
+package com.dy.common.mw.protocol4Mqtt.pSdV1;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/4 17:41
+ * @Description
+ */
+public class CodeSdV1 {
+    public static final String cd_Fault = "00" ;//鏁呴殰瑙i櫎鍛戒护
+    public static final String cd_Stir = "01" ;//鎼呮媽鍚仠鍛戒护
+    public static final String cd_Inject = "02" ;//娉ㄨ偉鍚仠鍛戒护
+    public static final String cd_Irr = "03" ;//鐏屾簤鍚仠鍛戒护
+}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttPubMsgSdV1.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttPubMsgSdV1.java
new file mode 100644
index 0000000..c93c7ee
--- /dev/null
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttPubMsgSdV1.java
@@ -0,0 +1,27 @@
+package com.dy.common.mw.protocol4Mqtt.pSdV1;
+
+import com.dy.common.mw.protocol4Mqtt.MqttPubMsg;
+import lombok.Data;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/4 18:00
+ * @Description 涓嬪彂鐨勫彂甯冩秷鎭紙鍗充笅琛屽懡浠わ級
+ */
+@Data
+public class MqttPubMsgSdV1 extends MqttPubMsg {
+
+    public Integer address ;//瀵勫瓨鍣ㄥ湴鍧�
+    public String value ;//瀵勫瓨鍣ㄥ��
+
+    @Override
+    public boolean valid() {
+        if (topic == null || topic.isEmpty()) {
+            return false;
+        }
+        if (msg == null || msg.isEmpty()) {
+            return false;
+        }
+        return true;
+    }
+}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java
new file mode 100644
index 0000000..03d1e8d
--- /dev/null
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java
@@ -0,0 +1,68 @@
+package com.dy.common.mw.protocol4Mqtt.pSdV1;
+
+import com.dy.common.mw.protocol4Mqtt.MqttPubMsg;
+import com.dy.common.mw.protocol4Mqtt.MqttSubMsg;
+import com.dy.common.util.Callback;
+import lombok.Data;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/4 16:58
+ * @Description 鏀跺埌鐨勮闃呮秷鎭�
+ */
+@Data
+public class MqttSubMsgSdV1 extends MqttSubMsg {
+    public Integer address ;//瀵勫瓨鍣ㄥ湴鍧�
+    public String value ;//瀵勫瓨鍣ㄥ��
+
+    public MqttSubMsgSdV1(){}
+
+    public MqttSubMsgSdV1(String deviceId, String topic, String msg) {
+        this.deviceId = deviceId ;
+        this.topic = topic ;
+        this.msg = msg ;
+    }
+    public String toString(){
+        StringBuilder sb = new StringBuilder();
+        if(commandId != null){
+            sb.append("commandId:")
+                    .append(commandId)
+                    .append("\n") ;
+        }
+        sb.append("涓婚:")
+                .append(topic)
+                .append("\n") ;
+        sb.append("娑堟伅:")
+                .append(msg)
+                .append("\n") ;
+
+        return sb.toString() ;
+    }
+
+    public boolean subMsgMatchPubMsg(MqttPubMsg pubMsg){
+        if (pubMsg instanceof MqttPubMsgSdV1) {
+            MqttPubMsgSdV1 pubMsgSdV1 = (MqttPubMsgSdV1) pubMsg;
+            if(this.address.intValue() == pubMsgSdV1.getAddress().intValue()){
+                return true ;
+            }
+        }
+        return false ;
+    }
+
+    @Override
+    public boolean valid() {
+        if (topic == null || topic.isEmpty()) {
+            return false;
+        }
+        if (msg == null || msg.isEmpty()) {
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public void action(Callback callback){
+        callback.call(this) ;
+    }
+
+}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolConstantSdV1.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolConstantSdV1.java
new file mode 100644
index 0000000..8d7e5cd
--- /dev/null
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolConstantSdV1.java
@@ -0,0 +1,11 @@
+package com.dy.common.mw.protocol4Mqtt.pSdV1;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/5 10:52
+ * @Description
+ */
+public class ProtocolConstantSdV1 {
+    public static final String protocolName = "sd" ;
+    public static final short protocolVer = 1 ;
+}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java
new file mode 100644
index 0000000..1ca98b2
--- /dev/null
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java
@@ -0,0 +1,170 @@
+package com.dy.common.mw.protocol4Mqtt.pSdV1;
+
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
+import com.dy.common.mw.protocol.Command;
+import com.dy.common.mw.protocol4Mqtt.pSdV1.downVos.ComCtrlVo;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/5 11:41
+ * @Description
+ */
+public class ProtocolParserSdV1 {
+    public MqttSubMsgSdV1 parseSubMsg(String deviceId, String topic, MqttMessage mqttMsg) throws Exception {
+        MqttSubMsgSdV1 ms = new MqttSubMsgSdV1(deviceId, topic, new String(mqttMsg.getPayload(), "UTF-8"));
+        return ms;
+    }
+
+    public MqttPubMsgSdV1 createPubMsg(String orgTag, Command com) throws Exception {
+        MqttPubMsgSdV1 msg = null ;
+        switch (com.code){
+            case CodeSdV1.cd_Fault:{
+                //鏁呴殰瑙i櫎鍛戒护
+                this.checkParam(com);
+                this.checkRtnWebUrl(com);
+                msg = this.createPubMsgOfFault(orgTag, com) ;
+                break ;
+            }
+            case CodeSdV1.cd_Stir:{
+                //鎼呮媽鍚仠鍛戒护
+                this.checkParam(com);
+                this.checkRtnWebUrl(com);
+                msg = this.createPubMsgOfStir(orgTag, com) ;
+                break ;
+            }
+            case CodeSdV1.cd_Inject:{
+                //娉ㄨ偉鍚仠鍛戒护
+                this.checkParam(com);
+                this.checkRtnWebUrl(com);
+                msg = this.createPubMsgOfInject(orgTag, com) ;
+                break ;
+            }
+            case CodeSdV1.cd_Irr:{
+                //鐏屾簤鍚仠鍛戒护
+                this.checkParam(com);
+                this.checkRtnWebUrl(com);
+                msg = this.createPubMsgOfIrr(orgTag, com) ;
+                break ;
+            }
+            default:{
+                throw new Exception("鎺ユ敹鍒癕QTT鍛戒护锛屽崗璁�" + com.protocol + "鐗堟湰" + com.protocolVersion + "鍔熻兘鐮�" + com.code + "鏋勯�犲櫒鏈疄鐜�") ;
+            }
+        }
+        return msg ;
+    }
+    private void checkParam(Command com)throws Exception {
+        if(com.param == null){
+            throw new Exception("鎺ユ敹鍒癕QTT鍛戒护锛屽崗璁�" + com.protocol + "鐗堟湰" + com.protocolVersion + "鍔熻兘鐮�" + com.code + "鍛戒护鏁版嵁涓虹┖") ;
+        }
+    }
+    private void checkRtnWebUrl(Command com)throws Exception {
+        if(com.rtuResultSendWebUrl == null || com.rtuResultSendWebUrl.trim().equals("")){
+            throw new Exception("鎺ユ敹鍒癕QTT鍛戒护锛屽崗璁�" + com.protocol + "鐗堟湰" + com.protocolVersion + "鍔熻兘鐮�" + com.code + "鍛戒护缁撴灉鍥炴敹URL涓虹┖") ;
+        }
+    }
+    private MqttPubMsgSdV1 createPubMsgOfFault(String orgTag, Command com) throws Exception {
+        JSONObject obj = (JSONObject) com.param;
+        String json = obj.toJSONString();
+        ComCtrlVo cvo = JSON.parseObject(json, ComCtrlVo.class);
+        if(cvo == null){
+            throw new Exception("json杞珻omCtrlVo涓簄ull") ;
+        }
+        MqttPubMsgSdV1 msg = new MqttPubMsgSdV1() ;
+        this.setPubMsgBase(com, msg);
+        msg.isCacheForOffLine = false ;
+        msg.hasResponse = true ;
+        msg.address = 123 ;
+        msg.value = "" + (cvo.isDo?1:0);
+        msg.topic = createTopic(orgTag, com) ;
+        msg.msg = "" ;
+        return msg ;
+    }
+    private MqttPubMsgSdV1 createPubMsgOfStir(String orgTag, Command com) throws Exception {
+        JSONObject obj = (JSONObject) com.param;
+        String json = obj.toJSONString();
+        ComCtrlVo cvo = JSON.parseObject(json, ComCtrlVo.class);
+        if(cvo == null){
+            throw new Exception("json杞珻omCtrlVo涓簄ull") ;
+        }
+        MqttPubMsgSdV1 msg = new MqttPubMsgSdV1() ;
+        this.setPubMsgBase(com, msg);
+        msg.isCacheForOffLine = false ;
+        msg.hasResponse = true ;
+        msg.address = 123 ;
+        msg.value = "" + (cvo.isDo?1:0);
+        msg.topic = createTopic(orgTag, com) ;
+        msg.msg = "" ;
+        return msg ;
+    }
+    private MqttPubMsgSdV1 createPubMsgOfInject(String orgTag, Command com) throws Exception {
+        JSONObject obj = (JSONObject) com.param;
+        String json = obj.toJSONString();
+        ComCtrlVo cvo = JSON.parseObject(json, ComCtrlVo.class);
+        if(cvo == null){
+            throw new Exception("json杞珻omCtrlVo涓簄ull") ;
+        }
+        MqttPubMsgSdV1 msg = new MqttPubMsgSdV1() ;
+        this.setPubMsgBase(com, msg);
+        msg.isCacheForOffLine = false ;
+        msg.hasResponse = true ;
+        msg.address = 123 ;
+        msg.value = "" + (cvo.isDo?1:0);
+        msg.topic = createTopic(orgTag, com) ;
+        msg.msg = "" ;
+        return msg ;
+    }
+    private MqttPubMsgSdV1 createPubMsgOfIrr(String orgTag, Command com) throws Exception {
+        JSONObject obj = (JSONObject) com.param;
+        String json = obj.toJSONString();
+        ComCtrlVo cvo = JSON.parseObject(json, ComCtrlVo.class);
+        if(cvo == null){
+            throw new Exception("json杞珻omCtrlVo涓簄ull") ;
+        }
+        MqttPubMsgSdV1 msg = new MqttPubMsgSdV1() ;
+        this.setPubMsgBase(com, msg);
+        msg.isCacheForOffLine = false ;
+        msg.hasResponse = true ;
+        msg.address = 123 ;
+        msg.value = "" + (cvo.isDo?1:0);
+        msg.topic = createTopic(orgTag, com) ;
+        msg.msg = "" ;
+        return msg ;
+    }
+
+    private void setPubMsgBase(Command com, MqttPubMsgSdV1 msg){
+        msg.commandId = com.id ;
+        msg.deviceId = com.rtuAddr ;
+        msg.mqttResultSendWebUrl = com.rtuResultSendWebUrl ;
+    }
+
+    private String createTopic(String orgTag, Command com){
+        String topic = null ;
+        switch (com.code){
+            case CodeSdV1.cd_Fault:{
+                topic = orgTag + "/" + com.protocol + com.protocolVersion + "/" + com.rtuAddr + "/control/m4" ;
+                break ;
+            }
+            case CodeSdV1.cd_Stir:{
+                topic = orgTag + "/" + com.protocol + com.protocolVersion + "/" + com.rtuAddr + "/control/m80" ;
+                break ;
+            }
+            case CodeSdV1.cd_Inject:{
+                topic = orgTag + "/" + com.protocol + com.protocolVersion + "/" + com.rtuAddr + "/control/m1" ;
+                break ;
+            }
+            case CodeSdV1.cd_Irr:{
+                topic = orgTag + "/" + com.protocol + com.protocolVersion + "/" + com.rtuAddr + "/control/m2" ;
+                break ;
+            }
+            default:{
+                topic = null ;
+                break;
+            }
+        }
+        return topic ;
+    }
+
+}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/ComCtrlVo.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/ComCtrlVo.java
new file mode 100644
index 0000000..4f90810
--- /dev/null
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/ComCtrlVo.java
@@ -0,0 +1,12 @@
+package com.dy.common.mw.protocol4Mqtt.pSdV1.downVos;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/5 15:57
+ * @Description
+ */
+public class ComCtrlVo {
+    //鏄惁鎺у埗鍔ㄤ綔锛宼rue鏄紝false鍚�
+    //鍙互鎵ц鍔熻兘鐮� 00锛�01锛�02锛�03鐨勫姩浣�
+    public boolean isDo;//
+}
diff --git "a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/\350\257\264\346\230\216.txt" "b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/\350\257\264\346\230\216.txt"
new file mode 100644
index 0000000..6f29fa0
--- /dev/null
+++ "b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/\350\257\264\346\230\216.txt"
@@ -0,0 +1,6 @@
+灞变笢娉板畨鍏徃鎻愪緵姘磋偉鏈恒�佸湡澹ゅ鎯呯珯銆佹皵璞$珯銆丗Box绯荤粺鍗忚
+
+寤鸿娑堟伅涓婚瑙勫垯锛�
+瀛愮郴缁燂紙鏈烘瀯锛�/鍗忚鍚嶇О锛堝巶瀹讹級+鐗堟湰鍙�/璁惧缂栧彿/鍔熻兘缁�/鍦板潃
+渚嬪锛�
+ym/sd1/10000/control/m4  (鍏冭皨/灞变笢+鐗堟湰1/璁惧缂栧彿/璁惧鎺у埗/鍦板潃)
\ No newline at end of file
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/pom.xml b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/pom.xml
index dbb1600..65316d7 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/pom.xml
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/pom.xml
@@ -125,6 +125,18 @@
             <version>2.0.7</version>
         </dependency>
 
+        <!-- MQTT娑堟伅涓棿浠�,杩炴帴姹犲強瀹㈡埛绔�  -->
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-pool2</artifactId>
+            <version>2.9.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.paho</groupId>
+            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
+            <version>1.2.5</version>
+        </dependency>
+
         <!--閽夐拤娑堟伅鎺ㄩ��-->
         <dependency>
             <groupId>com.aliyun</groupId>
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java
index 2a405c4..b7917f5 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java
@@ -4,13 +4,15 @@
 import java.util.List;
 
 import com.dy.common.util.ConfigProperties;
+import com.dy.common.util.IPUtils;
 import com.dy.rtuMw.server.*;
+import com.dy.rtuMw.server.mqtt.MqttUnit;
+import com.dy.rtuMw.server.mqtt.MqttUnitConfigVo;
 import com.dy.rtuMw.server.msCenter.MsCenterConfigVo;
 import com.dy.rtuMw.server.msCenter.MsCenterUnit;
 import com.dy.rtuMw.server.rtuData.RtuDataUnit;
 import com.dy.rtuMw.server.rtuData.RtuDataUnitConfigVo;
-import com.dy.rtuMw.server.tasks.FromRtuComResultConstantTask;
-import com.dy.rtuMw.server.tasks.FromRtuDataConstantTask;
+import com.dy.rtuMw.server.tasks.*;
 import com.dy.common.mw.UnitInterface;
 import com.dy.common.mw.channel.tcp.TcpConfigVo;
 import com.dy.common.mw.channel.tcp.TcpUnit;
@@ -20,8 +22,6 @@
 import com.dy.common.mw.protocol.ProtocolUnit;
 import com.dy.common.mw.support.SupportUnit;
 import com.dy.common.mw.support.SupportUnitConfigVo;
-import com.dy.rtuMw.server.tasks.SendMsConstantTask;
-import com.dy.rtuMw.server.tasks.RtuDownConstantTask;
 import com.dy.rtuMw.resource.ResourceUnit;
 import com.dy.rtuMw.resource.ResourceUnitConfigVo;
 import com.dy.common.springUtil.SpringContextUtil;
@@ -359,28 +359,31 @@
 			//RTU杩滅▼鍗囩骇妯″潡
 			UpgradeUnitConfigVo ugVo = new UpgradeUnitConfigVo();
 			ugVo.enable = conf.getSetAttrBoolean(doc, "config.upgrade", "enable", null, null) ;
-			ugVo.openNoUpgrade = conf.getSetAttrBoolean(doc, "config.upgrade", "openNoUpgrade", null, null) ;
-			ugVo.lastOpenMaxGoOn = conf.getSetAttrPlusInt(doc, "config.upgrade", "lastOpenMaxGoOn", null, 5, 360000, null);
-			ugVo.lastOpenMaxGoOn = ugVo.lastOpenMaxGoOn * 1000 ;//鍙樻垚姣
-			ugVo.noOneRtuUpgradeMaxDuration = conf.getSetAttrPlusInt(doc, "config.upgrade", "noOneRtuUpgradeMaxDuration", null, 5, 360000, null);
-			ugVo.noOneRtuUpgradeMaxDuration = ugVo.noOneRtuUpgradeMaxDuration * 1000 ;//鍙樻垚姣
-			ugVo.runningAndIdleDuration = conf.getSetAttrPlusInt(doc, "config.upgrade", "runningAndIdleDuration", null, 5, 360000, null);
-			ugVo.runningAndIdleDuration = ugVo.runningAndIdleDuration * 1000 ;//鍙樻垚姣
-			ugVo.failTryTimes = conf.getSetAttrPlusInt(doc, "config.upgrade", "failTryTimes", null, 0, 100, null);
-			ugVo.ugMaxRtuAtOnce = conf.getSetAttrPlusInt(doc, "config.upgrade", "ugMaxRtuAtOnce", null, 0, 1000000, null);
-			ugVo.rtuOffLineWaitDuration = conf.getSetAttrPlusInt(doc, "config.upgrade", "rtuOffLineWaitDuration", null, 1, 3600000, null);
-			ugVo.rtuOffLineWaitDuration = ugVo.rtuOffLineWaitDuration * 1000;//鍙樻垚姣
-			ugVo.notifyStateInterval = conf.getSetAttrPlusInt(doc, "config.upgrade", "notifyStateInterval", null, 1, 300, null);
-			ugVo.notifyStateInterval = ugVo.notifyStateInterval * 1000;//鍙樻垚姣
-			ugVo.notifyTimesAfterOver = conf.getSetAttrPlusInt(doc, "config.upgrade", "notifyTimesAfterOver", null, 0, null, null);
-			ugVo.showStartInfo = showStartInfo ;
-			AdapterImp_UpgradeUnit ugAdap = new AdapterImp_UpgradeUnit();
-			ugAdap.setConfig(ugVo);
-			UpgradeUnit ugUnit = UpgradeUnit.getInstance();
-			ugUnit.setAdapter(ugAdap);
-			ugUnit.start(obj -> {
-			});
-			units.add(ugUnit) ;
+			if(ugVo.enable){
+				ugVo.openNoUpgrade = conf.getSetAttrBoolean(doc, "config.upgrade", "openNoUpgrade", null, null) ;
+				ugVo.lastOpenMaxGoOn = conf.getSetAttrPlusInt(doc, "config.upgrade", "lastOpenMaxGoOn", null, 5, 360000, null);
+				ugVo.lastOpenMaxGoOn = ugVo.lastOpenMaxGoOn * 1000 ;//鍙樻垚姣
+				ugVo.noOneRtuUpgradeMaxDuration = conf.getSetAttrPlusInt(doc, "config.upgrade", "noOneRtuUpgradeMaxDuration", null, 5, 360000, null);
+				ugVo.noOneRtuUpgradeMaxDuration = ugVo.noOneRtuUpgradeMaxDuration * 1000 ;//鍙樻垚姣
+				ugVo.runningAndIdleDuration = conf.getSetAttrPlusInt(doc, "config.upgrade", "runningAndIdleDuration", null, 5, 360000, null);
+				ugVo.runningAndIdleDuration = ugVo.runningAndIdleDuration * 1000 ;//鍙樻垚姣
+				ugVo.failTryTimes = conf.getSetAttrPlusInt(doc, "config.upgrade", "failTryTimes", null, 0, 100, null);
+				ugVo.ugMaxRtuAtOnce = conf.getSetAttrPlusInt(doc, "config.upgrade", "ugMaxRtuAtOnce", null, 0, 1000000, null);
+				ugVo.rtuOffLineWaitDuration = conf.getSetAttrPlusInt(doc, "config.upgrade", "rtuOffLineWaitDuration", null, 1, 3600000, null);
+				ugVo.rtuOffLineWaitDuration = ugVo.rtuOffLineWaitDuration * 1000;//鍙樻垚姣
+				ugVo.notifyStateInterval = conf.getSetAttrPlusInt(doc, "config.upgrade", "notifyStateInterval", null, 1, 300, null);
+				ugVo.notifyStateInterval = ugVo.notifyStateInterval * 1000;//鍙樻垚姣
+				ugVo.notifyTimesAfterOver = conf.getSetAttrPlusInt(doc, "config.upgrade", "notifyTimesAfterOver", null, 0, null, null);
+				ugVo.showStartInfo = showStartInfo ;
+				AdapterImp_UpgradeUnit ugAdap = new AdapterImp_UpgradeUnit();
+				ugAdap.setConfig(ugVo);
+				UpgradeUnit ugUnit = UpgradeUnit.getInstance();
+				ugUnit.setAdapter(ugAdap);
+				ugUnit.start(obj -> {
+				});
+				units.add(ugUnit) ;
+			}
+
 
 			/////////////////
 			//RTU涓婅鏁版嵁澶勭悊妯″潡锛堜换鍔℃爲锛�
@@ -410,6 +413,12 @@
 			CoreUnit.addConstantTask(new RtuDownConstantTask());
 			CoreUnit.addConstantTask(new FromRtuDataConstantTask());
 			CoreUnit.addConstantTask(new FromRtuComResultConstantTask());
+			Boolean enableMq = conf.getSetAttrBoolean(doc, "config.mqtt", "enable", null, null) ;
+			if(enableMq != null && enableMq.booleanValue()){
+				CoreUnit.addConstantTask(new MqttSubMessageConstantTask());
+				CoreUnit.addConstantTask(new MqttPubMessageConstantTask());
+				CoreUnit.addConstantTask(new MqttComResultConstantTask());
+			}
 			CoreUnit.addConstantTask(new SendMsConstantTask());
 			coreUnit.start(obj -> {
 			});
@@ -433,7 +442,70 @@
 				});
 				TcpSvUrl = "[ip]:" + tcpVo.port ;
 				units.add(tcpUnit) ;
-			}	
+			}
+
+			/////////////////
+			//MQTT妯″潡
+			MqttUnitConfigVo mqVo = new MqttUnitConfigVo();
+			mqVo.enable = conf.getSetAttrBoolean(doc, "config.mqtt", "enable", null, null) ;
+			ServerProperties.mqttUnitEnable = mqVo.enable ;
+			if(mqVo.enable){
+				mqVo.svIp = conf.getSetAttrTxt(doc, "config.mqtt", "svIp", null, true, null) ;
+				if(!IPUtils.ipValid(mqVo.svIp)){
+					throw new Exception("config.mqtt.svIp閰嶇疆鐨処P涓嶅悎娉�") ;
+				}
+				mqVo.svPort = conf.getSetAttrPlusInt(doc, "config.mqtt", "svPort", null, 5, 360000, null);
+				if(mqVo.svPort < 0 || mqVo.svPort > 65535){
+					throw new Exception("config.mqtt.svPort閰嶇疆鐨勭鍙d笉鍚堟硶") ;
+				}
+				mqVo.svUserName = conf.getSetAttrTxt(doc, "config.mqtt", "svUserName", null, true, null) ;
+				if(mqVo.svUserName == null || mqVo.svUserName.trim().equals("")){
+					throw new Exception("config.mqtt.svUserName閰嶇疆鐨勭敤鎴峰悕涓嶅悎娉�") ;
+				}else{
+					mqVo.svUserName = mqVo.svUserName.trim() ;
+				}
+				mqVo.svUserPassword = conf.getSetAttrTxt(doc, "config.mqtt", "svUserPassword", null, true, null) ;
+				if(mqVo.svUserPassword == null || mqVo.svUserPassword.trim().equals("")){
+					throw new Exception("config.mqtt.svUserName閰嶇疆鐨勭敤鎴峰瘑鐮佷笉鍚堟硶") ;
+				}else{
+					mqVo.svUserPassword = mqVo.svUserPassword.trim() ;
+				}
+				mqVo.poolMaxSize = conf.getSetAttrPlusInt(doc, "config.mqtt", "poolMaxSize", null, 5, 360000, null);
+				if(mqVo.poolMaxSize <= 1 || mqVo.poolMaxSize > 1000){
+					throw new Exception("config.mqtt.poolMaxSize閰嶇疆鐨勮繛鎺ユ睜杩炴帴鏈�澶ф暟閲忎笉鍚堟硶") ;
+				}
+				String topicAndQos = conf.getSetAttrTxt(doc, "config.mqtt", "topicAndQos", null, true, null) ;
+				if(topicAndQos == null || topicAndQos.trim().equals("")){
+					throw new Exception("config.mqtt.topicAndQos閰嶇疆鐨勪富棰樺強Qos涓嶅悎娉�") ;
+				}else{
+					topicAndQos = topicAndQos.trim() ;
+					topicAndQos = topicAndQos.replaceAll("锛�", ",");
+					topicAndQos = topicAndQos.replaceAll("锛�", ";");
+					String[] topicAndQosArr = topicAndQos.split(";") ;
+					mqVo.subTopics = new String[topicAndQosArr.length] ;
+					mqVo.topicsQos = new int[topicAndQosArr.length] ;
+					int index = 0 ;
+					for(String topicAndQosStr : topicAndQosArr){
+						String[] tq = topicAndQosStr.split(",") ;
+						mqVo.subTopics[index] = tq[0].trim() ;
+						mqVo.topicsQos[index] = Integer.parseInt(tq[1].trim()) ;
+						index++ ;
+					}
+				}
+				mqVo.publishQos = conf.getSetAttrPlusInt(doc, "config.mqtt", "publishQos", null, 0, 3, null);
+				if(mqVo.publishQos < 0 || mqVo.publishQos > 3){
+					throw new Exception("config.mqtt.publishQos閰嶇疆涓嶅悎娉�") ;
+				}
+				mqVo.showStartInfo = showStartInfo ;
+				AdapterImp_MqttUnit mqAdapt = new AdapterImp_MqttUnit();
+				mqAdapt.setConfig(mqVo);
+				MqttUnit mqUnit = MqttUnit.getInstance();
+				mqUnit.setAdapter(mqAdapt);
+				mqUnit.start(obj -> {
+				});
+				units.add(mqUnit) ;
+			}
+
 		} catch (Exception e) {
 			e.printStackTrace();
 		}		
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/ServerShutDownHook.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/ServerShutDownHook.java
index 9f945ae..5171b77 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/ServerShutDownHook.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/ServerShutDownHook.java
@@ -20,13 +20,19 @@
             try{
                 // 纭繚杩欐浠g爜灏藉彲鑳藉揩閫熸墽琛岋紝閬垮厤褰卞搷JVM鐨勫叧闂�
                 log.info("绋嬪簭锛堟帶鍒跺彴锛夊叧闂挬瀛愮被鎵ц");
-                Command com = new Command() ;
-                com.id = Command.defaultId ;
-                com.code = CodeLocal.stopTcpSv ;
-                com.type = CommandType.innerCommand ;
-                new CommandInnerDeaLer().deal(com) ;
+                Command com1 = new Command() ;
+                com1.id = Command.defaultId ;
+                com1.code = CodeLocal.stopTcpSv ;
+                com1.type = CommandType.innerCommand ;
+                new CommandInnerDeaLer().deal(com1) ;
                 //Thread.sleep(100L);//瀹炴祴涓嶆墽琛�
                 log.info("鍏抽棴绋嬪簭鍓嶏紝鍏抽棴浜員CP鏈嶅姟");
+                Command com2 = new Command() ;
+                com2.id = Command.defaultId ;
+                com2.code = CodeLocal.stopMqttSv ;
+                com2.type = CommandType.innerCommand ;
+                new CommandInnerDeaLer().deal(com2) ;
+
             }catch (Exception e){
                 log.error("绋嬪簭锛堟帶鍒跺彴锛夊叧闂挬瀛愬彂鐢熷紓甯�", e);
             }
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/AdapterImp_MqttUnit.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/AdapterImp_MqttUnit.java
new file mode 100644
index 0000000..4f098b5
--- /dev/null
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/AdapterImp_MqttUnit.java
@@ -0,0 +1,19 @@
+package com.dy.rtuMw.server;
+
+
+import com.dy.rtuMw.server.mqtt.MqttUnitAdapter;
+import com.dy.rtuMw.server.mqtt.MqttUnitConfigVo;
+
+public class AdapterImp_MqttUnit implements MqttUnitAdapter {
+	
+	private MqttUnitConfigVo configVo ;
+
+	public MqttUnitConfigVo getConfig() {
+		return configVo;
+	}
+	
+	public void setConfig(MqttUnitConfigVo configVo){
+		this.configVo = configVo ;
+	}
+
+}
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/ServerProperties.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/ServerProperties.java
index cf3f4a4..ef5a0da 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/ServerProperties.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/ServerProperties.java
@@ -54,4 +54,7 @@
 	//鏈夋姤璀﹀彂鐢熸椂锛屽悜閽夐拤鍙戦�佹秷鎭殑闂撮殧鏃堕暱锛堝垎閽燂級
 	public static Integer sendDingDingAlarmMsInterval = 60 ;
 
+	//Mqtt妯″潡鏄惁鍚姩
+	public static Boolean mqttUnitEnable = false ;
+
 }
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/CommandInnerDeaLer.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/CommandInnerDeaLer.java
index d2f4740..9e21c81 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/CommandInnerDeaLer.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/CommandInnerDeaLer.java
@@ -5,6 +5,7 @@
 import com.dy.common.mw.protocol.Command;
 import com.dy.common.mw.protocol.rtuState.RtuStatus;
 import com.dy.rtuMw.server.local.localProtocol.*;
+import com.dy.rtuMw.server.mqtt.MqttUnit;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -43,6 +44,8 @@
             return this.stopTcpSv(com) ;
         }else if(code.equals(CodeLocal.recoverTcpSv)){
             return this.recoverTcpSv(com) ;
+        }else if(code.equals(CodeLocal.recoverMqttSv)){
+            return this.stopMqttSv(com) ;
         }else if(code.equals(CodeLocal.mwState)){
             return this.mwInfo(com) ;
         }
@@ -157,6 +160,30 @@
         return ReturnCommand.successed("宸茬粡鍚姩鎭㈠TCP鏈嶅姟", command.getId(), command.getCode(), null) ;
     }
 
+    /**
+     * 鍋滄TCP鏈嶅姟锛屼笉鍐嶆帴鍏ユ柊鐨凾CP杩炴帴锛屽凡缁廡CP杩炴帴鐨勫叏閮ㄦ柇杩炴帴
+     * @throws Exception
+     */
+    private Command stopMqttSv(Command command) throws Exception{
+        MqttUnit.getInstance().stop(new UnitCallbackInterface(){
+            public void call(Object obj) throws Exception {
+            }
+        });
+        return ReturnCommand.successed("宸茬粡鍚姩鍋滄Mqtt鏈嶅姟", command.getId(), command.getCode(), null) ;
+    }
+
+
+    /**
+     * 鎭㈠TCP鏈嶅姟锛屾帴鍏ユ柊鐨凾CP杩炴帴
+     * @throws Exception
+     */
+    private Command recoverMqttSv(Command command) throws Exception{
+        MqttUnit.getInstance().recover(new UnitCallbackInterface(){
+            public void call(Object obj) throws Exception {
+            }
+        });
+        return ReturnCommand.successed("宸茬粡鍚姩鎭㈠Mqtt鏈嶅姟", command.getId(), command.getCode(), null) ;
+    }
 
 
     /**
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/localProtocol/CodeLocal.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/localProtocol/CodeLocal.java
index 9337b04..9edb9fb 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/localProtocol/CodeLocal.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/localProtocol/CodeLocal.java
@@ -22,6 +22,10 @@
 
 	public static final String recoverTcpSv = "LCD0112" ;//閲嶅惎TCP鏈嶅姟锛屾帴鍏ユ柊鐨凾CP杩炴帴
 
+	public static final String stopMqttSv = "LCD0114" ;//鍋滄Mqtt鏈嶅姟
+
+	public static final String recoverMqttSv = "LCD0116" ;//閲嶅惎Mqtt鏈嶅姟
+
 	public static final String mwState = "LCD0200" ;//寰楀埌閫氫俊涓棿浠惰繍琛屼俊鎭�
 
 }
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttComResultCache.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttComResultCache.java
new file mode 100644
index 0000000..413d9f2
--- /dev/null
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttComResultCache.java
@@ -0,0 +1,69 @@
+package com.dy.rtuMw.server.mqtt;
+
+import com.dy.common.queue.Node;
+import com.dy.common.queue.Queue;
+import com.dy.rtuMw.server.ServerProperties;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/5 15:05
+ * @Description
+ */
+public class MqttComResultCache {
+
+    //TCP涓嬭鍛戒护缂撳瓨闃熷垪
+    private static Queue cacheQueue = new Queue("MqttComResultCache") ;
+
+    private static MqttComResultCache instance = new MqttComResultCache() ;
+
+    private MqttComResultCache(){
+        cacheQueue.setLimit(ServerProperties.cacheUpDownDataWarnCount, ServerProperties.cacheUpDownDataMaxCount);
+    }
+
+    public static MqttComResultCache getInstance(){
+        return instance ;
+    }
+
+    /**
+     * 缂撳瓨鑺傜偣
+     * @param node node
+     * @throws Exception 寮傚父
+     */
+    public static void cacheMqttComResult(MqttComResultNode node) throws Exception{
+        if(node != null && node.obj != null){
+            cacheQueue.pushHead(node);
+        }
+    }
+
+    /**
+     * 寰楀埌绗竴涓妭鐐�
+     * @return Node
+     */
+    public static Node getFirstQueueNode(){
+        return cacheQueue.getFirstNode() ;
+    }
+
+    /**
+     * 寰楀埌鏈�鍚庝竴涓妭鐐�
+     * @return Node
+     */
+    public static Node getLastQueueNode(){
+        return cacheQueue.getLastNode() ;
+    }
+
+    /**
+     * 绉婚櫎鑺傜偣
+     * @param node
+     */
+    public static void removeNode(Node node){
+        cacheQueue.remove(node);
+    }
+
+    /**
+     * 缂撳瓨鐨勮妭鐐规暟
+     * @Return 缂撳瓨鑺傜偣鏁�
+     */
+    public static Integer size(){
+        return cacheQueue.size() ;
+    }
+}
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttComResultNode.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttComResultNode.java
new file mode 100644
index 0000000..5097c57
--- /dev/null
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttComResultNode.java
@@ -0,0 +1,58 @@
+package com.dy.rtuMw.server.mqtt;
+
+import com.dy.common.mw.protocol4Mqtt.MqttSubMsg;
+import com.dy.common.queue.NodeObj;
+import com.dy.common.springUtil.SpringContextUtil;
+import com.dy.common.threadPool.ThreadPool;
+import com.dy.common.threadPool.TreadPoolFactory;
+import com.dy.rtuMw.web.comResult.CommandResultDeal;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/5 15:06
+ * @Description
+ */
+public class MqttComResultNode implements NodeObj {
+
+    private static final Logger log = LogManager.getLogger(MqttComResultNode.class.getName());
+
+    public Object obj ;//鏁版嵁
+
+    public MqttComResultNode(Object obj){
+        this.obj = obj ;
+    }
+    /**
+     * 鑷繁澶勭悊鑷繁
+     * @return
+     */
+    public boolean dealSelf(){
+        try {
+            ThreadPool.Pool pool = TreadPoolFactory.getThreadPoolLong() ;
+            pool.putJob(new ThreadPool.Job() {
+                public void execute() {
+                    if(obj != null){
+                        if(obj instanceof MqttSubMsg){
+                            CommandResultDeal deal = SpringContextUtil.getBean(CommandResultDeal.class) ;
+                            deal.deal((MqttSubMsg)obj);
+                        }
+                    }
+                }
+                @Override
+                public void destroy(){
+                }
+                @Override
+                public boolean isDestroy(){
+                    return false ;
+                }
+
+            });
+        } catch (Exception e) {
+            log.error("鍦≧tuComResultNode鍐呭彂鐢熷紓甯�", e);
+        }
+        return true ;
+    }
+
+
+}
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java
new file mode 100644
index 0000000..3c775b5
--- /dev/null
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java
@@ -0,0 +1,86 @@
+package com.dy.rtuMw.server.mqtt;
+
+import com.dy.common.mw.channel.mqtt.MqttClientPool;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/4 14:54
+ * @Description
+ */
+public class MqttManager {
+
+    private static final Logger log = LogManager.getLogger(MqttManager.class.getName());
+
+    private static final MqttManager INSTANCE = new MqttManager();
+
+    private MqttUnitConfigVo configVo ;
+
+    private MqttClientPool pool;
+
+    private MqttManager(){
+    }
+
+    public static MqttManager getInstance() {
+        return MqttManager.INSTANCE;
+    }
+    /**
+     *  鍒濆鍖栭厤缃俊鎭�
+     */
+    public void initOption(MqttUnitConfigVo configVo) {
+        this.configVo = configVo;
+    }
+
+    public void start()throws Exception{
+        String URL = "tcp://" + this.configVo.svIp + ":" + this.configVo.svPort;
+        this.pool = new MqttClientPool(URL, this.configVo.svUserName, this.configVo.svUserPassword, this.configVo.poolMaxSize);
+        if(this.pool.isClose()){
+            throw new Exception("Mqtt杩炴帴姹犲垵濮嬪寲澶辫触");
+        }
+        MqttClient clientSub = null ;
+        try {
+            clientSub = pool.popClient();//鏂板垱寤轰竴涓狢lient鏃讹紝姝lient瀹為檯鍘昏繛鎺QTT鏈嶅姟鍣紝濡傛灉杩炴帴涓嶄笂锛屽氨浼氭姏鍑哄紓甯�
+        }catch (Exception e){
+            throw new Exception("Mqtt杩炴帴姹犺幏寰楄繛鎺ュ紓甯�", e);
+        }
+        if(clientSub == null || !clientSub.isConnected()){
+            throw new Exception("Mqtt杩炴帴姹犺幏寰楄闃呰繛鎺ヤ笉鍙敤");
+        }
+        for(int i = 0; i < this.configVo.subTopics.length; i++){
+            clientSub.subscribe(this.configVo.subTopics[i], this.configVo.topicsQos[i], new MqttMessageListener());
+        }
+    }
+
+    public void stop()throws Exception{
+        if(this.pool != null){
+            // 鍏抽棴杩炴帴姹�
+            this.pool.close();
+        }
+    }
+
+    public MqttClient popMqttClient() throws Exception{
+        return this.pool.popClient();
+    }
+
+    public void pushMqttClient(MqttClient client) {
+        this.pool.pushClient(client);
+    }
+
+    public void publishMsg(MqttClient client, String topic, byte[] msg) throws Exception{
+        client.publish(topic, msg, this.configVo.publishQos, false);
+    }
+
+    public void publishMsg(MqttClient client, String topic, String msg) throws Exception{
+        byte[] bs = msg.getBytes("UTF-8") ;
+        client.publish(topic, bs, this.configVo.publishQos, false);
+    }
+
+    public boolean poolIsClose(){
+        if(this.pool == null){
+            return true;
+        }
+        return this.pool.isClose();
+    }
+}
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java
new file mode 100644
index 0000000..19bd3eb
--- /dev/null
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java
@@ -0,0 +1,49 @@
+package com.dy.rtuMw.server.mqtt;
+
+import com.dy.common.mw.protocol4Mqtt.MqttMsgParser;
+import com.dy.common.mw.protocol4Mqtt.MqttPubMsg;
+import com.dy.common.mw.protocol4Mqtt.MqttSubMsg;
+import com.dy.common.util.Callback;
+import org.eclipse.paho.client.mqttv3.* ;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/4 15:52
+ * @Description
+ */
+public class MqttMessageListener implements IMqttMessageListener{
+    @Override
+    public void messageArrived(String topic, MqttMessage msg) throws Exception {
+        MqttMsgParser parser = new MqttMsgParser() ;
+        MqttSubMsg subMsg = parser.parseSubMsg(topic, msg) ;
+        this.nextDeal(subMsg);
+    }
+    private void nextDeal(MqttSubMsg subMsg)throws Exception {
+        subMsg.action(new Callback() {
+            @Override
+            public void call(Object obj) {
+                MqttSubMsg subMs = (MqttSubMsg) obj ;
+                MqttPubMsg pubMs = MqttPubMsgCache.matchFromTail(subMs) ;
+                if(pubMs != null){
+                    subMs.mqttResultSendWebUrl = pubMs.mqttResultSendWebUrl ;
+                    subMs.commandId = pubMs.commandId ;
+                    try {
+                        MqttComResultCache.getInstance().cacheMqttComResult(new MqttComResultNode(subMs));
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+                try{
+                    MqttSubMsgCache.getInstance().cacheMsg(new MqttSubMsgNode(subMsg));
+                }catch (Exception e){
+                }
+            }
+            @Override
+            public void call(Object... objs) {
+            }
+            @Override
+            public void exception(Exception e) {
+            }
+        });
+    }
+}
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgCache.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgCache.java
new file mode 100644
index 0000000..2ae8adc
--- /dev/null
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgCache.java
@@ -0,0 +1,132 @@
+package com.dy.rtuMw.server.mqtt;
+
+import com.dy.common.mw.protocol4Mqtt.MqttPubMsg;
+import com.dy.common.mw.protocol4Mqtt.MqttSubMsg;
+import com.dy.common.queue.Node;
+import com.dy.common.queue.Queue;
+import com.dy.rtuMw.server.ServerProperties;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/4 17:24
+ * @Description
+ */
+public class MqttPubMsgCache {
+
+    //TCP涓嬭鍛戒护缂撳瓨闃熷垪
+    private static Queue cacheQueue = new Queue("mqttPubMsgCache") ;
+
+    private static MqttPubMsgCache instance = new MqttPubMsgCache() ;
+
+    private MqttPubMsgCache(){
+        cacheQueue.setLimit(ServerProperties.cacheUpDownDataWarnCount, ServerProperties.cacheUpDownDataMaxCount);
+    }
+
+    public static MqttPubMsgCache getInstance(){
+        return instance ;
+    }
+
+
+    public static Integer info(){
+        Integer comTotalDown = 0 ;//缂撳瓨鐨勪笅琛屽懡浠ゆ�绘暟
+        MqttPubMsgNode obj ;
+        Node node = cacheQueue.getFirstNode() ;
+        while(node != null && node.obj != null){
+            obj = (MqttPubMsgNode)node.obj;
+            if(!obj.onceReceivedResult){
+                comTotalDown ++ ;
+            }
+        }
+        return comTotalDown ;
+    }
+
+    /**
+     * 缂撳瓨鍛戒护
+     * @param result
+     * @throws Exception
+     */
+    public static void cacheCommand(MqttPubMsg result) throws Exception{
+        if(result != null){
+            cacheQueue.pushHead(new MqttPubMsgNode(result));
+        }
+    }
+
+    /**
+     * 鍖归厤鍛戒护缁撴灉
+     * @param subMsg
+     * @return
+     */
+    public static MqttPubMsg matchFromHead(MqttSubMsg subMsg){
+        MqttPubMsg pubMsg = null ;
+        MqttPubMsgNode obj = null ;
+        Node node = cacheQueue.getFirstNode() ;
+        while(node != null && node.obj != null){
+            obj = (MqttPubMsgNode)node.obj;
+            pubMsg = obj.result ;
+            if(pubMsg != null
+                    && subMsg.subMsgMatchPubMsg(pubMsg)){
+                obj.onceReceivedResult = true ;//鏍囪瘑宸茬粡鏀跺埌鍛戒护缁撴灉
+                return pubMsg;
+            }else{
+                node = node.next ;
+            }
+        }
+        return null ;
+    }
+
+    /**
+     * 鍖归厤鍛戒护缁撴灉
+     * @param subMsg
+     * @return
+     */
+    public static MqttPubMsg matchFromTail(MqttSubMsg subMsg){
+        MqttPubMsg pubMsg = null ;
+        MqttPubMsgNode obj = null ;
+        Node node = cacheQueue.getLastNode() ;
+        while(node != null && node.obj != null){
+            obj = (MqttPubMsgNode)node.obj;
+            pubMsg = obj.result ;
+            if(pubMsg != null
+                    && subMsg.subMsgMatchPubMsg(pubMsg)){
+                obj.onceReceivedResult = true ;//鏍囪瘑宸茬粡鏀跺埌鍛戒护缁撴灉
+                return pubMsg;
+            }else{
+                node = node.pre ;
+            }
+        }
+        return null ;
+    }
+
+    /**
+     * 寰楀埌绗竴涓妭鐐�
+     * @return
+     */
+    public static Node getFirstQueueNode(){
+        return cacheQueue.getFirstNode() ;
+    }
+
+    /**
+     * 寰楀埌鏈�鍚庝竴涓妭鐐�
+     * @return
+     */
+    public static Node getLastQueueNode(){
+        return cacheQueue.getLastNode() ;
+    }
+
+    /**
+     * 绉婚櫎鑺傜偣
+     * @param node
+     */
+    public static void removeNode(Node node){
+        cacheQueue.remove(node);
+    }
+
+    /**
+     * 缂撳瓨鐨勮妭鐐规暟
+     * @Return 缂撳瓨鑺傜偣鏁�
+     */
+    public static Integer size(){
+        return cacheQueue.size() ;
+    }
+
+}
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgNode.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgNode.java
new file mode 100644
index 0000000..e345d59
--- /dev/null
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgNode.java
@@ -0,0 +1,95 @@
+package com.dy.rtuMw.server.mqtt;
+
+import com.dy.common.mw.protocol4Mqtt.MqttPubMsg;
+import com.dy.common.queue.NodeObj;
+import com.dy.rtuMw.server.ServerProperties;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/4 17:25
+ * @Description
+ */
+public class MqttPubMsgNode implements NodeObj {
+
+    private static Logger log = LogManager.getLogger(MqttPubMsgNode.class.getName());
+
+    public MqttPubMsg result ;//涓嬭鍛戒护
+    public Long cachTime ;//缂撳瓨鏃跺埢
+    public boolean onceReceivedResult ;//宸茬粡鏀跺埌鍛戒护搴旂瓟
+
+
+    public MqttPubMsgNode(MqttPubMsg result){
+        this.result = result ;
+        this.cachTime = System.currentTimeMillis() ;
+        this.onceReceivedResult = false ;
+    }
+
+    /**
+     * 鑷繁澶勭悊鑷繁
+     * @param now
+     * @return
+     */
+    public boolean dealSelf(Long now){
+        if(this.onceReceivedResult){
+            //宸茬粡鏀跺埌鍛戒护缁撴灉
+            //璁板綍鐘舵��
+            //RtuStatusDealer.commandSuccess(this.result.rtuAddr, this.result.downCode, this.result.downCodeName);
+            return true ;
+        }
+        boolean noConnect2MqSv = false ;
+        MqttManager mqttManager = MqttManager.getInstance() ;
+        MqttClient mqttClient = null ;
+
+        noConnect2MqSv = mqttManager.poolIsClose() ;
+        if(noConnect2MqSv){
+            //鏈浘杩炴帴MQTT鏈嶅姟鍣�
+            return this.decideRemoveNodeFromCach(now) ;
+        }else{
+            try {
+                //濡傛灉缃戠粶涓嶅ソ鎴栨柇缃戯紝姝ゅ鐢ㄦ椂杈冮暱
+                mqttClient = mqttManager.popMqttClient() ;
+                if(mqttClient == null || !mqttClient.isConnected()){
+                    noConnect2MqSv = false ;
+                }
+            }catch (Exception e){
+                log.error("鑾峰彇MQTT瀹㈡埛绔け璐�", e);
+            }
+        }
+        if(noConnect2MqSv){
+            //鏈浘杩炴帴MQTT鏈嶅姟鍣�
+            return this.decideRemoveNodeFromCach(now) ;
+        }else{
+            if(mqttClient != null && mqttClient.isConnected()){
+                try {
+                    mqttManager.publishMsg(mqttClient, this.result.topic, this.result.msg);
+                    log.info("鍙戝竷MQTT娑堟伅锛堜富棰�=" + this.result.topic + "锛�" + this.result.msg);
+                }catch (Exception e){
+                    log.error("MQTT鍙戝竷娑堟伅澶辫触锛堜富棰�=" + this.result.topic + "锛�" , e);
+                }finally {
+                    mqttManager.pushMqttClient(mqttClient);
+                }
+                return false ;
+            }else{
+                //鏈浘杩炴帴MQTT鏈嶅姟鍣�
+                return this.decideRemoveNodeFromCach(now) ;
+            }
+        }
+    }
+
+    private boolean decideRemoveNodeFromCach(Long now){
+        if(!this.result.isCacheForOffLine){
+            //涓嶅湪绾垮懡浠や笉缂撳瓨
+            return true ;
+        }else{
+            //涓嶅湪绾垮懡浠ょ紦瀛�
+            if(now - this.cachTime >= ServerProperties.offLineCacheTimeout){
+                //缂撳瓨瓒呮椂
+                return true ;
+            }
+        }
+        return false ;
+    }
+}
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttSubMsgCache.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttSubMsgCache.java
new file mode 100644
index 0000000..cfa5184
--- /dev/null
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttSubMsgCache.java
@@ -0,0 +1,68 @@
+package com.dy.rtuMw.server.mqtt;
+
+import com.dy.common.mw.protocol4Mqtt.MqttSubMsg;
+import com.dy.common.queue.Node;
+import com.dy.common.queue.Queue;
+import com.dy.rtuMw.server.ServerProperties;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/4 16:13
+ * @Description
+ */
+public class MqttSubMsgCache {
+
+    //TCP涓嬭鍛戒护缂撳瓨闃熷垪
+    private static Queue cacheQueue = new Queue("mqttSubMsgCache") ;
+
+    private static MqttSubMsgCache instance = new MqttSubMsgCache() ;
+
+    private MqttSubMsgCache(){
+        cacheQueue.setLimit(ServerProperties.cacheUpDownDataWarnCount, ServerProperties.cacheUpDownDataMaxCount);
+    }
+
+    public static MqttSubMsgCache getInstance(){
+        return instance ;
+    }
+
+    /**
+     * 缂撳瓨璁㈤槄鐨勬秷鎭�
+     * @param node 鏀跺埌鐨勬秷鎭�
+     * @throws Exception
+     */
+    public static void cacheMsg(MqttSubMsgNode node) throws Exception{
+        cacheQueue.pushHead(node);
+    }
+    /**
+     * 寰楀埌绗竴涓妭鐐�
+     * @return
+     */
+    public static Node getFirstQueueNode(){
+        return cacheQueue.getFirstNode() ;
+    }
+
+    /**
+     * 寰楀埌鏈�鍚庝竴涓妭鐐�
+     * @return
+     */
+    public static Node getLastQueueNode(){
+        // 璋冪敤cacheQueue鐨刧etLastNode鏂规硶锛岃繑鍥炴渶鍚庝竴涓妭鐐�
+        return cacheQueue.getLastNode() ;
+    }
+
+    /**
+     * 绉婚櫎鑺傜偣
+     * @param node
+     */
+    public static void removeNode(Node node){
+        cacheQueue.remove(node);
+    }
+
+    /**
+     * 缂撳瓨鐨勮妭鐐规暟
+     * @Return 缂撳瓨鑺傜偣鏁�
+     */
+    public static Integer size(){
+        return cacheQueue.size() ;
+    }
+}
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttSubMsgNode.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttSubMsgNode.java
new file mode 100644
index 0000000..a134219
--- /dev/null
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttSubMsgNode.java
@@ -0,0 +1,69 @@
+package com.dy.rtuMw.server.mqtt;
+
+import com.dy.common.mw.protocol4Mqtt.MqttSubMsg;
+import com.dy.common.queue.NodeObj;
+import com.dy.common.threadPool.ThreadPool;
+import com.dy.common.threadPool.TreadPoolFactory;
+import com.dy.rtuMw.server.rtuData.TaskPool;
+import com.dy.rtuMw.server.rtuData.TaskSurpport;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/4 16:14
+ * @Description
+ */
+public class MqttSubMsgNode implements NodeObj {
+
+    private static Logger log = LogManager.getLogger(MqttSubMsgNode.class.getName());
+
+    protected MqttSubMsg msg;
+
+    public MqttSubMsgNode(MqttSubMsg obj){
+        this.msg = obj ;
+    }
+
+    /**
+     * 鑷繁澶勭悊鑷繁
+     * @return
+     */
+    public boolean dealSelf(){
+        try {
+            ThreadPool.Pool pool = TreadPoolFactory.getThreadPoolLong() ;
+            pool.putJob(new ThreadPool.Job() {
+                public void execute() {
+                    if(msg != null && msg.valid()){
+                        TaskSurpport task = null ;
+                        try{
+                            task = TaskPool.popTask() ;
+                            if(task != null){
+                                task.execute(msg);
+                            }else{
+                                log.error("鏈緱鍒癕q璁㈤槄娑堟伅澶勭悊浠诲姟锛�");
+                            }
+                        }catch(Exception e){
+                            log.error("Mq璁㈤槄娑堟伅浠诲姟姹犲鐞嗘暟鎹椂鍙戠敓寮傚父", e);
+                        }finally {
+                            if(task != null){
+                                TaskPool.freeAndCleanTask(task);
+                            }
+                        }
+                    }
+                }
+                @Override
+                public void destroy(){
+                }
+                @Override
+                public boolean isDestroy(){
+                    return false ;
+                }
+
+            });
+        } catch (Exception e) {
+            log.error("鍦∕qttSubMsgObj鍐呭彂鐢熷紓甯�", e);
+        }
+        return true ;
+    }
+
+}
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnit.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnit.java
new file mode 100644
index 0000000..12a22f9
--- /dev/null
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnit.java
@@ -0,0 +1,68 @@
+package com.dy.rtuMw.server.mqtt;
+
+import com.dy.common.mw.UnitAdapterInterface;
+import com.dy.common.mw.UnitCallbackInterface;
+import com.dy.common.mw.UnitInterface;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/4 14:46
+ * @Description
+ */
+public class MqttUnit  implements UnitInterface {
+
+    private static MqttUnit instance = new MqttUnit() ;
+
+    public static MqttUnitAdapter adapter ;
+    public static MqttUnitConfigVo confVo ;
+
+    private static MqttManager manager ;
+    private MqttUnit(){} ;
+
+    public static MqttUnit getInstance(){
+        return instance ;
+    }
+
+    @Override
+    public void setAdapter(UnitAdapterInterface adapter) throws Exception {
+        if(adapter == null){
+            throw new Exception("Mqtt娑堟伅妯″潡閫傞厤鍣ㄥ璞′笉鑳戒负绌猴紒") ;
+        }
+        MqttUnit.adapter = (MqttUnitAdapter)adapter ;
+        MqttUnit.confVo = MqttUnit.adapter.getConfig() ;
+        if(MqttUnit.confVo == null){
+            throw new Exception("Mqtt娑堟伅妯″潡閰嶇疆瀵硅薄涓嶈兘涓虹┖锛�") ;
+        }
+    }
+
+    /**
+     * 鍒濆鍖�
+     */
+    @Override
+    public void start(UnitCallbackInterface callback) throws Exception {
+        if(confVo.enable){
+            manager = MqttManager.getInstance() ;
+            manager.initOption(confVo);
+            manager.start();
+            callback.call(null) ;
+            System.out.println("Mqtt娑堟伅妯″潡鎴愬姛鍚姩");
+        }else{
+            System.out.println("Mqtt娑堟伅妯″潡閰嶇疆涓嶅惎鍔�");
+        }
+    }
+
+    @Override
+    public void stop(UnitCallbackInterface callback) throws Exception {
+        if(manager != null){
+            manager.stop();
+        }
+        callback.call(null) ;
+    }
+
+    public void recover(UnitCallbackInterface callback) throws Exception {
+        this.start(callback) ;
+        callback.call(null) ;
+    }
+
+}
+
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitAdapter.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitAdapter.java
new file mode 100644
index 0000000..9986c8e
--- /dev/null
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitAdapter.java
@@ -0,0 +1,14 @@
+package com.dy.rtuMw.server.mqtt;
+
+import com.dy.common.mw.UnitAdapterInterface;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/4 14:47
+ * @Description
+ */
+public interface MqttUnitAdapter extends UnitAdapterInterface {
+
+    MqttUnitConfigVo getConfig();
+
+}
\ No newline at end of file
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitConfigVo.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitConfigVo.java
new file mode 100644
index 0000000..c767276
--- /dev/null
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitConfigVo.java
@@ -0,0 +1,29 @@
+package com.dy.rtuMw.server.mqtt;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/4 14:46
+ * @Description
+ */
+public class MqttUnitConfigVo {
+    public Boolean showStartInfo ;
+    public Boolean enable ;//妯″潡鏄惁鍚姩
+    public String svIp ;//
+    public Integer svPort ;//
+    public String svUserName ;//
+    public String svUserPassword ;//
+    public Integer poolMaxSize ;//
+    public String[] subTopics ;//璁㈤槄鐨勪富棰�
+    public int[] topicsQos ;////璁㈤槄涓婚鐨凲os
+    public int publishQos ;////鍙戝竷娑堟伅鐨凲os
+
+    public MqttUnitConfigVo(){
+        this.enable = false ;
+        this.svIp = "127.0.0.1" ;
+        this.svPort = 1883 ;
+        this.svUserName = "dyyjy" ;
+        this.svUserPassword = "Dyyjy2025,;.abc!@#" ;
+        this.poolMaxSize = 10 ;
+        this.publishQos = 1 ;
+    }
+}
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/TkMqttData.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/TkMqttData.java
new file mode 100644
index 0000000..918f363
--- /dev/null
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/TkMqttData.java
@@ -0,0 +1,37 @@
+package com.dy.rtuMw.server.rtuData;
+
+import com.dy.common.mw.protocol4Mqtt.pSdV1.MqttSubMsgSdV1;
+import com.dy.rtuMw.server.rtuData.pSdV1.TkFindPSdV1;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/4 17:05
+ * @Description
+ */
+public class TkMqttData extends TaskSurpport {
+
+    private static Logger log = LogManager.getLogger(TkMqttData.class.getName()) ;
+
+    //绫籌D锛屼竴瀹氫笌Tree.xml閰嶇疆鏂囦欢涓厤缃竴鑷�
+    public static final String taskId = "TkMqttData" ;
+
+    /**
+     * 鎵ц鑺傜偣浠诲姟
+     * @param data 闇�瑕佸鐞嗙殑鏁版嵁
+     */
+    @Override
+    public void execute(Object data) {
+        if(data == null){
+            log.error("涓ラ噸閿欒锛孧qtt璁㈤槄娑堟伅鏁版嵁涓虹┖锛�" );
+        }else{
+            if(data instanceof MqttSubMsgSdV1){
+                this.toNextOneTask(data, TkFindPSdV1.taskId);
+            }else{
+                log.error("涓ラ噸閿欒锛岃鏁版嵁绫诲瀷锛�" + data.getClass().getName() + "锛夛紝鎺ユ敹鏁版嵁浠诲姟杩樻湭瀹炵幇锛�" );
+            }
+        }
+    }
+
+}
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/TkReceive.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/TkReceive.java
index a0d0608..d389f0d 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/TkReceive.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/TkReceive.java
@@ -1,5 +1,6 @@
 package com.dy.rtuMw.server.rtuData;
 
+import com.dy.common.mw.protocol4Mqtt.MqttSubMsg;
 import com.dy.common.mw.protocol.Data;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -21,7 +22,9 @@
 			log.error("涓ラ噸閿欒锛孯TU涓婅鏁版嵁涓虹┖锛�" );
 		}else{
 			if(data instanceof Data){
-				this.toNextTasks(data);
+				this.toNextOneTask(data, TkRtuData.taskId);
+			}else if(data instanceof MqttSubMsg){
+				this.toNextOneTask(data, TkMqttData.taskId);
 			}else{
 				log.error("涓ラ噸閿欒锛岃鏁版嵁绫诲瀷锛�" + data.getClass().getName() + "锛夛紝鎺ユ敹鏁版嵁浠诲姟杩樻湭瀹炵幇锛�" );
 			}
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/pSdV1/TkFindPSdV1.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/pSdV1/TkFindPSdV1.java
new file mode 100644
index 0000000..eec2554
--- /dev/null
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/pSdV1/TkFindPSdV1.java
@@ -0,0 +1,31 @@
+package com.dy.rtuMw.server.rtuData.pSdV1;
+
+import com.dy.common.mw.protocol4Mqtt.pSdV1.MqttSubMsgSdV1;
+import com.dy.rtuMw.server.rtuData.TaskSurpport;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/5 13:43
+ * @Description
+ */
+public class TkFindPSdV1  extends TaskSurpport {
+
+    private static Logger log = LogManager.getLogger(TkFindPSdV1.class.getName()) ;
+
+    //绫籌D锛屼竴瀹氫笌Tree.xml閰嶇疆鏂囦欢涓厤缃竴鑷�
+    public static final String taskId = "TkFindPSdV1" ;
+
+    /**
+     * 鎵ц鑺傜偣浠诲姟
+     * @param data 闇�瑕佸鐞嗙殑鏁版嵁
+     */
+    @Override
+    public void execute(Object data) {
+        //鍓嶉潰鐨勪换鍔″凡缁忓垽鏂簡data涓嶄负绌�
+        MqttSubMsgSdV1 msg = (MqttSubMsgSdV1)data ;
+        log.info(msg.toString());
+    }
+
+}
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/MqttComResultConstantTask.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/MqttComResultConstantTask.java
new file mode 100644
index 0000000..1c266c5
--- /dev/null
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/MqttComResultConstantTask.java
@@ -0,0 +1,75 @@
+package com.dy.rtuMw.server.tasks;
+
+import com.dy.common.mw.core.CoreTask;
+import com.dy.common.queue.Node;
+import com.dy.rtuMw.server.mqtt.MqttComResultCache;
+import com.dy.rtuMw.server.mqtt.MqttComResultNode;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * 瀵筊TU涓婅鏁版嵁杩涜涓氬姟澶勭悊
+ */
+public class MqttComResultConstantTask extends CoreTask {
+    private static final Logger log = LogManager.getLogger(MqttComResultConstantTask.class.getName());
+
+    /**
+     * 鍦ㄥ崟绾跨▼鐜涓繍琛�
+     */
+    @Override
+    public Integer execute() {
+        try{
+            dealRtuComResult() ;
+        }catch(Exception e){
+            log.error(e);
+        }
+        return MqttComResultCache.size()>0?0:1 ;
+    }
+    /**
+     * 澶勭悊涓婅鍛戒护缁撴灉
+     */
+    public void dealRtuComResult() {
+        Node first = MqttComResultCache.getFirstQueueNode() ;
+        if(first != null){
+            Node last = MqttComResultCache.getLastQueueNode() ;
+            while (last != null){
+                last = this.doDealRtuComResult(first, last);
+            }
+        }
+    }
+
+    /**
+     * 澶勭悊缂撳瓨鐨勪笂琛屾暟鎹妭鐐�
+     * @param first 绗竴涓妭鐐�
+     * @param last 鏈�鍚庝竴涓妭鐐�
+     */
+    private Node doDealRtuComResult(Node first, Node last){
+        if(last != null){
+            //鍦╠ealNode鏂规硶涓紝鍙兘瑕佹妸last浠庨槦鍒椾腑绉婚櫎锛岃繖鏃秎ast.pre涓虹┖锛屾墍浠ユ彁鍓嶆妸last.pre鍙栧嚭鏉�
+            Node pre = last.pre ;
+            dealNode(last) ;
+            if(first != last){
+                return pre ;
+            }else{
+                //鍋滄
+                return null ;
+            }
+        }else{
+            return null ;
+        }
+    }
+
+
+    /**
+     * 澶勭悊涓�涓妭鐐�
+     * @param node 鑺傜偣
+     */
+    private void dealNode(Node node){
+        if(node != null && node.obj != null){
+            MqttComResultNode obj = (MqttComResultNode)node.obj ;
+            obj.dealSelf() ;
+            MqttComResultCache.removeNode(node);
+        }
+    }
+
+}
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/MqttPubMessageConstantTask.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/MqttPubMessageConstantTask.java
new file mode 100644
index 0000000..703c658
--- /dev/null
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/MqttPubMessageConstantTask.java
@@ -0,0 +1,81 @@
+package com.dy.rtuMw.server.tasks;
+
+import com.dy.common.mw.core.CoreTask;
+import com.dy.common.queue.Node;
+import com.dy.rtuMw.server.mqtt.MqttPubMsgCache;
+import com.dy.rtuMw.server.mqtt.MqttPubMsgNode;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/4 16:25
+ * @Description
+ */
+public class MqttPubMessageConstantTask extends CoreTask {
+    private static final Logger log = LogManager.getLogger(MqttPubMessageConstantTask.class.getName());
+
+    /**
+     * 鍦ㄥ崟绾跨▼鐜涓繍琛�
+     */
+    @Override
+    public Integer execute() {
+        try{
+            dealMqMsg() ;
+        }catch(Exception e){
+            log.error(e);
+        }
+        return MqttPubMsgCache.size()>0?0:1 ;
+    }
+    /**
+     * 澶勭悊MQTT璁㈤槄鐨勬秷鎭�
+     */
+    public void dealMqMsg() {
+        Node first = MqttPubMsgCache.getFirstQueueNode() ;
+        if(first != null){
+            Node last = MqttPubMsgCache.getLastQueueNode() ;
+            while (last != null){
+                last = this.doDealMqMsg(System.currentTimeMillis(), first, last);
+            }
+        }
+    }
+
+    /**
+     * 澶勭悊缂撳瓨鐨勪笂琛屾暟鎹妭鐐�
+     * @param now 褰撳墠鏃跺埢
+     * @param first 绗竴涓妭鐐�
+     * @param last 鏈�鍚庝竴涓妭鐐�
+     */
+    private Node doDealMqMsg(Long now, Node first, Node last){
+        if(last != null){
+            //鍦╠ealNode鏂规硶涓紝鍙兘瑕佹妸last浠庨槦鍒椾腑绉婚櫎锛岃繖鏃秎ast.pre涓虹┖锛屾墍浠ユ彁鍓嶆妸last.pre鍙栧嚭鏉�
+            Node pre = last.pre ;
+            dealNode(now, last) ;
+            if(first != last){
+                return pre ;
+            }else{
+                //鍋滄
+                return null ;
+            }
+        }else{
+            return null ;
+        }
+    }
+
+
+    /**
+     * 澶勭悊涓�涓妭鐐�
+     * @param now 鐜板湪鏃跺埢
+     * @param node 鑺傜偣
+     */
+    private void dealNode(Long now, Node node){
+        if(node != null && node.obj != null){
+            MqttPubMsgNode obj = (MqttPubMsgNode)node.obj ;
+            boolean removeNode = obj.dealSelf(now) ;
+            if(removeNode){
+                MqttPubMsgCache.removeNode(node);
+            }
+        }
+    }
+
+}
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/MqttSubMessageConstantTask.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/MqttSubMessageConstantTask.java
new file mode 100644
index 0000000..80501d1
--- /dev/null
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/MqttSubMessageConstantTask.java
@@ -0,0 +1,77 @@
+package com.dy.rtuMw.server.tasks;
+
+import com.dy.common.mw.core.CoreTask;
+import com.dy.common.queue.Node;
+import com.dy.rtuMw.server.mqtt.MqttSubMsgCache;
+import com.dy.rtuMw.server.mqtt.MqttSubMsgNode;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/4 16:25
+ * @Description
+ */
+public class MqttSubMessageConstantTask extends CoreTask {
+    private static final Logger log = LogManager.getLogger(MqttSubMessageConstantTask.class.getName());
+
+    /**
+     * 鍦ㄥ崟绾跨▼鐜涓繍琛�
+     */
+    @Override
+    public Integer execute() {
+        try{
+            dealMqMsg() ;
+        }catch(Exception e){
+            log.error(e);
+        }
+        return MqttSubMsgCache.size()>0?0:1 ;
+    }
+    /**
+     * 澶勭悊MQTT璁㈤槄鐨勬秷鎭�
+     */
+    public void dealMqMsg() {
+        Node first = MqttSubMsgCache.getFirstQueueNode() ;
+        if(first != null){
+            Node last = MqttSubMsgCache.getLastQueueNode() ;
+            while (last != null){
+                last = this.doDealMqMsg(first, last);
+            }
+        }
+    }
+
+    /**
+     * 澶勭悊缂撳瓨鐨勪笂琛屾暟鎹妭鐐�
+     * @param first 绗竴涓妭鐐�
+     * @param last 鏈�鍚庝竴涓妭鐐�
+     */
+    private Node doDealMqMsg(Node first, Node last){
+        if(last != null){
+            //鍦╠ealNode鏂规硶涓紝鍙兘瑕佹妸last浠庨槦鍒椾腑绉婚櫎锛岃繖鏃秎ast.pre涓虹┖锛屾墍浠ユ彁鍓嶆妸last.pre鍙栧嚭鏉�
+            Node pre = last.pre ;
+            dealNode(last) ;
+            if(first != last){
+                return pre ;
+            }else{
+                //鍋滄
+                return null ;
+            }
+        }else{
+            return null ;
+        }
+    }
+
+
+    /**
+     * 澶勭悊涓�涓妭鐐�
+     * @param node 鑺傜偣
+     */
+    private void dealNode(Node node){
+        if(node != null && node.obj != null){
+            MqttSubMsgNode obj = (MqttSubMsgNode)node.obj ;
+            obj.dealSelf() ;
+            MqttSubMsgCache.removeNode(node);
+        }
+    }
+
+}
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/WebDownCom4MqttTask.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/WebDownCom4MqttTask.java
new file mode 100644
index 0000000..6c4e8f1
--- /dev/null
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/WebDownCom4MqttTask.java
@@ -0,0 +1,51 @@
+package com.dy.rtuMw.server.tasks;
+
+import com.dy.common.mw.core.CoreTask;
+import com.dy.common.mw.protocol.Command;
+import com.dy.common.mw.protocol.Driver;
+import com.dy.common.mw.protocol.MidResult;
+import com.dy.common.mw.protocol.ProtocolCache;
+import com.dy.common.mw.protocol4Mqtt.MqttMsgParser;
+import com.dy.common.mw.protocol4Mqtt.MqttPubMsg;
+import com.dy.rtuMw.server.ServerProperties;
+import com.dy.rtuMw.server.forTcp.TcpSessionCache;
+import com.dy.rtuMw.server.mqtt.MqttPubMsgCache;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/5 15:42
+ * @Description
+ */
+
+public class WebDownCom4MqttTask extends CoreTask {
+
+    private static Logger log = LogManager.getLogger(WebDownCom4MqttTask.class.getName());
+
+    @Override
+    public Integer execute() {
+        Command com = (Command)this.data ;
+        try {
+            log.info("涓嬪彂MQTT鍛戒护" + com.getCode() + "鐨勬牳蹇冧换鍔″紑濮嬫墽琛�");
+            this.deal(com);
+        } catch (Exception e) {
+            log.error("澶勭悊涓嬭MQTT鍛戒护鍑洪敊" + (e.getMessage()==null?"!":("锛�" + e.getMessage())) ,e);
+        }
+        return null ;
+    }
+
+    /**
+     * 澶勭悊鍛戒护
+     * @param com 鍛戒护
+     * @throws Exception
+     */
+    private void deal(Command com) throws Exception{
+        MqttMsgParser parser = new MqttMsgParser() ;
+        MqttPubMsg pubMsg = parser.createPubMsg(ServerProperties.orgTag, com) ;
+        if(pubMsg != null){
+            MqttPubMsgCache.cacheCommand(pubMsg);
+        }
+    }
+
+}
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/web/com/CommandCtrl.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/web/com/CommandCtrl.java
index 5b26ebc..5c13be1 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/web/com/CommandCtrl.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/web/com/CommandCtrl.java
@@ -7,7 +7,9 @@
 import com.dy.rtuMw.server.forTcp.TcpSessionCache;
 import com.dy.rtuMw.server.local.CommandInnerDeaLer;
 import com.dy.rtuMw.server.local.ReturnCommand;
+import com.dy.rtuMw.server.mqtt.MqttManager;
 import com.dy.rtuMw.server.msCenter.MsCenterUnit;
+import com.dy.rtuMw.server.tasks.WebDownCom4MqttTask;
 import com.dy.rtuMw.server.tasks.WebDownComTask;
 import com.dy.common.mw.core.CoreUnit;
 import com.dy.common.mw.protocol.Command;
@@ -247,6 +249,13 @@
             }catch(Exception e){
                 return BaseResponseUtils.buildError(ReturnCommand.errored("澶勭悊鍙戝悜RTU鐨勫閮ㄩ�忎紶鍛戒护鍑洪敊" + (e.getMessage() == null?"":("锛�" + e.getMessage())), com.getId(), com.getCode()) );
             }
+        }else if(commandType.equals(CommandType.mqttCommand)){
+            //鍙戝悜MQTT鐨勫閮ㄥ懡浠�
+            try{
+                return this.dealMqttCommand(com) ;
+            }catch(Exception e){
+                return BaseResponseUtils.buildError(ReturnCommand.errored("澶勭悊鍙戝悜RTU鐨勫閮ㄥ懡浠ゅ嚭閿�" + (e.getMessage() == null?"":("锛�" + e.getMessage())), com.getId(), com.getCode()) );
+            }
         }else if(commandType.equals(CommandType.resultCommand)){
             return BaseResponseUtils.buildError(ReturnCommand.errored("鍑洪敊锛岄�氫俊涓棿浠朵笉鎺ョ粨鏋滅被鍨嬬殑鍛戒护锛�", com.getId(), com.getCode()));
         }else{
@@ -336,4 +345,35 @@
         return BaseResponseUtils.buildSuccess(ReturnCommand.successed("閫忎紶鍛戒护宸叉帴鍙楋紝鍗冲皢鏋勯�犲苟涓嬪彂鍛戒护銆�", command.getId(), command.getCode()));
     }
 
+    /**
+     * 澶勭悊鍙戝悜RTU鐨勫閮ㄥ懡浠�
+     * @return 缁撴灉
+     */
+    private BaseResponse<Command> dealMqttCommand(Command command){
+        String rtuAddr = command.getRtuAddr() ;
+        if(rtuAddr == null || rtuAddr.trim().equals("")){
+            return BaseResponseUtils.buildError(ReturnCommand.errored("鍑洪敊锛岃澶嘔D涓虹┖锛�", command.getId(), command.getCode())) ;
+        }
+        if(!ServerProperties.mqttUnitEnable.booleanValue()){
+            return BaseResponseUtils.buildError(ReturnCommand.errored("鍑洪敊锛孧QTT杩炴帴妯″潡閰嶇疆鏈惎鍔紒", command.getId(), command.getCode())) ;
+        }
+        if(MqttManager.getInstance().poolIsClose()){
+            return BaseResponseUtils.buildError(ReturnCommand.errored("鍑洪敊锛孧QTT杩炴帴姹犳按鍒涘缓鎴愬姛锛�", command.getId(), command.getCode())) ;
+        }
+
+        //鐢熸垚寮傛浠诲姟
+        WebDownCom4MqttTask task = new WebDownCom4MqttTask() ;
+        task.data = command ;
+        try{
+            log.info("鏋勯�犱笅鍙慚QTT鍛戒护" + command.getCode() + "鐨勬牳蹇冧换鍔★紝骞舵斁鍏ヤ换鍔¢槦鍒椾腑");
+            CoreUnit.getInstance().pushCoreTask(task);
+        }catch(Exception e){
+            log.error(e.getMessage(), e);
+            return BaseResponseUtils.buildError(ReturnCommand.successed("MQTT鍛戒护澶勭悊澶辫触" + e.getMessage(), command.getId(), command.getCode())) ;
+        }
+        return BaseResponseUtils.buildSuccess(ReturnCommand.successed("MQTT鍛戒护宸叉帴鍙楋紝鍗冲皢鏋勯�犲苟涓嬪彂鍛戒护銆�", command.getId(), command.getCode()));
+    }
+
+
+
 }
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/web/comResult/CommandResultDeal.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/web/comResult/CommandResultDeal.java
index 2973292..658e68e 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/web/comResult/CommandResultDeal.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/web/comResult/CommandResultDeal.java
@@ -3,6 +3,7 @@
 import com.dy.common.contant.Constant;
 import com.dy.common.mw.protocol.Command;
 import com.dy.common.mw.protocol.Data;
+import com.dy.common.mw.protocol4Mqtt.MqttSubMsg;
 import com.dy.rtuMw.server.ServerProperties;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -33,6 +34,10 @@
         this.restTemplate = restTemplate ;
     }
 
+    /**
+     * RTU璁惧鏁版嵁
+     * @param data
+     */
     public void deal(Data data) {
         if (data.rtuResultSendWebUrl != null
                 && !data.rtuResultSendWebUrl.trim().equals("")
@@ -58,4 +63,34 @@
             log.error("涓ラ噸閿欒锛屽湪com.dy.aceMw.web.comResult.CommandResultDeal閲岋紝澶勭悊鐨勬槸RTU鍛戒护缁撴灉Node锛屼絾鏁版嵁涓璻tuResultSendWebUrl涓虹┖");
         }
     }
+
+    /**
+     * Mqtt娑堟伅鏁版嵁
+     * @param subMsg
+     */
+    public void deal(MqttSubMsg subMsg) {
+        if (subMsg.mqttResultSendWebUrl != null
+                && !subMsg.mqttResultSendWebUrl.trim().equals("")
+                && !subMsg.mqttResultSendWebUrl.trim().equals(Command.ignoreRtuResultSendWebUrl)) {
+            String url = UriComponentsBuilder.fromUriString(subMsg.mqttResultSendWebUrl)
+                    .build()
+                    .toUriString();
+            restTemplate.getMessageConverters().set(1,new StringHttpMessageConverter(StandardCharsets.UTF_8));
+            HttpHeaders headers = new HttpHeaders();
+            headers.setContentType(MediaType.parseMediaType("application/json;charset=UTF-8"));
+            headers.set(Constant.UserTokenKeyInHeader, ServerProperties.orgTag);
+            HttpEntity<?> httpEntity = new HttpEntity<>(subMsg, headers);
+            ResponseEntity<WebResponseVo> response = null;
+            try {
+                // 閫氳繃Post鏂瑰紡璋冪敤鎺ュ彛
+                response = restTemplate.exchange(url, HttpMethod.POST, httpEntity, WebResponseVo.class);
+            } catch (Exception e) {
+                log.error("鍛戒护缁撴灉鍥炶皟鍙戠敓寮傚父", e);
+                e.printStackTrace();
+            }
+            //assert response != null;
+        } else {
+            log.error("涓ラ噸閿欒锛屽湪com.dy.aceMw.web.comResult.CommandResultDeal閲岋紝澶勭悊鐨勬槸RTU鍛戒护缁撴灉Node锛屼絾鏁版嵁涓璻tuResultSendWebUrl涓虹┖");
+        }
+    }
 }
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/RtuDataDealTree.xml b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/RtuDataDealTree.xml
index 8c52ed4..67e19f9 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/RtuDataDealTree.xml
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/RtuDataDealTree.xml
@@ -92,5 +92,9 @@
 				</task>
 			</task>
 		</task>
+		<!-- Mqtt娑堟伅涓棿浠惰闃呯殑娑堟伅 -->
+		<task id="TkMqttData" name="鎺ユ敹Mqtt娑堟伅" enable="true" class="com.dy.rtuMw.server.rtuData.TkMqttData">
+			<task id="TkFindPSdV1" name="璇嗗埆灞变笢V1鏁版嵁" enable="true" class="com.dy.rtuMw.server.rtuData.pSdV1.TkFindPSdV1">
+		</task>
 	</task>
 </project>
\ No newline at end of file
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.properties b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.properties
index bb7f709..ec1f674 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.properties
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.properties
@@ -29,3 +29,19 @@
 #RTU涓婅鏁版嵁鏈�灏忛棿闅旓紝澶т簬杩欎釜闂撮殧璁や负璁惧绂荤嚎浜嗭紝娴嬫帶涓�浣撻榾鏄�3锛岃〃闃�涓�浣撴満鏄�6锛岄粯璁ら噰鐢ㄦ椂闂存渶闀跨殑6
 base.upData.min.interval=6
 
+# MQTT鏈嶅姟閰嶇疆
+# 233鏈嶅姟鍣細
+#   鍏冭皨锛� mqtt.enable=false
+#   娌欑洏锛� mqtt.enable=false
+#   娴嬭瘯锛� mqtt.enable=false
+#   姊呮睙锛� mqtt.enable=false
+# 121鏈嶅姟鍣細
+#   姘戝嫟锛� mqtt.enable=true
+#   寤跺簡锛� mqtt.enable=false
+#   榛戦緳姹燂細 mqtt.enable=false
+#   鐢樺窞锛� mqtt.enable=false
+#   鍑夊窞锛� mqtt.enable=false
+#   閲戝窛锛� mqtt.enable=true
+mqtt.enable=true
+mqtt.topicAndQos=ym/sd1/10000/control/m1,1;ym/sd1/10000/control/m2,1;ym/sd1/control/m4,1;ym/sd1/10000/control/m80,1
+
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.xml b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.xml
index 631ef66..5f0b6b9 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.xml
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.xml
@@ -164,4 +164,22 @@
 		 idle="10"
 	/>
 
+
+	<!--
+	topicAndQos: 涓婚涓嶲os锛屼富棰樺悕涓庡叾Qos鐢ㄩ�楀彿闅斿紑锛屽涓富棰樺強Qos鐢ㄥ垎鍙烽殧寮�锛屼緥濡傦細ym/topic1,1;ym/topic2,1;ym/topic3,1锛屽鏋滄湁澶氫釜OrgTag锛屼富棰樺墠缂�鐢ㄥ叾OrgTag
+	publishQos: 鍙戝竷娑堟伅鐨凲os锛屽彇鍊艰寖鍥达細
+		0	鑷冲涓�娆★紙At most once锛�	娑堟伅鍙戦�佸悗涓嶄繚璇佸埌杈撅紝鍙兘涓㈠け鎴栭噸澶嶏紝寮�閿�鏈�灏忥紝鍙潬鎬ф渶浣庛��
+    	1	鑷冲皯涓�娆★紙At least once锛�	娑堟伅鑷冲皯浼氬埌杈句竴娆★紝鍙兘閲嶅锛屼絾涓嶄細涓㈠け锛屽彲闈犳�т腑绛夛紝閫傜敤浜庡鏁板満鏅��
+    	2	鎭板ソ涓�娆★紙Exactly once锛�	娑堟伅浠呬細鍒拌揪涓�娆★紝涓嶉噸澶嶄笖涓嶄涪澶憋紝鍙潬鎬ф渶楂橈紝浣嗗紑閿�鏈�澶э紝瀹炵幇鏈�澶嶆潅銆�
+     -->
+	<mqtt enable="${mqtt.enable}"
+		  svIp="121.199.41.121"
+		  svPort="1883"
+		  svUserName="dyyjy"
+		  svUserPassword="Dyyjy2025,;.abc!@#"
+		  poolMaxSize="10"
+		  topicAndQos="${mqtt.topicAndQos}"
+		  publishQos="1"
+	/>
+
 </config>
\ No newline at end of file

--
Gitblit v1.8.0