package com.dy.rtuMw.server.mqtt; 
 | 
  
 | 
import com.dy.common.mw.protocol4Mqtt.MqttPubMsg; 
 | 
import com.dy.common.mw.protocol4Mqtt.MqttSubMsg; 
 | 
import com.dy.common.queue.Node; 
 | 
import com.dy.common.queue.Queue; 
 | 
import com.dy.rtuMw.server.ServerProperties; 
 | 
  
 | 
/** 
 | 
 * @Author: liurunyu 
 | 
 * @Date: 2025/6/4 17:24 
 | 
 * @Description 
 | 
 */ 
 | 
public class MqttPubMsgCache { 
 | 
  
 | 
    //TCP下行命令缓存队列 
 | 
    private static Queue cacheQueue = new Queue("mqttPubMsgCache") ; 
 | 
  
 | 
    private static MqttPubMsgCache instance = new MqttPubMsgCache() ; 
 | 
  
 | 
    private MqttPubMsgCache(){ 
 | 
        cacheQueue.setLimit(ServerProperties.cacheUpDownDataWarnCount, ServerProperties.cacheUpDownDataMaxCount); 
 | 
    } 
 | 
  
 | 
    public static MqttPubMsgCache getInstance(){ 
 | 
        return instance ; 
 | 
    } 
 | 
  
 | 
  
 | 
    public static Integer info(){ 
 | 
        Integer comTotalDown = 0 ;//缓存的下行命令总数 
 | 
        MqttPubMsgNode obj ; 
 | 
        Node node = cacheQueue.getFirstNode() ; 
 | 
        while(node != null && node.obj != null){ 
 | 
            obj = (MqttPubMsgNode)node.obj; 
 | 
            if(!obj.onceReceivedResult){ 
 | 
                comTotalDown ++ ; 
 | 
            } 
 | 
        } 
 | 
        return comTotalDown ; 
 | 
    } 
 | 
  
 | 
    /** 
 | 
     * 缓存命令 
 | 
     * @param result 
 | 
     * @throws Exception 
 | 
     */ 
 | 
    public static void cacheCommand(MqttPubMsg result) throws Exception{ 
 | 
        if(result != null){ 
 | 
            cacheQueue.pushHead(new MqttPubMsgNode(result)); 
 | 
        } 
 | 
    } 
 | 
  
 | 
    /** 
 | 
     * 匹配命令结果 
 | 
     * @param subMsg 
 | 
     * @return 
 | 
     */ 
 | 
    public static MqttPubMsg matchFromHead(MqttSubMsg subMsg){ 
 | 
        MqttPubMsg pubMsg = null ; 
 | 
        MqttPubMsgNode obj = null ; 
 | 
        Node node = cacheQueue.getFirstNode() ; 
 | 
        while(node != null && node.obj != null){ 
 | 
            obj = (MqttPubMsgNode)node.obj; 
 | 
            pubMsg = obj.result ; 
 | 
            if(!obj.onceReceivedResult 
 | 
                    && pubMsg != null 
 | 
                    && subMsg.subMsgMatchPubMsg(pubMsg)){ 
 | 
                obj.onceReceivedResult = true ;//标识已经收到命令结果 
 | 
                return pubMsg; 
 | 
            }else{ 
 | 
                node = node.next ; 
 | 
            } 
 | 
        } 
 | 
        return null ; 
 | 
    } 
 | 
  
 | 
    /** 
 | 
     * 匹配命令结果 
 | 
     * @param subMsg 
 | 
     * @return 
 | 
     */ 
 | 
    public static MqttPubMsg matchFromTail(MqttSubMsg subMsg){ 
 | 
        MqttPubMsg pubMsg = null ; 
 | 
        MqttPubMsgNode obj = null ; 
 | 
        Node node = cacheQueue.getLastNode() ; 
 | 
        while(node != null && node.obj != null){ 
 | 
            obj = (MqttPubMsgNode)node.obj; 
 | 
            pubMsg = obj.result ; 
 | 
            if(!obj.onceReceivedResult 
 | 
                    && pubMsg != null 
 | 
                    && subMsg.subMsgMatchPubMsg(pubMsg)){ 
 | 
                obj.onceReceivedResult = true ;//标识已经收到命令结果 
 | 
                return pubMsg; 
 | 
            }else{ 
 | 
                node = node.pre ; 
 | 
            } 
 | 
        } 
 | 
        return null ; 
 | 
    } 
 | 
  
 | 
    /** 
 | 
     * 得到第一个节点 
 | 
     * @return 
 | 
     */ 
 | 
    public static Node getFirstQueueNode(){ 
 | 
        return cacheQueue.getFirstNode() ; 
 | 
    } 
 | 
  
 | 
    /** 
 | 
     * 得到最后一个节点 
 | 
     * @return 
 | 
     */ 
 | 
    public static Node getLastQueueNode(){ 
 | 
        return cacheQueue.getLastNode() ; 
 | 
    } 
 | 
  
 | 
    /** 
 | 
     * 移除节点 
 | 
     * @param node 
 | 
     */ 
 | 
    public static void removeNode(Node node){ 
 | 
        cacheQueue.remove(node); 
 | 
    } 
 | 
  
 | 
    /** 
 | 
     * 缓存的节点数 
 | 
     * @Return 缓存节点数 
 | 
     */ 
 | 
    public static Integer size(){ 
 | 
        return cacheQueue.size() ; 
 | 
    } 
 | 
  
 | 
} 
 |