package com.dy.common.threadPool; import java.util.*; import org.apache.logging.log4j.*; import com.dy.common.threadPool.ThreadPool.Job; public class ThreadPoolImp { /** * 线程池实现 * @author Administrator * */ public class MyThreadPool implements ThreadPool.Pool{ /** * 线程池唯一实例 */ public MyThreadPool myPool ; /** * 线程池名称; */ private String poolName ; /** * 空闲线程集合 */ private List freeThreads; /** * 工作中的线程集合 */ private List busiThreads; /** * 线程池最大线程数 , 若maxNum <= 0,则maxNum = 1 。 */ private int maxNum; /** * 最小线程数,若minNum > maxNum,则minNum = maxNum 。 */ private int minNum; /** * 当前线程池中的线程数。 */ private int currNum; /** * 空闲线程超时的时长(秒),超过这个时间,就清除多余线程, * 使空闲线程最多是minNum个 */ private long freeTimeout ; /** * 忙碌线程超时的时长(秒),超过这个时间,就认为是崩溃线程,将被清除。 */ private long busyTimeout ; /** * 同步对象 */ private Object synObj = new Object(); /** * 内部启动的空闲和忙碌超时线程的线程 */ private Thread monitorThread ; /** * 日志 */ private static final Logger log = LogManager.getLogger(MonitorThread.class) ; /** * 线程池构造方法 * @param poolName 线程池和线程名称 * @param maxNum 线程池最大线程数,若为-1,不受限制 * @param minNum 线程池最小线程数,或初始线程数 * @param freeTimeout 空闲线程超时时长(秒) * @param busyTimeout 忙碌线程超时时长(秒),若为-1,不受限制 */ public MyThreadPool( String poolName , Integer maxNum , Integer minNum , Long freeTimeout , Long busyTimeout) { if(poolName == null){ poolName="线程池" ; } this.poolName = poolName ; if(maxNum == null || maxNum.intValue() < 0){ maxNum = 65535 ; } if(minNum == null || minNum.intValue() < 0){ minNum = 0 ; } if(minNum > maxNum){ minNum = maxNum ; } this.maxNum = maxNum ; this.minNum = minNum ; this.currNum = 0; if(freeTimeout == null || freeTimeout.longValue() <= 0){ freeTimeout = 60000L ; } this.freeTimeout = freeTimeout ; if(busyTimeout == null || busyTimeout.longValue() <= 0){ this.busyTimeout = -1L ; }else{ this.busyTimeout = busyTimeout ; } if(maxNum != 0){ this.busiThreads = new ArrayList<>(); this.freeThreads = new ArrayList<>(); //最小化线程池 for (int i = 0; i < this.minNum ; i++) { MyThread t = new MyThread(this); t.start(); this.freeThreads.add(t); this.currNum++; } this.monitorThread = new MonitorThread(this) ; this.monitorThread.start() ; } } /** * 线程池中线程个数 * @return */ @Override public Integer size() { return currNum ; } @Override public Integer maxThread() { return maxNum ; } @Override public Integer minThread() { return minNum ; } /** * 把所要执行的工作对象实例放入线程池中 * @param job ThreadJob 工作对象实例 * @throws Exception */ @Override public void putJob(Job job) throws Exception { if(this.busiThreads == null || this.freeThreads == null){ throw new Exception("线程池未启动") ; } synchronized(this.synObj) { //log.debug("工作任务分配到线程池中。") ; MyThread t = null ; if (this.freeThreads.size() == 0) { //当前没有空闲线程 if (this.maxNum == -1 || this.currNum < this.maxNum) { //当前线程数未达到最大值 , 增加新的线程 t = new MyThread(this); t.start(); this.currNum++; } else { //当前线程达到最大数了,等待回归线程 while (freeThreads.size() == 0) { //如果没有空闲的线程,等待工作线程工作完成回来 try { //log.warn("线程池(" + this.poolName + ")中线程数达到上限,新工作任务等待释放线程!"); synObj.wait(); } catch (Exception e) { log.error("'" + this.poolName + "'线程池中线程等待释放线时发生等待异常!", e); } t = (MyThread) freeThreads.get(0); if (t != null) { //说明得到了释放回来的线程 freeThreads.remove(0); break; }else{ //说明没有得到释放回来的线程,可以其他线程 continue; } }//end while }//end else }//end if(freeThreads.size() == 0) else { t = (MyThread) freeThreads.get(0); freeThreads.remove(0); } busiThreads.add(t); t.putJob(job); } } /** * 线程工作完成,从busiThreads回归freeThreads */ protected void freeThread(MyThread t) throws Exception { if(this.busiThreads == null || this.freeThreads == null){ throw new Exception("线程池未启动") ; } synchronized (synObj) { busiThreads.remove(t); freeThreads.add(t); synObj.notify(); } } /** * 监控超时线程的线程 * @author Administrator * */ protected class MonitorThread extends Thread { private MyThreadPool pool ; private MyThread t ; private Iterator it ; /** * * @param pool 池 */ public MonitorThread(MyThreadPool pool){ this.pool = pool ; } /** * */ @SuppressWarnings("finally") public void run(){ long interval = pool.freeTimeout ; if(pool.busyTimeout > 0 && pool.busyTimeout < pool.freeTimeout){ interval = pool.busyTimeout ; } boolean isException = false ; while(true){ t = null ; it = null ; try{ MonitorThread.sleep(interval) ; }catch(Exception e){ isException = true ; }finally{ if(isException){ isException = false ; continue ; } } try{ synchronized (pool.synObj) { if(pool.freeThreads.size() > pool.minNum){ //如果空闲线程大于最小线程数,则清理空闲线程 int num = pool.freeThreads.size() - pool.minNum ; int count = 0 ; it = pool.freeThreads.iterator() ; while(it.hasNext()){ if(count == num) { break ; } count ++ ; t = (MyThread)it.next() ; if((System.currentTimeMillis() - t.time) >= pool.freeTimeout){ it.remove() ; pool.currNum-- ; //log.debug("线程池(" + pool.poolName + ")中清除了一个超时空闲线程!"); t.destroy() ; t = null ; } } }//end if if(pool.busyTimeout != -1 && pool.busyTimeout > 0){ it = pool.busiThreads.iterator() ; while(it.hasNext()){ t = (MyThread)it.next() ; if((System.currentTimeMillis() - t.time) >= pool.busyTimeout){ it.remove() ; pool.currNum-- ; log.error("线程池(" + pool.poolName + ")中清除了一个超时崩溃(忙碌)线程!"); t.destroy() ; t = null ; } } } }//end synchronized (pool.synObj) }catch(Exception e){ e.printStackTrace(); }finally{ continue ; } }//end while(true) } } } /** * 池中线程实现 * @author Administrator * */ public class MyThread extends Thread{ /** * 标示线程是活的 */ private boolean living = true ; /** * 线程忙碌或空闲记时器 */ protected long time ; /** * 当前线程所处的线程池 */ private MyThreadPool pool ; /** * 线程具体工作的回调类 */ private Job job ; /** * 指示线程可以工作 */ private Boolean canJob ; private Logger log = LogManager.getLogger(MyThread.class.getName()) ; protected MyThread(MyThreadPool pool) { super(); this.pool = pool ; this.time = 0 ; this.canJob = false ; } /** * 设置线程工作对象 * @param job 工作 */ protected void putJob(Job job) throws Exception { if(job == null){ this.job = new Job(){ public void execute(){ } public void destroy(){ } public boolean isDestroy(){ return false ; } }; } synchronized (this) { this.job = job ; this.canJob = true; //忙碌记时开始 this.time = System.currentTimeMillis() ; this.notify(); } } /** * */ @SuppressWarnings("finally") @Override public void run(){ while (living){ synchronized (this){ while(!canJob){ //当不能工作时 try{ this.wait(); }catch (Exception e) { log.error("线程池(" + pool.poolName + ")的工作线程等待可以工作的信号时发生等待异常:\n" + e.getMessage(), e); this.canJob = false ; continue; } } /////////////////////// //被唤醒,有新工作了 try{ if(this.job != null){ this.job.execute() ; } }catch (Exception ee) { log.error("线程池(" + pool.poolName + ")的工作线程在执行工作时发生异常:\n" + ee.getMessage(), ee); //ee.printStackTrace() ; }finally { this.canJob = false ; this.job = null ; if(living){ //线程未被销毁 this.free() ; } continue; } }// end synchronized(this) }// end while(living) } public void free(){ try{ //使本线程回归空闲线程池 pool.freeThread(this); //空闲开始记时 this.time = System.currentTimeMillis() ; // 没有可做的了 this.canJob = false; log.debug("线程池(" + this.pool.poolName + ")中的线程回归空闲集合。"); }catch (Exception e){ log.error("线程池(" + pool.poolName + ")的工作线程释放回归时发生异常:\n" + e.getMessage(), e); e.printStackTrace(); } } /** * 销毁工作线程 */ public void destroy() { //线程销毁前,首先把其所持有的工作销毁,否则线程停不下来,也销毁不了 if(this.job != null){ this.job.destroy(); } this.living = false ;//使线程从run方法的长久while(living)中退出来,从线程自动销毁 this.job = null ; } } }