package com.dy.rtuMw.server.mqtt; 
 | 
  
 | 
import com.dy.common.mw.protocol4Mqtt.MqttPubMsg; 
 | 
import com.dy.common.queue.NodeObj; 
 | 
import com.dy.rtuMw.server.ServerProperties; 
 | 
import com.dy.rtuMw.server.forTcp.RtuLogDealer; 
 | 
import org.apache.logging.log4j.LogManager; 
 | 
import org.apache.logging.log4j.Logger; 
 | 
import org.eclipse.paho.client.mqttv3.MqttClient; 
 | 
  
 | 
/** 
 | 
 * @Author: liurunyu 
 | 
 * @Date: 2025/6/4 17:25 
 | 
 * @Description 
 | 
 */ 
 | 
public class MqttPubMsgNode implements NodeObj { 
 | 
  
 | 
    private static Logger log = LogManager.getLogger(MqttPubMsgNode.class.getName()); 
 | 
  
 | 
    public MqttPubMsg result ;//下行命令 
 | 
    public Long cachTime ;//缓存时刻 
 | 
    public boolean onceReceivedResult ;//已经收到命令应答 
 | 
    public long lastSendTime = 0 ;//上次发送时间 
 | 
    public int sendTimes = 0 ;//发送次数 
 | 
  
 | 
  
 | 
    public MqttPubMsgNode(MqttPubMsg result){ 
 | 
        this.result = result ; 
 | 
        this.cachTime = System.currentTimeMillis() ; 
 | 
        this.onceReceivedResult = false ; 
 | 
        this.lastSendTime = 0L ; 
 | 
        this.sendTimes = 0 ; 
 | 
    } 
 | 
  
 | 
    /** 
 | 
     * 自己处理自己 
 | 
     * @param now 
 | 
     * @return 
 | 
     */ 
 | 
    public boolean dealSelf(Long now){ 
 | 
        if(this.onceReceivedResult){ 
 | 
            //已经收到命令结果 
 | 
            return true ; 
 | 
        } 
 | 
        if(this.sendTimes >= (1 + MqttUnit.confVo.reSendTimesByNoResult)){ 
 | 
            return this.decideRemoveNodeFromCach(now, null) ; 
 | 
        } 
 | 
        if(this.lastSendTime != 0 && now - this.lastSendTime >= MqttUnit.confVo.sendInterval){ 
 | 
            return this.decideRemoveNodeFromCach(now, null) ; 
 | 
        } 
 | 
        boolean noConnect2MqSv = false ; 
 | 
        MqttManager mqttManager = MqttManager.getInstance() ; 
 | 
        MqttClient mqttClient = null ; 
 | 
  
 | 
        noConnect2MqSv = mqttManager.poolIsClose() ; 
 | 
        if(noConnect2MqSv){ 
 | 
            //未曾连接MQTT服务器 
 | 
            return this.decideRemoveNodeFromCach(now, null) ; 
 | 
        }else{ 
 | 
            try { 
 | 
                //如果网络不好或断网,此处用时较长 
 | 
                mqttClient = mqttManager.popMqttClient() ; 
 | 
                if(mqttClient == null || !mqttClient.isConnected()){ 
 | 
                    noConnect2MqSv = false ; 
 | 
                } 
 | 
            }catch (Exception e){ 
 | 
                log.error("获取MQTT客户端失败", e); 
 | 
            } 
 | 
        } 
 | 
        if(noConnect2MqSv){ 
 | 
            //未曾连接MQTT服务器 
 | 
            return this.decideRemoveNodeFromCach(now, null) ; 
 | 
        }else{ 
 | 
            if(mqttClient != null && mqttClient.isConnected()){ 
 | 
                try { 
 | 
                    mqttManager.publishMsg(mqttClient, this.result.topic.longName(), this.result.msg); 
 | 
                    this.sendTimes ++ ; 
 | 
                    this.lastSendTime = System.currentTimeMillis() ; 
 | 
                    DevStatusDealer.afterSendPubMessage(this.result.deviceId); 
 | 
                    RtuLogDealer.log4Mqtt(this.result.deviceId, "发布消息    主题:" + this.result.topic.longName() + "   消息:" + this.result.msg); 
 | 
                    log.info("发布MQTT消息(主题=" + this.result.topic.longName() + ")" + this.result.msg); 
 | 
                }catch (Exception e){ 
 | 
                    log.error("MQTT发布消息失败(主题=" + this.result.topic.longName() + ")" , e); 
 | 
                }finally { 
 | 
                    mqttManager.pushMqttClient(mqttClient); 
 | 
                } 
 | 
                if(this.result.hasResponse){ 
 | 
                    return false ; 
 | 
                }else{ 
 | 
                    return true ; 
 | 
                } 
 | 
            }else{ 
 | 
                //未曾连接MQTT服务器 
 | 
                return this.decideRemoveNodeFromCach(now, false) ; 
 | 
            } 
 | 
        } 
 | 
    } 
 | 
  
 | 
    private boolean decideRemoveNodeFromCach(Long now, Boolean isOffLine){ 
 | 
        if(isOffLine != null && isOffLine.booleanValue() && !this.result.isCacheForOffLine){ 
 | 
            //不在线命令不缓存 
 | 
            return true ; 
 | 
        }else{ 
 | 
            //不在线命令缓存 
 | 
            if(now - this.cachTime >= MqttUnit.confVo.comCacheTimeout){ 
 | 
                //缓存超时 
 | 
                return true ; 
 | 
            } 
 | 
        } 
 | 
        return false ; 
 | 
    } 
 | 
} 
 |