package com.dy.rtuMw.server.mqtt; import com.dy.common.mw.protocol4Mqtt.MqttPubMsg; import com.dy.common.queue.NodeObj; import com.dy.rtuMw.server.ServerProperties; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.eclipse.paho.client.mqttv3.MqttClient; /** * @Author: liurunyu * @Date: 2025/6/4 17:25 * @Description */ public class MqttPubMsgNode implements NodeObj { private static Logger log = LogManager.getLogger(MqttPubMsgNode.class.getName()); public MqttPubMsg result ;//下行命令 public Long cachTime ;//缓存时刻 public boolean onceReceivedResult ;//已经收到命令应答 public MqttPubMsgNode(MqttPubMsg result){ this.result = result ; this.cachTime = System.currentTimeMillis() ; this.onceReceivedResult = false ; } /** * 自己处理自己 * @param now * @return */ public boolean dealSelf(Long now){ if(this.onceReceivedResult){ //已经收到命令结果 //记录状态 //RtuStatusDealer.commandSuccess(this.result.rtuAddr, this.result.downCode, this.result.downCodeName); return true ; } boolean noConnect2MqSv = false ; MqttManager mqttManager = MqttManager.getInstance() ; MqttClient mqttClient = null ; noConnect2MqSv = mqttManager.poolIsClose() ; if(noConnect2MqSv){ //未曾连接MQTT服务器 return this.decideRemoveNodeFromCach(now) ; }else{ try { //如果网络不好或断网,此处用时较长 mqttClient = mqttManager.popMqttClient() ; if(mqttClient == null || !mqttClient.isConnected()){ noConnect2MqSv = false ; } }catch (Exception e){ log.error("获取MQTT客户端失败", e); } } if(noConnect2MqSv){ //未曾连接MQTT服务器 return this.decideRemoveNodeFromCach(now) ; }else{ if(mqttClient != null && mqttClient.isConnected()){ try { mqttManager.publishMsg(mqttClient, this.result.topic, this.result.msg); log.info("发布MQTT消息(主题=" + this.result.topic + ")" + this.result.msg); }catch (Exception e){ log.error("MQTT发布消息失败(主题=" + this.result.topic + ")" , e); }finally { mqttManager.pushMqttClient(mqttClient); } return false ; }else{ //未曾连接MQTT服务器 return this.decideRemoveNodeFromCach(now) ; } } } private boolean decideRemoveNodeFromCach(Long now){ if(!this.result.isCacheForOffLine){ //不在线命令不缓存 return true ; }else{ //不在线命令缓存 if(now - this.cachTime >= ServerProperties.offLineCacheTimeout){ //缓存超时 return true ; } } return false ; } }