package com.dy.rtuMw.server.mqtt; import com.dy.common.mw.protocol4Mqtt.MqttPubMsg; import com.dy.common.mw.protocol4Mqtt.MqttSubMsg; import com.dy.common.queue.Node; import com.dy.common.queue.Queue; import com.dy.rtuMw.server.ServerProperties; /** * @Author: liurunyu * @Date: 2025/6/4 17:24 * @Description */ public class MqttPubMsgCache { //TCP下行命令缓存队列 private static Queue cacheQueue = new Queue("mqttPubMsgCache") ; private static MqttPubMsgCache instance = new MqttPubMsgCache() ; private MqttPubMsgCache(){ cacheQueue.setLimit(ServerProperties.cacheUpDownDataWarnCount, ServerProperties.cacheUpDownDataMaxCount); } public static MqttPubMsgCache getInstance(){ return instance ; } public static Integer info(){ Integer comTotalDown = 0 ;//缓存的下行命令总数 MqttPubMsgNode obj ; Node node = cacheQueue.getFirstNode() ; while(node != null && node.obj != null){ obj = (MqttPubMsgNode)node.obj; if(!obj.onceReceivedResult){ comTotalDown ++ ; } } return comTotalDown ; } /** * 缓存命令 * @param result * @throws Exception */ public static void cacheCommand(MqttPubMsg result) throws Exception{ if(result != null){ cacheQueue.pushHead(new MqttPubMsgNode(result)); } } /** * 匹配命令结果 * @param subMsg * @return */ public static MqttPubMsg matchFromHead(MqttSubMsg subMsg){ MqttPubMsg pubMsg = null ; MqttPubMsgNode obj = null ; Node node = cacheQueue.getFirstNode() ; while(node != null && node.obj != null){ obj = (MqttPubMsgNode)node.obj; pubMsg = obj.result ; if(pubMsg != null && subMsg.subMsgMatchPubMsg(pubMsg)){ obj.onceReceivedResult = true ;//标识已经收到命令结果 return pubMsg; }else{ node = node.next ; } } return null ; } /** * 匹配命令结果 * @param subMsg * @return */ public static MqttPubMsg matchFromTail(MqttSubMsg subMsg){ MqttPubMsg pubMsg = null ; MqttPubMsgNode obj = null ; Node node = cacheQueue.getLastNode() ; while(node != null && node.obj != null){ obj = (MqttPubMsgNode)node.obj; pubMsg = obj.result ; if(pubMsg != null && subMsg.subMsgMatchPubMsg(pubMsg)){ obj.onceReceivedResult = true ;//标识已经收到命令结果 return pubMsg; }else{ node = node.pre ; } } return null ; } /** * 得到第一个节点 * @return */ public static Node getFirstQueueNode(){ return cacheQueue.getFirstNode() ; } /** * 得到最后一个节点 * @return */ public static Node getLastQueueNode(){ return cacheQueue.getLastNode() ; } /** * 移除节点 * @param node */ public static void removeNode(Node node){ cacheQueue.remove(node); } /** * 缓存的节点数 * @Return 缓存节点数 */ public static Integer size(){ return cacheQueue.size() ; } }