package com.dy.rtuMw.server.mqtt; import com.dy.common.mw.protocol4Mqtt.MqttSubMsg; import com.dy.common.queue.Node; import com.dy.common.queue.Queue; import com.dy.rtuMw.server.ServerProperties; /** * @Author: liurunyu * @Date: 2025/6/4 16:13 * @Description */ public class MqttSubMsgCache { //TCP下行命令缓存队列 private static Queue cacheQueue = new Queue("mqttSubMsgCache") ; private static MqttSubMsgCache instance = new MqttSubMsgCache() ; private MqttSubMsgCache(){ cacheQueue.setLimit(ServerProperties.cacheUpDownDataWarnCount, ServerProperties.cacheUpDownDataMaxCount); } public static MqttSubMsgCache getInstance(){ return instance ; } /** * 缓存订阅的消息 * @param node 收到的消息 * @throws Exception */ public static void cacheMsg(MqttSubMsgNode node) throws Exception{ cacheQueue.pushHead(node); } /** * 得到第一个节点 * @return */ public static Node getFirstQueueNode(){ return cacheQueue.getFirstNode() ; } /** * 得到最后一个节点 * @return */ public static Node getLastQueueNode(){ // 调用cacheQueue的getLastNode方法,返回最后一个节点 return cacheQueue.getLastNode() ; } /** * 移除节点 * @param node */ public static void removeNode(Node node){ cacheQueue.remove(node); } /** * 缓存的节点数 * @Return 缓存节点数 */ public static Integer size(){ return cacheQueue.size() ; } }