package com.dy.common.threadPool; 
 | 
  
 | 
import java.util.*; 
 | 
  
 | 
import org.apache.logging.log4j.*; 
 | 
  
 | 
import com.dy.common.threadPool.ThreadPool.Job; 
 | 
  
 | 
public class ThreadPoolImp { 
 | 
     
 | 
    /** 
 | 
     * 线程池实现 
 | 
     * @author Administrator 
 | 
     * 
 | 
     */ 
 | 
    public class MyThreadPool implements ThreadPool.Pool{ 
 | 
         
 | 
        /** 
 | 
         * 线程池唯一实例 
 | 
         */ 
 | 
        public MyThreadPool myPool ; 
 | 
  
 | 
        /** 
 | 
         * 线程池名称; 
 | 
         */ 
 | 
        private String poolName ; 
 | 
        /** 
 | 
         * 空闲线程集合 
 | 
         */ 
 | 
        private List<MyThread> freeThreads; 
 | 
  
 | 
        /** 
 | 
         * 工作中的线程集合 
 | 
         */ 
 | 
        private List<MyThread> busiThreads; 
 | 
  
 | 
        /** 
 | 
         * 线程池最大线程数 , 若maxNum <= 0,则maxNum = 1 。 
 | 
         */ 
 | 
        private int maxNum; 
 | 
  
 | 
        /** 
 | 
         * 最小线程数,若minNum > maxNum,则minNum = maxNum 。 
 | 
         */ 
 | 
        private int minNum; 
 | 
  
 | 
        /** 
 | 
         * 当前线程池中的线程数。 
 | 
         */ 
 | 
        private int currNum; 
 | 
         
 | 
        /** 
 | 
         * 空闲线程超时的时长(秒),超过这个时间,就清除多余线程, 
 | 
         * 使空闲线程最多是minNum个 
 | 
         */ 
 | 
        private long freeTimeout ; 
 | 
         
 | 
        /** 
 | 
         * 忙碌线程超时的时长(秒),超过这个时间,就认为是崩溃线程,将被清除。 
 | 
         */ 
 | 
        private long busyTimeout ; 
 | 
  
 | 
        /** 
 | 
         * 同步对象 
 | 
         */ 
 | 
        private Object synObj = new Object(); 
 | 
  
 | 
        /** 
 | 
         * 内部启动的空闲和忙碌超时线程的线程 
 | 
         */ 
 | 
        private Thread monitorThread ; 
 | 
  
 | 
        /** 
 | 
         * 日志  
 | 
         */ 
 | 
        private static final Logger log = LogManager.getLogger(MonitorThread.class) ; 
 | 
  
 | 
        /** 
 | 
         * 线程池构造方法 
 | 
         * @param poolName 线程池和线程名称 
 | 
         * @param maxNum 线程池最大线程数,若为-1,不受限制 
 | 
         * @param minNum 线程池最小线程数,或初始线程数 
 | 
         * @param freeTimeout 空闲线程超时时长(秒) 
 | 
         * @param busyTimeout 忙碌线程超时时长(秒),若为-1,不受限制 
 | 
         */ 
 | 
        public MyThreadPool( 
 | 
                String poolName ,  
 | 
                Integer maxNum ,  
 | 
                Integer minNum , 
 | 
                Long freeTimeout , 
 | 
                Long busyTimeout) { 
 | 
            if(poolName == null){ 
 | 
                poolName="线程池" ; 
 | 
            } 
 | 
            this.poolName = poolName ; 
 | 
             
 | 
            if(maxNum == null || maxNum.intValue() < 0){ 
 | 
                maxNum = 65535 ; 
 | 
            } 
 | 
            if(minNum == null || minNum.intValue() < 0){ 
 | 
                minNum = 0 ; 
 | 
            } 
 | 
            if(minNum > maxNum){ 
 | 
                minNum = maxNum ; 
 | 
            } 
 | 
            this.maxNum = maxNum ; 
 | 
            this.minNum = minNum ; 
 | 
            this.currNum = 0; 
 | 
             
 | 
            if(freeTimeout == null || freeTimeout.longValue() <= 0){ 
 | 
                freeTimeout = 60000L ; 
 | 
            } 
 | 
            this.freeTimeout = freeTimeout ; 
 | 
             
 | 
            if(busyTimeout == null || busyTimeout.longValue() <= 0){ 
 | 
                this.busyTimeout = -1L ; 
 | 
            }else{ 
 | 
                this.busyTimeout = busyTimeout ;     
 | 
            } 
 | 
            if(maxNum != 0){ 
 | 
                this.busiThreads = new ArrayList<>(); 
 | 
                this.freeThreads = new ArrayList<>(); 
 | 
                //最小化线程池 
 | 
                for (int i = 0; i < this.minNum ; i++) { 
 | 
                    MyThread t = new MyThread(this); 
 | 
                    t.start(); 
 | 
                    this.freeThreads.add(t); 
 | 
                    this.currNum++; 
 | 
                } 
 | 
                this.monitorThread = new MonitorThread(this) ; 
 | 
                this.monitorThread.start() ; 
 | 
            } 
 | 
        } 
 | 
  
 | 
        /** 
 | 
         * 把所要执行的工作对象实例放入线程池中 
 | 
         * @param job ThreadJob 工作对象实例 
 | 
         * @throws Exception  
 | 
         */ 
 | 
        @Override 
 | 
        public void putJob(Job job) throws Exception { 
 | 
            if(this.busiThreads == null || this.freeThreads == null){ 
 | 
                throw new Exception("线程池未启动") ; 
 | 
            } 
 | 
            synchronized(this.synObj) { 
 | 
                //log.debug("工作任务分配到线程池中。") ; 
 | 
                MyThread t = null ; 
 | 
                if (this.freeThreads.size() == 0) { 
 | 
                    //当前没有空闲线程 
 | 
                    if (this.maxNum == -1 || this.currNum < this.maxNum) { 
 | 
                        //当前线程数未达到最大值 , 增加新的线程 
 | 
                        t = new MyThread(this); 
 | 
                        t.start(); 
 | 
                        this.currNum++; 
 | 
                    } else { 
 | 
                        //当前线程达到最大数了,等待回归线程 
 | 
                        while (freeThreads.size() == 0) { 
 | 
                            //如果没有空闲的线程,等待工作线程工作完成回来 
 | 
                            try { 
 | 
                                //log.warn("线程池(" + this.poolName + ")中线程数达到上限,新工作任务等待释放线程!");  
 | 
                                synObj.wait(); 
 | 
                            } catch (Exception e) { 
 | 
                                log.error("'" + this.poolName + "'线程池中线程等待释放线时发生等待异常!", e); 
 | 
                            } 
 | 
                            t = (MyThread) freeThreads.get(0); 
 | 
                            if (t != null) { 
 | 
                                //说明得到了释放回来的线程 
 | 
                                freeThreads.remove(0); 
 | 
                                break; 
 | 
                            }else{ 
 | 
                                //说明没有得到释放回来的线程,可以其他线程 
 | 
                                continue; 
 | 
                            } 
 | 
                        }//end while 
 | 
                    }//end else 
 | 
                }//end if(freeThreads.size() == 0) 
 | 
                else { 
 | 
                    t = (MyThread) freeThreads.get(0); 
 | 
                    freeThreads.remove(0); 
 | 
                } 
 | 
                busiThreads.add(t); 
 | 
                t.putJob(job); 
 | 
            } 
 | 
        } 
 | 
  
 | 
        /** 
 | 
         * 线程工作完成,从busiThreads回归freeThreads 
 | 
         */ 
 | 
        protected void freeThread(MyThread t) throws Exception { 
 | 
            if(this.busiThreads == null || this.freeThreads == null){ 
 | 
                throw new Exception("线程池未启动") ; 
 | 
            } 
 | 
            synchronized (synObj) { 
 | 
                busiThreads.remove(t); 
 | 
                freeThreads.add(t); 
 | 
                synObj.notify(); 
 | 
            } 
 | 
        } 
 | 
         
 | 
        /** 
 | 
         * 监控超时线程的线程 
 | 
         * @author Administrator 
 | 
         * 
 | 
         */ 
 | 
        protected class MonitorThread extends Thread { 
 | 
            private MyThreadPool pool ; 
 | 
             
 | 
            private MyThread t  ; 
 | 
            private Iterator<?> it  ; 
 | 
  
 | 
            /** 
 | 
             *  
 | 
             * @param pool 池 
 | 
             */ 
 | 
            public MonitorThread(MyThreadPool pool){ 
 | 
                this.pool = pool ; 
 | 
            } 
 | 
            /** 
 | 
             *  
 | 
             */ 
 | 
            @SuppressWarnings("finally") 
 | 
            public void run(){ 
 | 
                long interval = pool.freeTimeout ; 
 | 
                if(pool.busyTimeout > 0 && pool.busyTimeout < pool.freeTimeout){ 
 | 
                    interval = pool.busyTimeout ; 
 | 
                } 
 | 
                boolean isException = false ; 
 | 
                while(true){ 
 | 
                    t = null ; 
 | 
                    it = null ; 
 | 
                    try{ 
 | 
                        MonitorThread.sleep(interval) ; 
 | 
                    }catch(Exception e){ 
 | 
                        isException = true ; 
 | 
                    }finally{ 
 | 
                        if(isException){ 
 | 
                            isException = false ; 
 | 
                            continue ; 
 | 
                        } 
 | 
                    } 
 | 
                    try{ 
 | 
                        synchronized (pool.synObj) { 
 | 
                            if(pool.freeThreads.size() > pool.minNum){ 
 | 
                                //如果空闲线程大于最小线程数,则清理空闲线程 
 | 
                                int num = pool.freeThreads.size() - pool.minNum ; 
 | 
                                int count = 0 ; 
 | 
                                it = pool.freeThreads.iterator() ; 
 | 
                                while(it.hasNext()){ 
 | 
                                    if(count == num) { 
 | 
                                        break ; 
 | 
                                    } 
 | 
                                    count ++ ; 
 | 
                                    t = (MyThread)it.next() ; 
 | 
                                    if((System.currentTimeMillis() - t.time) >= pool.freeTimeout){ 
 | 
                                        it.remove() ; 
 | 
                                        pool.currNum-- ; 
 | 
                                        //log.debug("线程池(" + pool.poolName + ")中清除了一个超时空闲线程!");  
 | 
                                        t.destroy() ; 
 | 
                                        t = null ; 
 | 
                                    } 
 | 
                                } 
 | 
                            }//end if 
 | 
                             
 | 
                            if(pool.busyTimeout != -1 && pool.busyTimeout > 0){ 
 | 
                                it = pool.busiThreads.iterator() ; 
 | 
                                while(it.hasNext()){ 
 | 
                                    t = (MyThread)it.next() ; 
 | 
                                    if((System.currentTimeMillis() - t.time) >= pool.busyTimeout){ 
 | 
                                        it.remove() ; 
 | 
                                        pool.currNum-- ; 
 | 
                                        log.error("线程池(" + pool.poolName + ")中清除了一个超时崩溃(忙碌)线程!");  
 | 
                                        t.destroy() ; 
 | 
                                        t = null ; 
 | 
                                    } 
 | 
                                } 
 | 
                            } 
 | 
                        }//end synchronized (pool.synObj) 
 | 
                    }catch(Exception e){ 
 | 
                        e.printStackTrace(); 
 | 
                    }finally{ 
 | 
                        continue ; 
 | 
                    } 
 | 
                }//end while(true) 
 | 
            } 
 | 
        } 
 | 
         
 | 
         
 | 
    } 
 | 
  
 | 
    /** 
 | 
     * 池中线程实现 
 | 
     * @author Administrator 
 | 
     * 
 | 
     */ 
 | 
    public class MyThread extends Thread{ 
 | 
  
 | 
        /** 
 | 
         * 标示线程是活的 
 | 
         */ 
 | 
        private boolean living = true ; 
 | 
         
 | 
        /** 
 | 
         * 线程忙碌或空闲记时器 
 | 
         */ 
 | 
        protected long time ; 
 | 
         
 | 
        /** 
 | 
         * 当前线程所处的线程池 
 | 
         */ 
 | 
        private MyThreadPool pool ; 
 | 
        /** 
 | 
         * 线程具体工作的回调类 
 | 
         */ 
 | 
        private Job job ; 
 | 
         
 | 
        /** 
 | 
         * 指示线程可以工作 
 | 
         */ 
 | 
        private Boolean canJob ; 
 | 
         
 | 
        private Logger log = LogManager.getLogger(MyThread.class.getName()) ; 
 | 
  
 | 
        protected MyThread(MyThreadPool pool) { 
 | 
            super(); 
 | 
            this.pool = pool ; 
 | 
            this.time = 0 ; 
 | 
            this.canJob = false ; 
 | 
        } 
 | 
         
 | 
        /** 
 | 
         * 设置线程工作对象 
 | 
         * @param job 工作 
 | 
         */ 
 | 
        protected void putJob(Job job) throws Exception { 
 | 
            if(job == null){ 
 | 
                this.job = new Job(){ 
 | 
                    public void execute(){ 
 | 
                    } 
 | 
                    public void destroy(){ 
 | 
                    } 
 | 
                    public boolean isDestroy(){ 
 | 
                        return false ; 
 | 
                    } 
 | 
                }; 
 | 
            } 
 | 
            synchronized (this) { 
 | 
                this.job = job ; 
 | 
                this.canJob = true; 
 | 
                //忙碌记时开始 
 | 
                this.time = System.currentTimeMillis() ; 
 | 
                this.notify(); 
 | 
            } 
 | 
        } 
 | 
  
 | 
        /** 
 | 
         *  
 | 
         */ 
 | 
        @SuppressWarnings("finally") 
 | 
        @Override 
 | 
        public void run(){ 
 | 
            while (living){ 
 | 
                synchronized (this){ 
 | 
                    while(!canJob){ 
 | 
                        //当不能工作时 
 | 
                        try{ 
 | 
                            this.wait(); 
 | 
                        }catch (Exception e) { 
 | 
                            log.error("线程池(" + pool.poolName + ")的工作线程等待可以工作的信号时发生等待异常:\n" + e.getMessage(),  e); 
 | 
                            this.canJob = false ; 
 | 
                            continue; 
 | 
                        } 
 | 
                    } 
 | 
                    /////////////////////// 
 | 
                    //被唤醒,有新工作了 
 | 
                    try{ 
 | 
                        if(this.job != null){ 
 | 
                            this.job.execute() ; 
 | 
                        } 
 | 
                    }catch (Exception ee) { 
 | 
                        log.error("线程池(" + pool.poolName + ")的工作线程在执行工作时发生异常:\n" + ee.getMessage(), ee); 
 | 
                        //ee.printStackTrace() ; 
 | 
                    }finally { 
 | 
                        this.canJob = false ; 
 | 
                        this.job = null ; 
 | 
                        if(living){ 
 | 
                            //线程未被销毁 
 | 
                            this.free() ; 
 | 
                        } 
 | 
                        continue; 
 | 
                    } 
 | 
                }// end synchronized(this) 
 | 
            }// end while(living) 
 | 
        } 
 | 
         
 | 
        public void free(){ 
 | 
            try{ 
 | 
                //使本线程回归空闲线程池 
 | 
                pool.freeThread(this); 
 | 
                //空闲开始记时 
 | 
                this.time = System.currentTimeMillis() ; 
 | 
                // 没有可做的了 
 | 
                this.canJob = false; 
 | 
                log.debug("线程池(" + this.pool.poolName + ")中的线程回归空闲集合。"); 
 | 
            }catch (Exception e){ 
 | 
                log.error("线程池(" + pool.poolName + ")的工作线程释放回归时发生异常:\n" + e.getMessage(), e); 
 | 
                e.printStackTrace(); 
 | 
            } 
 | 
  
 | 
        } 
 | 
  
 | 
        /** 
 | 
         * 销毁工作线程 
 | 
         */ 
 | 
        public void destroy() { 
 | 
            //线程销毁前,首先把其所持有的工作销毁,否则线程停不下来,也销毁不了 
 | 
            if(this.job != null){ 
 | 
                this.job.destroy(); 
 | 
            } 
 | 
            this.living = false ;//使线程从run方法的长久while(living)中退出来,从线程自动销毁 
 | 
            this.job = null ;  
 | 
        } 
 | 
    } 
 | 
} 
 |