From 0c870ae5b2af348e19b10cc1a99f103f95c5a1cc Mon Sep 17 00:00:00 2001 From: liurunyu <lry9898@163.com> Date: 星期六, 21 六月 2025 11:47:09 +0800 Subject: [PATCH] 1、通信中间件重构MQTT相关下行命令的逻辑; 2、remote模块完善MQTT相关下行命令解除报警、搅拌启停、注肥启停、灌溉启停功能类; --- pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgNode.java | 32 +++++++++++++++++++++----------- 1 files changed, 21 insertions(+), 11 deletions(-) diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgNode.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgNode.java index 504ee3f..1bfd24e 100644 --- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgNode.java +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgNode.java @@ -20,12 +20,16 @@ public MqttPubMsg result ;//涓嬭鍛戒护 public Long cachTime ;//缂撳瓨鏃跺埢 public boolean onceReceivedResult ;//宸茬粡鏀跺埌鍛戒护搴旂瓟 + public long lastSendTime = 0 ;//涓婃鍙戦�佹椂闂� + public int sendTimes = 0 ;//鍙戦�佹鏁� public MqttPubMsgNode(MqttPubMsg result){ this.result = result ; this.cachTime = System.currentTimeMillis() ; this.onceReceivedResult = false ; + this.lastSendTime = 0L ; + this.sendTimes = 0 ; } /** @@ -36,9 +40,13 @@ public boolean dealSelf(Long now){ if(this.onceReceivedResult){ //宸茬粡鏀跺埌鍛戒护缁撴灉 - //璁板綍鐘舵�� - //RtuStatusDealer.commandSuccess(this.result.rtuAddr, this.result.downCode, this.result.downCodeName); return true ; + } + if(this.sendTimes >= (1 + MqttUnit.confVo.reSendTimesByNoResult)){ + return this.decideRemoveNodeFromCach(now, null) ; + } + if(this.lastSendTime != 0 && now - this.lastSendTime >= MqttUnit.confVo.sendInterval){ + return this.decideRemoveNodeFromCach(now, null) ; } boolean noConnect2MqSv = false ; MqttManager mqttManager = MqttManager.getInstance() ; @@ -47,7 +55,7 @@ noConnect2MqSv = mqttManager.poolIsClose() ; if(noConnect2MqSv){ //鏈浘杩炴帴MQTT鏈嶅姟鍣� - return this.decideRemoveNodeFromCach(now) ; + return this.decideRemoveNodeFromCach(now, null) ; }else{ try { //濡傛灉缃戠粶涓嶅ソ鎴栨柇缃戯紝姝ゅ鐢ㄦ椂杈冮暱 @@ -61,16 +69,18 @@ } if(noConnect2MqSv){ //鏈浘杩炴帴MQTT鏈嶅姟鍣� - return this.decideRemoveNodeFromCach(now) ; + return this.decideRemoveNodeFromCach(now, null) ; }else{ if(mqttClient != null && mqttClient.isConnected()){ try { mqttManager.publishMsg(mqttClient, this.result.topic.longName(), this.result.msg); + this.sendTimes ++ ; + this.lastSendTime = System.currentTimeMillis() ; DevStatusDealer.afterSendPubMessage(this.result.deviceId); - RtuLogDealer.log4Mqtt(this.result.deviceId, "鍙戝竷娑堟伅 涓婚锛�" + this.result.topic + " 娑堟伅锛�" + this.result.msg); - log.info("鍙戝竷MQTT娑堟伅锛堜富棰�=" + this.result.topic + "锛�" + this.result.msg); + RtuLogDealer.log4Mqtt(this.result.deviceId, "鍙戝竷娑堟伅 涓婚锛�" + this.result.topic.longName() + " 娑堟伅锛�" + this.result.msg); + log.info("鍙戝竷MQTT娑堟伅锛堜富棰�=" + this.result.topic.longName() + "锛�" + this.result.msg); }catch (Exception e){ - log.error("MQTT鍙戝竷娑堟伅澶辫触锛堜富棰�=" + this.result.topic + "锛�" , e); + log.error("MQTT鍙戝竷娑堟伅澶辫触锛堜富棰�=" + this.result.topic.longName() + "锛�" , e); }finally { mqttManager.pushMqttClient(mqttClient); } @@ -81,18 +91,18 @@ } }else{ //鏈浘杩炴帴MQTT鏈嶅姟鍣� - return this.decideRemoveNodeFromCach(now) ; + return this.decideRemoveNodeFromCach(now, false) ; } } } - private boolean decideRemoveNodeFromCach(Long now){ - if(!this.result.isCacheForOffLine){ + private boolean decideRemoveNodeFromCach(Long now, Boolean isOffLine){ + if(isOffLine != null && isOffLine.booleanValue() && !this.result.isCacheForOffLine){ //涓嶅湪绾垮懡浠や笉缂撳瓨 return true ; }else{ //涓嶅湪绾垮懡浠ょ紦瀛� - if(now - this.cachTime >= ServerProperties.offLineCacheTimeout){ + if(now - this.cachTime >= MqttUnit.confVo.comCacheTimeout){ //缂撳瓨瓒呮椂 return true ; } -- Gitblit v1.8.0