package com.dy.rtuMw.server.mqtt;
|
|
import com.dy.common.mw.protocol4Mqtt.MqttPubMsg;
|
import com.dy.common.mw.protocol4Mqtt.MqttSubMsg;
|
import com.dy.common.queue.Node;
|
import com.dy.common.queue.Queue;
|
import com.dy.rtuMw.server.ServerProperties;
|
|
/**
|
* @Author: liurunyu
|
* @Date: 2025/6/4 17:24
|
* @Description
|
*/
|
public class MqttPubMsgCache {
|
|
//TCP下行命令缓存队列
|
private static Queue cacheQueue = new Queue("mqttPubMsgCache") ;
|
|
private static MqttPubMsgCache instance = new MqttPubMsgCache() ;
|
|
private MqttPubMsgCache(){
|
cacheQueue.setLimit(ServerProperties.cacheUpDownDataWarnCount, ServerProperties.cacheUpDownDataMaxCount);
|
}
|
|
public static MqttPubMsgCache getInstance(){
|
return instance ;
|
}
|
|
|
public static Integer info(){
|
Integer comTotalDown = 0 ;//缓存的下行命令总数
|
MqttPubMsgNode obj ;
|
Node node = cacheQueue.getFirstNode() ;
|
while(node != null && node.obj != null){
|
obj = (MqttPubMsgNode)node.obj;
|
if(!obj.onceReceivedResult){
|
comTotalDown ++ ;
|
}
|
}
|
return comTotalDown ;
|
}
|
|
/**
|
* 缓存命令
|
* @param result
|
* @throws Exception
|
*/
|
public static void cacheCommand(MqttPubMsg result) throws Exception{
|
if(result != null){
|
cacheQueue.pushHead(new MqttPubMsgNode(result));
|
}
|
}
|
|
/**
|
* 匹配命令结果
|
* @param subMsg
|
* @return
|
*/
|
public static MqttPubMsg matchFromHead(MqttSubMsg subMsg){
|
MqttPubMsg pubMsg = null ;
|
MqttPubMsgNode obj = null ;
|
Node node = cacheQueue.getFirstNode() ;
|
while(node != null && node.obj != null){
|
obj = (MqttPubMsgNode)node.obj;
|
pubMsg = obj.result ;
|
if(pubMsg != null
|
&& subMsg.subMsgMatchPubMsg(pubMsg)){
|
obj.onceReceivedResult = true ;//标识已经收到命令结果
|
return pubMsg;
|
}else{
|
node = node.next ;
|
}
|
}
|
return null ;
|
}
|
|
/**
|
* 匹配命令结果
|
* @param subMsg
|
* @return
|
*/
|
public static MqttPubMsg matchFromTail(MqttSubMsg subMsg){
|
MqttPubMsg pubMsg = null ;
|
MqttPubMsgNode obj = null ;
|
Node node = cacheQueue.getLastNode() ;
|
while(node != null && node.obj != null){
|
obj = (MqttPubMsgNode)node.obj;
|
pubMsg = obj.result ;
|
if(pubMsg != null
|
&& subMsg.subMsgMatchPubMsg(pubMsg)){
|
obj.onceReceivedResult = true ;//标识已经收到命令结果
|
return pubMsg;
|
}else{
|
node = node.pre ;
|
}
|
}
|
return null ;
|
}
|
|
/**
|
* 得到第一个节点
|
* @return
|
*/
|
public static Node getFirstQueueNode(){
|
return cacheQueue.getFirstNode() ;
|
}
|
|
/**
|
* 得到最后一个节点
|
* @return
|
*/
|
public static Node getLastQueueNode(){
|
return cacheQueue.getLastNode() ;
|
}
|
|
/**
|
* 移除节点
|
* @param node
|
*/
|
public static void removeNode(Node node){
|
cacheQueue.remove(node);
|
}
|
|
/**
|
* 缓存的节点数
|
* @Return 缓存节点数
|
*/
|
public static Integer size(){
|
return cacheQueue.size() ;
|
}
|
|
}
|