package com.dy.rtuMw.server.mqtt;
|
|
import com.dy.common.mw.protocol4Mqtt.MqttPubMsg;
|
import com.dy.common.queue.NodeObj;
|
import com.dy.rtuMw.server.ServerProperties;
|
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.Logger;
|
import org.eclipse.paho.client.mqttv3.MqttClient;
|
|
/**
|
* @Author: liurunyu
|
* @Date: 2025/6/4 17:25
|
* @Description
|
*/
|
public class MqttPubMsgNode implements NodeObj {
|
|
private static Logger log = LogManager.getLogger(MqttPubMsgNode.class.getName());
|
|
public MqttPubMsg result ;//下行命令
|
public Long cachTime ;//缓存时刻
|
public boolean onceReceivedResult ;//已经收到命令应答
|
|
|
public MqttPubMsgNode(MqttPubMsg result){
|
this.result = result ;
|
this.cachTime = System.currentTimeMillis() ;
|
this.onceReceivedResult = false ;
|
}
|
|
/**
|
* 自己处理自己
|
* @param now
|
* @return
|
*/
|
public boolean dealSelf(Long now){
|
if(this.onceReceivedResult){
|
//已经收到命令结果
|
//记录状态
|
//RtuStatusDealer.commandSuccess(this.result.rtuAddr, this.result.downCode, this.result.downCodeName);
|
return true ;
|
}
|
boolean noConnect2MqSv = false ;
|
MqttManager mqttManager = MqttManager.getInstance() ;
|
MqttClient mqttClient = null ;
|
|
noConnect2MqSv = mqttManager.poolIsClose() ;
|
if(noConnect2MqSv){
|
//未曾连接MQTT服务器
|
return this.decideRemoveNodeFromCach(now) ;
|
}else{
|
try {
|
//如果网络不好或断网,此处用时较长
|
mqttClient = mqttManager.popMqttClient() ;
|
if(mqttClient == null || !mqttClient.isConnected()){
|
noConnect2MqSv = false ;
|
}
|
}catch (Exception e){
|
log.error("获取MQTT客户端失败", e);
|
}
|
}
|
if(noConnect2MqSv){
|
//未曾连接MQTT服务器
|
return this.decideRemoveNodeFromCach(now) ;
|
}else{
|
if(mqttClient != null && mqttClient.isConnected()){
|
try {
|
mqttManager.publishMsg(mqttClient, this.result.topic, this.result.msg);
|
log.info("发布MQTT消息(主题=" + this.result.topic + ")" + this.result.msg);
|
}catch (Exception e){
|
log.error("MQTT发布消息失败(主题=" + this.result.topic + ")" , e);
|
}finally {
|
mqttManager.pushMqttClient(mqttClient);
|
}
|
return false ;
|
}else{
|
//未曾连接MQTT服务器
|
return this.decideRemoveNodeFromCach(now) ;
|
}
|
}
|
}
|
|
private boolean decideRemoveNodeFromCach(Long now){
|
if(!this.result.isCacheForOffLine){
|
//不在线命令不缓存
|
return true ;
|
}else{
|
//不在线命令缓存
|
if(now - this.cachTime >= ServerProperties.offLineCacheTimeout){
|
//缓存超时
|
return true ;
|
}
|
}
|
return false ;
|
}
|
}
|