package com.dy.common.threadPool;
|
|
import java.util.*;
|
|
import org.apache.logging.log4j.*;
|
|
import com.dy.common.threadPool.ThreadPool.Job;
|
|
public class ThreadPoolImp {
|
|
/**
|
* 线程池实现
|
* @author Administrator
|
*
|
*/
|
public class MyThreadPool implements ThreadPool.Pool{
|
|
/**
|
* 线程池唯一实例
|
*/
|
public MyThreadPool myPool ;
|
|
/**
|
* 线程池名称;
|
*/
|
private String poolName ;
|
/**
|
* 空闲线程集合
|
*/
|
private List<MyThread> freeThreads;
|
|
/**
|
* 工作中的线程集合
|
*/
|
private List<MyThread> busiThreads;
|
|
/**
|
* 线程池最大线程数 , 若maxNum <= 0,则maxNum = 1 。
|
*/
|
private int maxNum;
|
|
/**
|
* 最小线程数,若minNum > maxNum,则minNum = maxNum 。
|
*/
|
private int minNum;
|
|
/**
|
* 当前线程池中的线程数。
|
*/
|
private int currNum;
|
|
/**
|
* 空闲线程超时的时长(秒),超过这个时间,就清除多余线程,
|
* 使空闲线程最多是minNum个
|
*/
|
private long freeTimeout ;
|
|
/**
|
* 忙碌线程超时的时长(秒),超过这个时间,就认为是崩溃线程,将被清除。
|
*/
|
private long busyTimeout ;
|
|
/**
|
* 同步对象
|
*/
|
private Object synObj = new Object();
|
|
/**
|
* 内部启动的空闲和忙碌超时线程的线程
|
*/
|
private Thread monitorThread ;
|
|
/**
|
* 日志
|
*/
|
private Logger log = LogManager.getLogger(MonitorThread.class.getName());
|
|
/**
|
* 得到默认唯一实例
|
* @return
|
*/
|
// public MyThreadPool getDefaultInstance(){
|
// if(myPool == null){
|
// myPool = new MyThreadPool(null, null, null, null, null) ;
|
// }
|
// return myPool ;
|
// }
|
/**
|
* 得到唯一实例
|
* @param poolName
|
* @param maxNum
|
* @param minNum
|
* @param freeTimeout
|
* @param busyTimeout
|
* @return
|
*/
|
// public MyThreadPool getInstance(
|
// String poolName ,
|
// Integer maxNum ,
|
// Integer minNum ,
|
// Long freeTimeout ,
|
// Long busyTimeout){
|
// if(myPool == null){
|
// myPool = new MyThreadPool(poolName, maxNum, minNum, freeTimeout, busyTimeout) ;
|
// }
|
// return myPool ;
|
// }
|
|
/**
|
* 线程池构造方法
|
* @param poolName 线程池和线程名称
|
* @param maxNum 线程池最大线程数,若为-1,不受限制
|
* @param minNum 线程池最小线程数,或初始线程数
|
* @param freeTimeout 空闲线程超时时长(秒)
|
* @param busyTimeout 忙碌线程超时时长(秒),若为-1,不受限制
|
*/
|
public MyThreadPool(
|
String poolName ,
|
Integer maxNum ,
|
Integer minNum ,
|
Long freeTimeout ,
|
Long busyTimeout) {
|
if(poolName == null){
|
poolName="线程池" ;
|
}
|
this.poolName = poolName ;
|
|
if(maxNum == null || maxNum.intValue() <= 0){
|
maxNum = -1 ;
|
}
|
if(minNum == null || minNum.intValue() < 0){
|
minNum = 0 ;
|
}
|
if(minNum > maxNum){
|
minNum = maxNum ;
|
}
|
this.maxNum = maxNum ;
|
this.minNum = minNum ;
|
this.currNum = 0;
|
|
if(freeTimeout == null || freeTimeout.longValue() <= 0){
|
freeTimeout = 60000L ;
|
}
|
this.freeTimeout = freeTimeout ;
|
|
if(busyTimeout == null || busyTimeout.longValue() <= 0){
|
this.busyTimeout = -1L ;
|
}else{
|
this.busyTimeout = busyTimeout ;
|
}
|
|
this.busiThreads = new ArrayList<MyThread>();
|
this.freeThreads = new ArrayList<MyThread>();
|
|
//最小化线程池
|
for (int i = 0; i < this.minNum ; i++) {
|
MyThread t = new MyThread(this);
|
t.start();
|
this.freeThreads.add(t);
|
this.currNum++;
|
}
|
|
this.monitorThread = new MonitorThread(this) ;
|
this.monitorThread.start() ;
|
}
|
|
/**
|
* 把所要执行的工作对象实例放入线程池中
|
* @param job ThreadJob 工作对象实例
|
* @throws Exception
|
*/
|
@Override
|
public void putJob(Job job) throws Exception {
|
synchronized(this.synObj) {
|
//log.debug("工作任务分配到线程池中。") ;
|
MyThread t = null ;
|
if (this.freeThreads.size() == 0) {
|
//当前没有空闲线程
|
if (this.maxNum == -1 || this.currNum < this.maxNum) {
|
//当前线程数未达到最大值 , 增加新的线程
|
t = new MyThread(this);
|
t.start();
|
this.currNum++;
|
} else {
|
//当前线程达到最大数了,等待回归线程
|
while (freeThreads.size() == 0) {
|
//如果没有空闲的线程,等待工作线程工作完成回来
|
try {
|
//log.warn("线程池(" + this.poolName + ")中线程数达到上限,新工作任务等待释放线程!");
|
synObj.wait();
|
} catch (Exception e) {
|
log.error("'" + this.poolName + "'线程池中线程等待释放线时发生等待异常!", e);
|
}
|
t = (MyThread) freeThreads.get(0);
|
if (t != null) {
|
//说明得到了释放回来的线程
|
freeThreads.remove(0);
|
break;
|
}else{
|
//说明没有得到释放回来的线程,可以其他线程
|
continue;
|
}
|
}//end while
|
}//end else
|
}//end if(freeThreads.size() == 0)
|
else {
|
t = (MyThread) freeThreads.get(0);
|
freeThreads.remove(0);
|
}
|
busiThreads.add(t);
|
t.putJob(job);
|
}
|
}
|
|
/**
|
* 线程工作完成,从busiThreads回归freeThreads
|
*/
|
protected void freeThread(MyThread t) {
|
synchronized (synObj) {
|
busiThreads.remove(t);
|
freeThreads.add(t);
|
synObj.notify();
|
}
|
}
|
|
/**
|
* 监控超时线程的线程
|
* @author Administrator
|
*
|
*/
|
protected class MonitorThread extends Thread {
|
private MyThreadPool pool ;
|
|
private MyThread t ;
|
private Iterator<?> it ;
|
|
/**
|
*
|
* @param pool
|
*/
|
public MonitorThread(MyThreadPool pool){
|
this.pool = pool ;
|
}
|
/**
|
*
|
*/
|
@SuppressWarnings("finally")
|
public void run(){
|
long interval = pool.freeTimeout ;
|
if(pool.busyTimeout > 0 && pool.busyTimeout < pool.freeTimeout){
|
interval = pool.busyTimeout ;
|
}
|
boolean isException = false ;
|
while(true){
|
t = null ;
|
it = null ;
|
try{
|
MonitorThread.sleep(interval) ;
|
}catch(Exception e){
|
isException = true ;
|
}finally{
|
if(isException){
|
isException = false ;
|
continue ;
|
}
|
}
|
try{
|
synchronized (pool.synObj) {
|
if(pool.freeThreads.size() > pool.minNum){
|
//如果空闲线程大于最小线程数,则清理空闲线程
|
int num = pool.freeThreads.size() - pool.minNum ;
|
int count = 0 ;
|
it = pool.freeThreads.iterator() ;
|
while(it.hasNext()){
|
if(count == num) {
|
break ;
|
}
|
count ++ ;
|
t = (MyThread)it.next() ;
|
if((System.currentTimeMillis() - t.time) >= pool.freeTimeout){
|
it.remove() ;
|
pool.currNum-- ;
|
//log.debug("线程池(" + pool.poolName + ")中清除了一个超时空闲线程!");
|
t.destroy() ;
|
t = null ;
|
}
|
}
|
}//end if
|
|
if(pool.busyTimeout != -1 && pool.busyTimeout > 0){
|
it = pool.busiThreads.iterator() ;
|
while(it.hasNext()){
|
t = (MyThread)it.next() ;
|
if((System.currentTimeMillis() - t.time) >= pool.busyTimeout){
|
it.remove() ;
|
pool.currNum-- ;
|
log.error("线程池(" + pool.poolName + ")中清除了一个超时崩溃(忙碌)线程!");
|
t.destroy() ;
|
t = null ;
|
}
|
}
|
}
|
}//end synchronized (pool.synObj)
|
}catch(Exception e){
|
}finally{
|
continue ;
|
}
|
}//end while(true)
|
}
|
}
|
|
|
}
|
|
/**
|
* 池中线程实现
|
* @author Administrator
|
*
|
*/
|
public class MyThread extends Thread{
|
|
/**
|
* 标示线程是活的
|
*/
|
private boolean living = true ;
|
|
/**
|
* 线程忙碌或空闲记时器
|
*/
|
protected long time ;
|
|
/**
|
* 当前线程所处的线程池
|
*/
|
private MyThreadPool pool ;
|
/**
|
* 线程具体工作的回调类
|
*/
|
private Job job ;
|
|
/**
|
* 指示线程可以工作
|
*/
|
private Boolean canJob ;
|
|
private Logger log = LogManager.getLogger(MyThread.class.getName()) ;
|
|
protected MyThread(MyThreadPool pool) {
|
super();
|
this.pool = pool ;
|
this.time = 0 ;
|
this.canJob = false ;
|
}
|
|
/**
|
* 设置线程工作对象
|
* @param job
|
*/
|
protected void putJob(Job job) throws Exception {
|
if(job == null){
|
this.job = new Job(){
|
public void execute(){
|
}
|
public void destroy(){
|
}
|
public boolean isDestroy(){
|
return false ;
|
}
|
};
|
}
|
synchronized (this) {
|
this.job = job ;
|
this.canJob = true;
|
//忙碌记时开始
|
this.time = System.currentTimeMillis() ;
|
this.notify();
|
}
|
}
|
|
/**
|
*
|
*/
|
@SuppressWarnings("finally")
|
@Override
|
public void run(){
|
while (living){
|
synchronized (this){
|
while(!canJob){
|
//当不能工作时
|
try{
|
this.wait();
|
}catch (Exception e) {
|
log.error("线程池(" + pool.poolName + ")的工作线程等待可以工作的信号时发生等待异常:\n" + e.getMessage(), e);
|
this.canJob = false ;
|
continue;
|
}
|
}
|
///////////////////////
|
//被唤醒,有新工作了
|
try{
|
if(this.job != null){
|
this.job.execute() ;
|
}
|
}catch (Exception ee) {
|
log.error("线程池(" + pool.poolName + ")的工作线程在执行工作时发生异常:\n" + ee.getMessage(), ee);
|
//ee.printStackTrace() ;
|
}finally {
|
this.canJob = false ;
|
this.job = null ;
|
if(living){
|
//线程未被销毁
|
this.free() ;
|
}
|
continue;
|
}
|
}// end synchronized(this)
|
}// end while(living)
|
}
|
|
public void free(){
|
//使本线程回归空闲线程池
|
pool.freeThread(this);
|
//空闲开始记时
|
this.time = System.currentTimeMillis() ;
|
// 没有可做的了
|
this.canJob = false;
|
log.debug("线程池(" + this.pool.poolName + ")中的线程回归空闲集合。");
|
}
|
|
/**
|
* 销毁工作线程
|
*/
|
public void destroy() {
|
//线程销毁前,首先把其所持有的工作销毁,否则线程停不下来,也销毁不了
|
if(this.job != null){
|
this.job.destroy();
|
}
|
this.living = false ;//使线程从run方法的长久while(living)中退出来,从线程自动销毁
|
this.job = null ;
|
}
|
}
|
}
|