From 0c870ae5b2af348e19b10cc1a99f103f95c5a1cc Mon Sep 17 00:00:00 2001
From: liurunyu <lry9898@163.com>
Date: 星期六, 21 六月 2025 11:47:09 +0800
Subject: [PATCH] 1、通信中间件重构MQTT相关下行命令的逻辑; 2、remote模块完善MQTT相关下行命令解除报警、搅拌启停、注肥启停、灌溉启停功能类;

---
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgNode.java |   32 +++++++++++++++++++++-----------
 1 files changed, 21 insertions(+), 11 deletions(-)

diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgNode.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgNode.java
index 504ee3f..1bfd24e 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgNode.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgNode.java
@@ -20,12 +20,16 @@
     public MqttPubMsg result ;//涓嬭鍛戒护
     public Long cachTime ;//缂撳瓨鏃跺埢
     public boolean onceReceivedResult ;//宸茬粡鏀跺埌鍛戒护搴旂瓟
+    public long lastSendTime = 0 ;//涓婃鍙戦�佹椂闂�
+    public int sendTimes = 0 ;//鍙戦�佹鏁�
 
 
     public MqttPubMsgNode(MqttPubMsg result){
         this.result = result ;
         this.cachTime = System.currentTimeMillis() ;
         this.onceReceivedResult = false ;
+        this.lastSendTime = 0L ;
+        this.sendTimes = 0 ;
     }
 
     /**
@@ -36,9 +40,13 @@
     public boolean dealSelf(Long now){
         if(this.onceReceivedResult){
             //宸茬粡鏀跺埌鍛戒护缁撴灉
-            //璁板綍鐘舵��
-            //RtuStatusDealer.commandSuccess(this.result.rtuAddr, this.result.downCode, this.result.downCodeName);
             return true ;
+        }
+        if(this.sendTimes >= (1 + MqttUnit.confVo.reSendTimesByNoResult)){
+            return this.decideRemoveNodeFromCach(now, null) ;
+        }
+        if(this.lastSendTime != 0 && now - this.lastSendTime >= MqttUnit.confVo.sendInterval){
+            return this.decideRemoveNodeFromCach(now, null) ;
         }
         boolean noConnect2MqSv = false ;
         MqttManager mqttManager = MqttManager.getInstance() ;
@@ -47,7 +55,7 @@
         noConnect2MqSv = mqttManager.poolIsClose() ;
         if(noConnect2MqSv){
             //鏈浘杩炴帴MQTT鏈嶅姟鍣�
-            return this.decideRemoveNodeFromCach(now) ;
+            return this.decideRemoveNodeFromCach(now, null) ;
         }else{
             try {
                 //濡傛灉缃戠粶涓嶅ソ鎴栨柇缃戯紝姝ゅ鐢ㄦ椂杈冮暱
@@ -61,16 +69,18 @@
         }
         if(noConnect2MqSv){
             //鏈浘杩炴帴MQTT鏈嶅姟鍣�
-            return this.decideRemoveNodeFromCach(now) ;
+            return this.decideRemoveNodeFromCach(now, null) ;
         }else{
             if(mqttClient != null && mqttClient.isConnected()){
                 try {
                     mqttManager.publishMsg(mqttClient, this.result.topic.longName(), this.result.msg);
+                    this.sendTimes ++ ;
+                    this.lastSendTime = System.currentTimeMillis() ;
                     DevStatusDealer.afterSendPubMessage(this.result.deviceId);
-                    RtuLogDealer.log4Mqtt(this.result.deviceId, "鍙戝竷娑堟伅    涓婚锛�" + this.result.topic + "   娑堟伅锛�" + this.result.msg);
-                    log.info("鍙戝竷MQTT娑堟伅锛堜富棰�=" + this.result.topic + "锛�" + this.result.msg);
+                    RtuLogDealer.log4Mqtt(this.result.deviceId, "鍙戝竷娑堟伅    涓婚锛�" + this.result.topic.longName() + "   娑堟伅锛�" + this.result.msg);
+                    log.info("鍙戝竷MQTT娑堟伅锛堜富棰�=" + this.result.topic.longName() + "锛�" + this.result.msg);
                 }catch (Exception e){
-                    log.error("MQTT鍙戝竷娑堟伅澶辫触锛堜富棰�=" + this.result.topic + "锛�" , e);
+                    log.error("MQTT鍙戝竷娑堟伅澶辫触锛堜富棰�=" + this.result.topic.longName() + "锛�" , e);
                 }finally {
                     mqttManager.pushMqttClient(mqttClient);
                 }
@@ -81,18 +91,18 @@
                 }
             }else{
                 //鏈浘杩炴帴MQTT鏈嶅姟鍣�
-                return this.decideRemoveNodeFromCach(now) ;
+                return this.decideRemoveNodeFromCach(now, false) ;
             }
         }
     }
 
-    private boolean decideRemoveNodeFromCach(Long now){
-        if(!this.result.isCacheForOffLine){
+    private boolean decideRemoveNodeFromCach(Long now, Boolean isOffLine){
+        if(isOffLine != null && isOffLine.booleanValue() && !this.result.isCacheForOffLine){
             //涓嶅湪绾垮懡浠や笉缂撳瓨
             return true ;
         }else{
             //涓嶅湪绾垮懡浠ょ紦瀛�
-            if(now - this.cachTime >= ServerProperties.offLineCacheTimeout){
+            if(now - this.cachTime >= MqttUnit.confVo.comCacheTimeout){
                 //缂撳瓨瓒呮椂
                 return true ;
             }

--
Gitblit v1.8.0