| package com.dy.rtuMw.server.mqtt; | 
|   | 
| import com.dy.common.mw.protocol4Mqtt.MqttPubMsg; | 
| import com.dy.common.mw.protocol4Mqtt.MqttSubMsg; | 
| import com.dy.common.queue.Node; | 
| import com.dy.common.queue.Queue; | 
| import com.dy.rtuMw.server.ServerProperties; | 
|   | 
| /** | 
|  * @Author: liurunyu | 
|  * @Date: 2025/6/4 17:24 | 
|  * @Description | 
|  */ | 
| public class MqttPubMsgCache { | 
|   | 
|     //TCP下行命令缓存队列 | 
|     private static Queue cacheQueue = new Queue("mqttPubMsgCache") ; | 
|   | 
|     private static MqttPubMsgCache instance = new MqttPubMsgCache() ; | 
|   | 
|     private MqttPubMsgCache(){ | 
|         cacheQueue.setLimit(ServerProperties.cacheUpDownDataWarnCount, ServerProperties.cacheUpDownDataMaxCount); | 
|     } | 
|   | 
|     public static MqttPubMsgCache getInstance(){ | 
|         return instance ; | 
|     } | 
|   | 
|   | 
|     public static Integer info(){ | 
|         Integer comTotalDown = 0 ;//缓存的下行命令总数 | 
|         MqttPubMsgNode obj ; | 
|         Node node = cacheQueue.getFirstNode() ; | 
|         while(node != null && node.obj != null){ | 
|             obj = (MqttPubMsgNode)node.obj; | 
|             if(!obj.onceReceivedResult){ | 
|                 comTotalDown ++ ; | 
|             } | 
|         } | 
|         return comTotalDown ; | 
|     } | 
|   | 
|     /** | 
|      * 缓存命令 | 
|      * @param result | 
|      * @throws Exception | 
|      */ | 
|     public static void cacheCommand(MqttPubMsg result) throws Exception{ | 
|         if(result != null){ | 
|             cacheQueue.pushHead(new MqttPubMsgNode(result)); | 
|         } | 
|     } | 
|   | 
|     /** | 
|      * 匹配命令结果 | 
|      * @param subMsg | 
|      * @return | 
|      */ | 
|     public static MqttPubMsg matchFromHead(MqttSubMsg subMsg){ | 
|         MqttPubMsg pubMsg = null ; | 
|         MqttPubMsgNode obj = null ; | 
|         Node node = cacheQueue.getFirstNode() ; | 
|         while(node != null && node.obj != null){ | 
|             obj = (MqttPubMsgNode)node.obj; | 
|             pubMsg = obj.result ; | 
|             if(pubMsg != null | 
|                     && subMsg.subMsgMatchPubMsg(pubMsg)){ | 
|                 obj.onceReceivedResult = true ;//标识已经收到命令结果 | 
|                 return pubMsg; | 
|             }else{ | 
|                 node = node.next ; | 
|             } | 
|         } | 
|         return null ; | 
|     } | 
|   | 
|     /** | 
|      * 匹配命令结果 | 
|      * @param subMsg | 
|      * @return | 
|      */ | 
|     public static MqttPubMsg matchFromTail(MqttSubMsg subMsg){ | 
|         MqttPubMsg pubMsg = null ; | 
|         MqttPubMsgNode obj = null ; | 
|         Node node = cacheQueue.getLastNode() ; | 
|         while(node != null && node.obj != null){ | 
|             obj = (MqttPubMsgNode)node.obj; | 
|             pubMsg = obj.result ; | 
|             if(pubMsg != null && subMsg.subMsgMatchPubMsg(pubMsg)){ | 
|                 obj.onceReceivedResult = true ;//标识已经收到命令结果 | 
|                 return pubMsg; | 
|             }else{ | 
|                 node = node.pre ; | 
|             } | 
|         } | 
|         return null ; | 
|     } | 
|   | 
|     /** | 
|      * 得到第一个节点 | 
|      * @return | 
|      */ | 
|     public static Node getFirstQueueNode(){ | 
|         return cacheQueue.getFirstNode() ; | 
|     } | 
|   | 
|     /** | 
|      * 得到最后一个节点 | 
|      * @return | 
|      */ | 
|     public static Node getLastQueueNode(){ | 
|         return cacheQueue.getLastNode() ; | 
|     } | 
|   | 
|     /** | 
|      * 移除节点 | 
|      * @param node | 
|      */ | 
|     public static void removeNode(Node node){ | 
|         cacheQueue.remove(node); | 
|     } | 
|   | 
|     /** | 
|      * 缓存的节点数 | 
|      * @Return 缓存节点数 | 
|      */ | 
|     public static Integer size(){ | 
|         return cacheQueue.size() ; | 
|     } | 
|   | 
| } |