zhubaomin
2025-04-11 d1e380d5bc8d6cda7dc26778dd638b3367483ae7
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/threadPool/ThreadPoolImp.java
New file
@@ -0,0 +1,439 @@
package com.dy.common.threadPool;
import java.util.*;
import org.apache.logging.log4j.*;
import com.dy.common.threadPool.ThreadPool.Job;
public class ThreadPoolImp {
   /**
    * 线程池实现
    * @author Administrator
    *
    */
   public class MyThreadPool implements ThreadPool.Pool{
      /**
       * 线程池唯一实例
       */
      public MyThreadPool myPool ;
      /**
       * 线程池名称;
       */
      private String poolName ;
      /**
       * 空闲线程集合
       */
      private List<MyThread> freeThreads;
      /**
       * 工作中的线程集合
       */
      private List<MyThread> busiThreads;
      /**
       * 线程池最大线程数 , 若maxNum <= 0,则maxNum = 1 。
       */
      private int maxNum;
      /**
       * 最小线程数,若minNum > maxNum,则minNum = maxNum 。
       */
      private int minNum;
      /**
       * 当前线程池中的线程数。
       */
      private int currNum;
      /**
       * 空闲线程超时的时长(秒),超过这个时间,就清除多余线程,
       * 使空闲线程最多是minNum个
       */
      private long freeTimeout ;
      /**
       * 忙碌线程超时的时长(秒),超过这个时间,就认为是崩溃线程,将被清除。
       */
      private long busyTimeout ;
      /**
       * 同步对象
       */
      private Object synObj = new Object();
      /**
       * 内部启动的空闲和忙碌超时线程的线程
       */
      private Thread monitorThread ;
      /**
       * 日志
       */
      private static final Logger log = LogManager.getLogger(MonitorThread.class) ;
      /**
       * 线程池构造方法
       * @param poolName 线程池和线程名称
       * @param maxNum 线程池最大线程数,若为-1,不受限制
       * @param minNum 线程池最小线程数,或初始线程数
       * @param freeTimeout 空闲线程超时时长(秒)
       * @param busyTimeout 忙碌线程超时时长(秒),若为-1,不受限制
       */
      public MyThreadPool(
            String poolName ,
            Integer maxNum ,
            Integer minNum ,
            Long freeTimeout ,
            Long busyTimeout) {
         if(poolName == null){
            poolName="线程池" ;
         }
         this.poolName = poolName ;
         if(maxNum == null || maxNum.intValue() < 0){
            maxNum = 65535 ;
         }
         if(minNum == null || minNum.intValue() < 0){
            minNum = 0 ;
         }
         if(minNum > maxNum){
            minNum = maxNum ;
         }
         this.maxNum = maxNum ;
         this.minNum = minNum ;
         this.currNum = 0;
         if(freeTimeout == null || freeTimeout.longValue() <= 0){
            freeTimeout = 60000L ;
         }
         this.freeTimeout = freeTimeout ;
         if(busyTimeout == null || busyTimeout.longValue() <= 0){
            this.busyTimeout = -1L ;
         }else{
            this.busyTimeout = busyTimeout ;
         }
         if(maxNum != 0){
            this.busiThreads = new ArrayList<>();
            this.freeThreads = new ArrayList<>();
            //最小化线程池
            for (int i = 0; i < this.minNum ; i++) {
               MyThread t = new MyThread(this);
               t.start();
               this.freeThreads.add(t);
               this.currNum++;
            }
            this.monitorThread = new MonitorThread(this) ;
            this.monitorThread.start() ;
         }
      }
      /**
       * 线程池中线程个数
       * @return
       */
      @Override
      public Integer size() {
         return currNum ;
      }
      @Override
      public Integer maxThread() {
         return maxNum ;
      }
      @Override
      public Integer minThread() {
         return minNum ;
      }
      /**
       * 把所要执行的工作对象实例放入线程池中
       * @param job ThreadJob 工作对象实例
       * @throws Exception
       */
      @Override
      public void putJob(Job job) throws Exception {
         if(this.busiThreads == null || this.freeThreads == null){
            throw new Exception("线程池未启动") ;
         }
         synchronized(this.synObj) {
            //log.debug("工作任务分配到线程池中。") ;
            MyThread t = null ;
            if (this.freeThreads.size() == 0) {
               //当前没有空闲线程
               if (this.maxNum == -1 || this.currNum < this.maxNum) {
                  //当前线程数未达到最大值 , 增加新的线程
                  t = new MyThread(this);
                  t.start();
                  this.currNum++;
               } else {
                  //当前线程达到最大数了,等待回归线程
                  while (freeThreads.size() == 0) {
                     //如果没有空闲的线程,等待工作线程工作完成回来
                     try {
                        //log.warn("线程池(" + this.poolName + ")中线程数达到上限,新工作任务等待释放线程!");
                        synObj.wait();
                     } catch (Exception e) {
                        log.error("'" + this.poolName + "'线程池中线程等待释放线时发生等待异常!", e);
                     }
                     t = (MyThread) freeThreads.get(0);
                     if (t != null) {
                        //说明得到了释放回来的线程
                        freeThreads.remove(0);
                        break;
                     }else{
                        //说明没有得到释放回来的线程,可以其他线程
                        continue;
                     }
                  }//end while
               }//end else
            }//end if(freeThreads.size() == 0)
            else {
               t = (MyThread) freeThreads.get(0);
               freeThreads.remove(0);
            }
            busiThreads.add(t);
            t.putJob(job);
         }
      }
      /**
       * 线程工作完成,从busiThreads回归freeThreads
       */
      protected void freeThread(MyThread t) throws Exception {
         if(this.busiThreads == null || this.freeThreads == null){
            throw new Exception("线程池未启动") ;
         }
         synchronized (synObj) {
            busiThreads.remove(t);
            freeThreads.add(t);
            synObj.notify();
         }
      }
      /**
       * 监控超时线程的线程
       * @author Administrator
       *
       */
      protected class MonitorThread extends Thread {
         private MyThreadPool pool ;
         private MyThread t  ;
         private Iterator<?> it  ;
         /**
          *
          * @param pool 池
          */
         public MonitorThread(MyThreadPool pool){
            this.pool = pool ;
         }
         /**
          *
          */
         @SuppressWarnings("finally")
         public void run(){
            long interval = pool.freeTimeout ;
            if(pool.busyTimeout > 0 && pool.busyTimeout < pool.freeTimeout){
               interval = pool.busyTimeout ;
            }
            boolean isException = false ;
            while(true){
               t = null ;
               it = null ;
               try{
                  MonitorThread.sleep(interval) ;
               }catch(Exception e){
                  isException = true ;
               }finally{
                  if(isException){
                     isException = false ;
                     continue ;
                  }
               }
               try{
                  synchronized (pool.synObj) {
                     if(pool.freeThreads.size() > pool.minNum){
                        //如果空闲线程大于最小线程数,则清理空闲线程
                        int num = pool.freeThreads.size() - pool.minNum ;
                        int count = 0 ;
                        it = pool.freeThreads.iterator() ;
                        while(it.hasNext()){
                           if(count == num) {
                              break ;
                           }
                           count ++ ;
                           t = (MyThread)it.next() ;
                           if((System.currentTimeMillis() - t.time) >= pool.freeTimeout){
                              it.remove() ;
                              pool.currNum-- ;
                              //log.debug("线程池(" + pool.poolName + ")中清除了一个超时空闲线程!");
                              t.destroy() ;
                              t = null ;
                           }
                        }
                     }//end if
                     if(pool.busyTimeout != -1 && pool.busyTimeout > 0){
                        it = pool.busiThreads.iterator() ;
                        while(it.hasNext()){
                           t = (MyThread)it.next() ;
                           if((System.currentTimeMillis() - t.time) >= pool.busyTimeout){
                              it.remove() ;
                              pool.currNum-- ;
                              log.error("线程池(" + pool.poolName + ")中清除了一个超时崩溃(忙碌)线程!");
                              t.destroy() ;
                              t = null ;
                           }
                        }
                     }
                  }//end synchronized (pool.synObj)
               }catch(Exception e){
                  e.printStackTrace();
               }finally{
                  continue ;
               }
            }//end while(true)
         }
      }
   }
   /**
    * 池中线程实现
    * @author Administrator
    *
    */
   public class MyThread extends Thread{
      /**
       * 标示线程是活的
       */
      private boolean living = true ;
      /**
       * 线程忙碌或空闲记时器
       */
      protected long time ;
      /**
       * 当前线程所处的线程池
       */
      private MyThreadPool pool ;
      /**
       * 线程具体工作的回调类
       */
      private Job job ;
      /**
       * 指示线程可以工作
       */
      private Boolean canJob ;
      private Logger log = LogManager.getLogger(MyThread.class.getName()) ;
      protected MyThread(MyThreadPool pool) {
         super();
         this.pool = pool ;
         this.time = 0 ;
         this.canJob = false ;
      }
      /**
       * 设置线程工作对象
       * @param job 工作
       */
      protected void putJob(Job job) throws Exception {
         if(job == null){
            this.job = new Job(){
               public void execute(){
               }
               public void destroy(){
               }
               public boolean isDestroy(){
                  return false ;
               }
            };
         }
         synchronized (this) {
            this.job = job ;
            this.canJob = true;
            //忙碌记时开始
            this.time = System.currentTimeMillis() ;
            this.notify();
         }
      }
      /**
       *
       */
      @SuppressWarnings("finally")
      @Override
      public void run(){
         while (living){
            synchronized (this){
               while(!canJob){
                  //当不能工作时
                  try{
                     this.wait();
                  }catch (Exception e) {
                     log.error("线程池(" + pool.poolName + ")的工作线程等待可以工作的信号时发生等待异常:\n" + e.getMessage(),  e);
                     this.canJob = false ;
                     continue;
                  }
               }
               ///////////////////////
               //被唤醒,有新工作了
               try{
                  if(this.job != null){
                     this.job.execute() ;
                  }
               }catch (Exception ee) {
                  log.error("线程池(" + pool.poolName + ")的工作线程在执行工作时发生异常:\n" + ee.getMessage(), ee);
                  //ee.printStackTrace() ;
               }finally {
                  this.canJob = false ;
                  this.job = null ;
                  if(living){
                     //线程未被销毁
                     this.free() ;
                  }
                  continue;
               }
            }// end synchronized(this)
         }// end while(living)
      }
      public void free(){
         try{
            //使本线程回归空闲线程池
            pool.freeThread(this);
            //空闲开始记时
            this.time = System.currentTimeMillis() ;
            // 没有可做的了
            this.canJob = false;
            log.debug("线程池(" + this.pool.poolName + ")中的线程回归空闲集合。");
         }catch (Exception e){
            log.error("线程池(" + pool.poolName + ")的工作线程释放回归时发生异常:\n" + e.getMessage(), e);
            e.printStackTrace();
         }
      }
      /**
       * 销毁工作线程
       */
      public void destroy() {
         //线程销毁前,首先把其所持有的工作销毁,否则线程停不下来,也销毁不了
         if(this.job != null){
            this.job.destroy();
         }
         this.living = false ;//使线程从run方法的长久while(living)中退出来,从线程自动销毁
         this.job = null ;
      }
   }
}