package com.dy.common.queue; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; /** * 队列,先进先出 */ public class Queue { @SuppressWarnings("unfinal") private final Logger log = LogManager.getLogger(Queue.class.getName()) ; private final Object synObj ; private String queueName ; private final Node head ; private final Node tail ; private int size ; //最大限制,超过这个限制,队列将拒绝push调用,并抛出异常 private int maxSize ; //警告限制,超过这个限制,队列接收push后,将记录warn日志 private int warnSize ;// public Queue(String queueName){ synObj = new Object() ; this.queueName = queueName ; if(this.queueName == null){ this.queueName = Queue.class.getName() ; } this.head = new Node() ; this.tail = new Node() ; this.head.next = this.tail ; this.tail.pre = this.head ; this.size = 0 ; this.warnSize = 1000000 ; this.maxSize = Integer.MAX_VALUE ; } public void setLimit(int warnSize, int maxSize){ this.warnSize = warnSize ; this.maxSize = maxSize ; if(this.warnSize <= 0){ this.warnSize = 1 ; } if(this.warnSize == Integer.MAX_VALUE){ this.warnSize = Integer.MAX_VALUE - 1000 ; } if(this.maxSize <= 0){ this.maxSize = 1 ; } } /** * 此方法在多线程中执行,同步方式,防止其他线程操作其他方法时不同步 * 在队列尾部压入一节点 * 在多线程中应用,加同步 * @param obj 入列的对象 */ @SuppressWarnings("unused") public void pushHead(NodeObj obj)throws Exception{ synchronized(synObj){ if(obj == null){ return ; } if(this.size >= this.maxSize){ throw new Exception("队列(" + queueName + ")缓存节点数" + this.size + " 超出最大限制(" + this.maxSize + "),拒绝接收新数据。"); } Node node = new Node(); node.obj = obj ; node.next = this.head.next ; node.pre = this.head ; this.head.next.pre = node ; this.head.next = node ; this.size ++ ; if(this.size >= this.warnSize){ log.warn("队列(" + queueName + ")缓存节点数 " + this.size + " 超限(" + this.warnSize + ")警告。"); } } } /** * 此方法在多线程中执行,同步方式,防止其他线程操作其他方法时不同步 * 在队列尾部压入一节点 * 在多线程中应用,加同步 * @param obj 入列的对象 */ public void pushTail(NodeObj obj)throws Exception{ synchronized(synObj){ if(obj == null){ return ; } if(this.size >= this.maxSize){ throw new Exception("队列(" + queueName + ")缓存节点数" + this.size + " 超出最大限制(" + this.maxSize + "),拒绝接收新数据。"); } Node node = new Node(); node.obj = obj ; node.next = this.tail ; node.pre = this.tail.pre ; this.tail.pre.next = node ; this.tail.pre = node ; this.size ++ ; if(this.size >= this.warnSize){ log.warn("队列(" + queueName + ")缓存节点数 " + this.size + " 超限(" + this.warnSize + ")警告。"); } } } /** * 此方法在单线程中执行,但可能受调用push的多线程影响,所以也用到了同步 * 弹出第一个节点 * 在多线程中应用,加同步 * @return 出列对象 */ public NodeObj pop(){ synchronized(synObj){ NodeObj obj = null ; if(this.size > 0){ Node node = this.head.next ; if(node != this.tail){ obj = node.obj ; this.head.next = this.head.next.next ; this.head.next.pre = this.head ; node.next = null ; node.pre = null ; this.size -- ; } } return obj ; } } /** * 得到第一个节点,但不把节点从队列中清除 * @return 第一个对象 */ @SuppressWarnings("unused") public Node getFirstNode(){ Node node = this.head.next ; if(node != this.tail){ return node ; } return null ; } /** * 得到最后一个节点,但不把节点从队列中清除 * @return 最后一个对象 */ @SuppressWarnings("unused") public Node getLastNode(){ Node node = this.tail.pre ; if(node != this.head){ return node ; } return null ; } /** * 清除一个节点 * @param node 删除一个节点 */ @SuppressWarnings("unused") public void remove(Node node){ synchronized(synObj){ if(node != null && node != this.head && node != this.tail){ node.pre.next = node.next ; node.next.pre = node.pre ; node.next = null ; node.pre = null ; this.size -- ; } } } /** * 得到队列中有效节点个数 * @return 队列内节点数量 */ public int size(){ return this.size ; } }