package com.dy.common.queue;
|
|
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.Logger;
|
|
/**
|
* 队列,先进先出
|
*/
|
public class Queue {
|
@SuppressWarnings("unfinal")
|
private final Logger log = LogManager.getLogger(Queue.class.getName()) ;
|
|
private final Object synObj ;
|
|
private String queueName ;
|
|
private final Node head ;
|
private final Node tail ;
|
private int size ;
|
|
//最大限制,超过这个限制,队列将拒绝push调用,并抛出异常
|
private int maxSize ;
|
//警告限制,超过这个限制,队列接收push后,将记录warn日志
|
private int warnSize ;//
|
|
|
public Queue(String queueName){
|
synObj = new Object() ;
|
this.queueName = queueName ;
|
if(this.queueName == null){
|
this.queueName = Queue.class.getName() ;
|
}
|
this.head = new Node() ;
|
this.tail = new Node() ;
|
this.head.next = this.tail ;
|
this.tail.pre = this.head ;
|
this.size = 0 ;
|
|
this.warnSize = 1000000 ;
|
this.maxSize = Integer.MAX_VALUE ;
|
}
|
|
public void setLimit(int warnSize, int maxSize){
|
this.warnSize = warnSize ;
|
this.maxSize = maxSize ;
|
if(this.warnSize <= 0){
|
this.warnSize = 1 ;
|
}
|
if(this.warnSize == Integer.MAX_VALUE){
|
this.warnSize = Integer.MAX_VALUE - 1000 ;
|
}
|
|
if(this.maxSize <= 0){
|
this.maxSize = 1 ;
|
}
|
}
|
/**
|
* 此方法在多线程中执行,同步方式,防止其他线程操作其他方法时不同步
|
* 在队列尾部压入一节点
|
* 在多线程中应用,加同步
|
* @param obj 入列的对象
|
*/
|
@SuppressWarnings("unused")
|
public void pushHead(NodeObj obj)throws Exception{
|
synchronized(synObj){
|
if(obj == null){
|
return ;
|
}
|
if(this.size >= this.maxSize){
|
throw new Exception("队列(" + queueName + ")缓存节点数" + this.size + " 超出最大限制(" + this.maxSize + "),拒绝接收新数据。");
|
}
|
Node node = new Node();
|
node.obj = obj ;
|
node.next = this.head.next ;
|
node.pre = this.head ;
|
this.head.next.pre = node ;
|
this.head.next = node ;
|
this.size ++ ;
|
if(this.size >= this.warnSize){
|
log.warn("队列(" + queueName + ")缓存节点数 " + this.size + " 超限(" + this.warnSize + ")警告。");
|
}
|
}
|
}
|
|
/**
|
* 此方法在多线程中执行,同步方式,防止其他线程操作其他方法时不同步
|
* 在队列尾部压入一节点
|
* 在多线程中应用,加同步
|
* @param obj 入列的对象
|
*/
|
public void pushTail(NodeObj obj)throws Exception{
|
synchronized(synObj){
|
if(obj == null){
|
return ;
|
}
|
if(this.size >= this.maxSize){
|
throw new Exception("队列(" + queueName + ")缓存节点数" + this.size + " 超出最大限制(" + this.maxSize + "),拒绝接收新数据。");
|
}
|
Node node = new Node();
|
node.obj = obj ;
|
node.next = this.tail ;
|
node.pre = this.tail.pre ;
|
this.tail.pre.next = node ;
|
this.tail.pre = node ;
|
this.size ++ ;
|
if(this.size >= this.warnSize){
|
log.warn("队列(" + queueName + ")缓存节点数 " + this.size + " 超限(" + this.warnSize + ")警告。");
|
}
|
}
|
}
|
|
/**
|
* 此方法在单线程中执行,但可能受调用push的多线程影响,所以也用到了同步
|
* 弹出第一个节点
|
* 在多线程中应用,加同步
|
* @return 出列对象
|
*/
|
public NodeObj pop(){
|
synchronized(synObj){
|
NodeObj obj = null ;
|
if(this.size > 0){
|
Node node = this.head.next ;
|
if(node != this.tail){
|
obj = node.obj ;
|
this.head.next = this.head.next.next ;
|
this.head.next.pre = this.head ;
|
node.next = null ;
|
node.pre = null ;
|
this.size -- ;
|
}
|
}
|
return obj ;
|
}
|
}
|
|
/**
|
* 得到第一个节点,但不把节点从队列中清除
|
* @return 第一个对象
|
*/
|
@SuppressWarnings("unused")
|
public Node getFirstNode(){
|
Node node = this.head.next ;
|
if(node != this.tail){
|
return node ;
|
}
|
return null ;
|
}
|
/**
|
* 得到最后一个节点,但不把节点从队列中清除
|
* @return 最后一个对象
|
*/
|
@SuppressWarnings("unused")
|
public Node getLastNode(){
|
Node node = this.tail.pre ;
|
if(node != this.head){
|
return node ;
|
}
|
return null ;
|
}
|
|
/**
|
* 清除一个节点
|
* @param node 删除一个节点
|
*/
|
@SuppressWarnings("unused")
|
public void remove(Node node){
|
synchronized(synObj){
|
if(node != null && node != this.head && node != this.tail){
|
node.pre.next = node.next ;
|
node.next.pre = node.pre ;
|
node.next = null ;
|
node.pre = null ;
|
this.size -- ;
|
}
|
}
|
}
|
|
/**
|
* 得到队列中有效节点个数
|
* @return 队列内节点数量
|
*/
|
public int size(){
|
return this.size ;
|
}
|
|
}
|