liurunyu
3 天以前 ca2417166fe917aeb45d08df6ff152f9c7a0060a
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/queue/Queue.java
New file
@@ -0,0 +1,186 @@
package com.dy.common.queue;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
 * 队列,先进先出
 */
public class Queue {
   @SuppressWarnings("unfinal")
   private final Logger log = LogManager.getLogger(Queue.class.getName()) ;
   private final Object synObj ;
   private String queueName ;
   private final Node head ;
   private final Node tail ;
   private int size ;
   //最大限制,超过这个限制,队列将拒绝push调用,并抛出异常
   private int maxSize ;
   //警告限制,超过这个限制,队列接收push后,将记录warn日志
   private int warnSize ;//
   public Queue(String queueName){
      synObj = new Object() ;
      this.queueName = queueName ;
      if(this.queueName == null){
         this.queueName = Queue.class.getName() ;
      }
      this.head = new Node() ;
      this.tail = new Node() ;
      this.head.next = this.tail ;
      this.tail.pre = this.head ;
      this.size = 0 ;
      this.warnSize = 1000000 ;
      this.maxSize = Integer.MAX_VALUE ;
   }
   public void setLimit(int warnSize, int maxSize){
      this.warnSize = warnSize ;
      this.maxSize = maxSize ;
      if(this.warnSize <= 0){
         this.warnSize = 1 ;
      }
      if(this.warnSize == Integer.MAX_VALUE){
         this.warnSize = Integer.MAX_VALUE - 1000 ;
      }
      if(this.maxSize <= 0){
         this.maxSize = 1 ;
      }
   }
   /**
    * 此方法在多线程中执行,同步方式,防止其他线程操作其他方法时不同步
    * 在队列尾部压入一节点
    * 在多线程中应用,加同步
    * @param obj 入列的对象
    */
   @SuppressWarnings("unused")
   public void pushHead(NodeObj obj)throws Exception{
      synchronized(synObj){
         if(obj == null){
            return ;
         }
         if(this.size >= this.maxSize){
            throw new Exception("队列(" + queueName + ")缓存节点数" + this.size + " 超出最大限制(" + this.maxSize + "),拒绝接收新数据。");
         }
         Node node = new Node();
         node.obj = obj ;
         node.next = this.head.next ;
         node.pre = this.head ;
         this.head.next.pre = node ;
         this.head.next = node ;
         this.size ++ ;
         if(this.size >= this.warnSize){
            log.warn("队列(" + queueName + ")缓存节点数 " + this.size + " 超限(" + this.warnSize + ")警告。");
         }
      }
   }
   /**
    * 此方法在多线程中执行,同步方式,防止其他线程操作其他方法时不同步
    * 在队列尾部压入一节点
    * 在多线程中应用,加同步
    * @param obj 入列的对象
    */
   public void pushTail(NodeObj obj)throws Exception{
      synchronized(synObj){
         if(obj == null){
            return ;
         }
         if(this.size >= this.maxSize){
            throw new Exception("队列(" + queueName + ")缓存节点数" + this.size + " 超出最大限制(" + this.maxSize + "),拒绝接收新数据。");
         }
         Node node = new Node();
         node.obj = obj ;
         node.next = this.tail ;
         node.pre = this.tail.pre ;
         this.tail.pre.next = node ;
         this.tail.pre = node ;
         this.size ++ ;
         if(this.size >= this.warnSize){
            log.warn("队列(" + queueName + ")缓存节点数 " + this.size + " 超限(" + this.warnSize + ")警告。");
         }
      }
   }
   /**
    * 此方法在单线程中执行,但可能受调用push的多线程影响,所以也用到了同步
    * 弹出第一个节点
    * 在多线程中应用,加同步
    * @return 出列对象
    */
   public NodeObj pop(){
      synchronized(synObj){
         NodeObj obj = null ;
         if(this.size > 0){
            Node node = this.head.next ;
            if(node != this.tail){
               obj = node.obj ;
               this.head.next = this.head.next.next ;
               this.head.next.pre = this.head ;
               node.next = null ;
               node.pre = null ;
               this.size -- ;
            }
         }
         return obj ;
      }
   }
   /**
    * 得到第一个节点,但不把节点从队列中清除
    * @return 第一个对象
    */
   @SuppressWarnings("unused")
   public Node getFirstNode(){
      Node node = this.head.next ;
      if(node != this.tail){
         return node ;
      }
      return null ;
   }
   /**
    * 得到最后一个节点,但不把节点从队列中清除
    * @return 最后一个对象
    */
   @SuppressWarnings("unused")
   public Node getLastNode(){
      Node node = this.tail.pre ;
      if(node != this.head){
         return node ;
      }
      return null ;
   }
   /**
    * 清除一个节点
    * @param node 删除一个节点
    */
   @SuppressWarnings("unused")
   public void remove(Node node){
      synchronized(synObj){
         if(node != null && node != this.head && node != this.tail){
            node.pre.next = node.next ;
            node.next.pre = node.pre ;
            node.next = null ;
            node.pre = null ;
            this.size -- ;
         }
      }
   }
   /**
    * 得到队列中有效节点个数
    * @return 队列内节点数量
    */
   public int size(){
      return this.size ;
   }
}