| package com.dy.rtuMw.server.mqtt; | 
|   | 
| import com.dy.common.mw.protocol4Mqtt.MqttPubMsg; | 
| import com.dy.common.queue.NodeObj; | 
| import com.dy.rtuMw.server.ServerProperties; | 
| import com.dy.rtuMw.server.forTcp.RtuLogDealer; | 
| import org.apache.logging.log4j.LogManager; | 
| import org.apache.logging.log4j.Logger; | 
| import org.eclipse.paho.client.mqttv3.MqttClient; | 
|   | 
| /** | 
|  * @Author: liurunyu | 
|  * @Date: 2025/6/4 17:25 | 
|  * @Description | 
|  */ | 
| public class MqttPubMsgNode implements NodeObj { | 
|   | 
|     private static Logger log = LogManager.getLogger(MqttPubMsgNode.class.getName()); | 
|   | 
|     public MqttPubMsg result ;//下行命令 | 
|     public Long cachTime ;//缓存时刻 | 
|     public boolean onceReceivedResult ;//已经收到命令应答 | 
|     public long lastSendTime = 0 ;//上次发送时间 | 
|     public int sendTimes = 0 ;//发送次数 | 
|   | 
|   | 
|     public MqttPubMsgNode(MqttPubMsg result){ | 
|         this.result = result ; | 
|         this.cachTime = System.currentTimeMillis() ; | 
|         this.onceReceivedResult = false ; | 
|         this.lastSendTime = 0L ; | 
|         this.sendTimes = 0 ; | 
|     } | 
|   | 
|     /** | 
|      * 自己处理自己 | 
|      * @param now | 
|      * @return | 
|      */ | 
|     public boolean dealSelf(Long now){ | 
|         if(this.onceReceivedResult){ | 
|             //已经收到命令结果 | 
|             return true ; | 
|         } | 
|         if(this.sendTimes >= (1 + MqttUnit.confVo.reSendTimesByNoResult)){ | 
|             return this.decideRemoveNodeFromCach(now, null) ; | 
|         } | 
|         if(this.lastSendTime != 0 && now - this.lastSendTime >= MqttUnit.confVo.sendInterval){ | 
|             return this.decideRemoveNodeFromCach(now, null) ; | 
|         } | 
|         boolean noConnect2MqSv = false ; | 
|         MqttManager mqttManager = MqttManager.getInstance() ; | 
|         MqttClient mqttClient = null ; | 
|   | 
|         noConnect2MqSv = mqttManager.poolIsClose() ; | 
|         if(noConnect2MqSv){ | 
|             //未曾连接MQTT服务器 | 
|             return this.decideRemoveNodeFromCach(now, null) ; | 
|         }else{ | 
|             try { | 
|                 //如果网络不好或断网,此处用时较长 | 
|                 mqttClient = mqttManager.popMqttClient() ; | 
|                 if(mqttClient == null || !mqttClient.isConnected()){ | 
|                     noConnect2MqSv = false ; | 
|                 } | 
|             }catch (Exception e){ | 
|                 log.error("获取MQTT客户端失败", e); | 
|             } | 
|         } | 
|         if(noConnect2MqSv){ | 
|             //未曾连接MQTT服务器 | 
|             return this.decideRemoveNodeFromCach(now, null) ; | 
|         }else{ | 
|             if(mqttClient != null && mqttClient.isConnected()){ | 
|                 try { | 
|                     mqttManager.publishMsg(mqttClient, this.result.topic.longName(), this.result.msg); | 
|                     this.sendTimes ++ ; | 
|                     this.lastSendTime = System.currentTimeMillis() ; | 
|                     DevStatusDealer.afterSendPubMessage(this.result.deviceId); | 
|                     RtuLogDealer.log4Mqtt(this.result.deviceId, "发布消息    主题:" + this.result.topic.longName() + "   消息:" + this.result.msg); | 
|                     log.info("发布MQTT消息(主题=" + this.result.topic.longName() + ")" + this.result.msg); | 
|                 }catch (Exception e){ | 
|                     log.error("MQTT发布消息失败(主题=" + this.result.topic.longName() + ")" , e); | 
|                 }finally { | 
|                     mqttManager.pushMqttClient(mqttClient); | 
|                 } | 
|                 if(this.result.hasResponse){ | 
|                     return false ; | 
|                 }else{ | 
|                     return true ; | 
|                 } | 
|             }else{ | 
|                 //未曾连接MQTT服务器 | 
|                 return this.decideRemoveNodeFromCach(now, false) ; | 
|             } | 
|         } | 
|     } | 
|   | 
|     private boolean decideRemoveNodeFromCach(Long now, Boolean isOffLine){ | 
|         if(isOffLine != null && isOffLine.booleanValue() && !this.result.isCacheForOffLine){ | 
|             //不在线命令不缓存 | 
|             return true ; | 
|         }else{ | 
|             //不在线命令缓存 | 
|             if(now - this.cachTime >= MqttUnit.confVo.comCacheTimeout){ | 
|                 //缓存超时 | 
|                 return true ; | 
|             } | 
|         } | 
|         return false ; | 
|     } | 
| } |