zhubaomin
2025-02-21 1f19230e00b543b58f03853df1a38ebd8b508e55
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
package com.dy.common.threadPool;
 
import java.util.*;
 
import org.apache.logging.log4j.*;
 
import com.dy.common.threadPool.ThreadPool.Job;
 
public class ThreadPoolImp {
    
    /**
     * 线程池实现
     * @author Administrator
     *
     */
    public class MyThreadPool implements ThreadPool.Pool{
        
        /**
         * 线程池唯一实例
         */
        public MyThreadPool myPool ;
 
        /**
         * 线程池名称;
         */
        private String poolName ;
        /**
         * 空闲线程集合
         */
        private List<MyThread> freeThreads;
 
        /**
         * 工作中的线程集合
         */
        private List<MyThread> busiThreads;
 
        /**
         * 线程池最大线程数 , 若maxNum <= 0,则maxNum = 1 。
         */
        private int maxNum;
 
        /**
         * 最小线程数,若minNum > maxNum,则minNum = maxNum 。
         */
        private int minNum;
 
        /**
         * 当前线程池中的线程数。
         */
        private int currNum;
        
        /**
         * 空闲线程超时的时长(秒),超过这个时间,就清除多余线程,
         * 使空闲线程最多是minNum个
         */
        private long freeTimeout ;
        
        /**
         * 忙碌线程超时的时长(秒),超过这个时间,就认为是崩溃线程,将被清除。
         */
        private long busyTimeout ;
 
        /**
         * 同步对象
         */
        private Object synObj = new Object();
 
        /**
         * 内部启动的空闲和忙碌超时线程的线程
         */
        private Thread monitorThread ;
 
        /**
         * 日志 
         */
        private static final Logger log = LogManager.getLogger(MonitorThread.class) ;
 
        /**
         * 线程池构造方法
         * @param poolName 线程池和线程名称
         * @param maxNum 线程池最大线程数,若为-1,不受限制
         * @param minNum 线程池最小线程数,或初始线程数
         * @param freeTimeout 空闲线程超时时长(秒)
         * @param busyTimeout 忙碌线程超时时长(秒),若为-1,不受限制
         */
        public MyThreadPool(
                String poolName , 
                Integer maxNum , 
                Integer minNum ,
                Long freeTimeout ,
                Long busyTimeout) {
            if(poolName == null){
                poolName="线程池" ;
            }
            this.poolName = poolName ;
            
            if(maxNum == null || maxNum.intValue() < 0){
                maxNum = 65535 ;
            }
            if(minNum == null || minNum.intValue() < 0){
                minNum = 0 ;
            }
            if(minNum > maxNum){
                minNum = maxNum ;
            }
            this.maxNum = maxNum ;
            this.minNum = minNum ;
            this.currNum = 0;
            
            if(freeTimeout == null || freeTimeout.longValue() <= 0){
                freeTimeout = 60000L ;
            }
            this.freeTimeout = freeTimeout ;
            
            if(busyTimeout == null || busyTimeout.longValue() <= 0){
                this.busyTimeout = -1L ;
            }else{
                this.busyTimeout = busyTimeout ;    
            }
            if(maxNum != 0){
                this.busiThreads = new ArrayList<>();
                this.freeThreads = new ArrayList<>();
                //最小化线程池
                for (int i = 0; i < this.minNum ; i++) {
                    MyThread t = new MyThread(this);
                    t.start();
                    this.freeThreads.add(t);
                    this.currNum++;
                }
                this.monitorThread = new MonitorThread(this) ;
                this.monitorThread.start() ;
            }
        }
        /**
         * 线程池中线程个数
         * @return
         */
        @Override
        public Integer size() {
            return currNum ;
        }
        @Override
        public Integer maxThread() {
            return maxNum ;
        }
        @Override
        public Integer minThread() {
            return minNum ;
        }
 
        /**
         * 把所要执行的工作对象实例放入线程池中
         * @param job ThreadJob 工作对象实例
         * @throws Exception 
         */
        @Override
        public void putJob(Job job) throws Exception {
            if(this.busiThreads == null || this.freeThreads == null){
                throw new Exception("线程池未启动") ;
            }
            synchronized(this.synObj) {
                //log.debug("工作任务分配到线程池中。") ;
                MyThread t = null ;
                if (this.freeThreads.size() == 0) {
                    //当前没有空闲线程
                    if (this.maxNum == -1 || this.currNum < this.maxNum) {
                        //当前线程数未达到最大值 , 增加新的线程
                        t = new MyThread(this);
                        t.start();
                        this.currNum++;
                    } else {
                        //当前线程达到最大数了,等待回归线程
                        while (freeThreads.size() == 0) {
                            //如果没有空闲的线程,等待工作线程工作完成回来
                            try {
                                //log.warn("线程池(" + this.poolName + ")中线程数达到上限,新工作任务等待释放线程!"); 
                                synObj.wait();
                            } catch (Exception e) {
                                log.error("'" + this.poolName + "'线程池中线程等待释放线时发生等待异常!", e);
                            }
                            t = (MyThread) freeThreads.get(0);
                            if (t != null) {
                                //说明得到了释放回来的线程
                                freeThreads.remove(0);
                                break;
                            }else{
                                //说明没有得到释放回来的线程,可以其他线程
                                continue;
                            }
                        }//end while
                    }//end else
                }//end if(freeThreads.size() == 0)
                else {
                    t = (MyThread) freeThreads.get(0);
                    freeThreads.remove(0);
                }
                busiThreads.add(t);
                t.putJob(job);
            }
        }
 
        /**
         * 线程工作完成,从busiThreads回归freeThreads
         */
        protected void freeThread(MyThread t) throws Exception {
            if(this.busiThreads == null || this.freeThreads == null){
                throw new Exception("线程池未启动") ;
            }
            synchronized (synObj) {
                busiThreads.remove(t);
                freeThreads.add(t);
                synObj.notify();
            }
        }
        
        /**
         * 监控超时线程的线程
         * @author Administrator
         *
         */
        protected class MonitorThread extends Thread {
            private MyThreadPool pool ;
            
            private MyThread t  ;
            private Iterator<?> it  ;
 
            /**
             * 
             * @param pool 池
             */
            public MonitorThread(MyThreadPool pool){
                this.pool = pool ;
            }
            /**
             * 
             */
            @SuppressWarnings("finally")
            public void run(){
                long interval = pool.freeTimeout ;
                if(pool.busyTimeout > 0 && pool.busyTimeout < pool.freeTimeout){
                    interval = pool.busyTimeout ;
                }
                boolean isException = false ;
                while(true){
                    t = null ;
                    it = null ;
                    try{
                        MonitorThread.sleep(interval) ;
                    }catch(Exception e){
                        isException = true ;
                    }finally{
                        if(isException){
                            isException = false ;
                            continue ;
                        }
                    }
                    try{
                        synchronized (pool.synObj) {
                            if(pool.freeThreads.size() > pool.minNum){
                                //如果空闲线程大于最小线程数,则清理空闲线程
                                int num = pool.freeThreads.size() - pool.minNum ;
                                int count = 0 ;
                                it = pool.freeThreads.iterator() ;
                                while(it.hasNext()){
                                    if(count == num) {
                                        break ;
                                    }
                                    count ++ ;
                                    t = (MyThread)it.next() ;
                                    if((System.currentTimeMillis() - t.time) >= pool.freeTimeout){
                                        it.remove() ;
                                        pool.currNum-- ;
                                        //log.debug("线程池(" + pool.poolName + ")中清除了一个超时空闲线程!"); 
                                        t.destroy() ;
                                        t = null ;
                                    }
                                }
                            }//end if
                            
                            if(pool.busyTimeout != -1 && pool.busyTimeout > 0){
                                it = pool.busiThreads.iterator() ;
                                while(it.hasNext()){
                                    t = (MyThread)it.next() ;
                                    if((System.currentTimeMillis() - t.time) >= pool.busyTimeout){
                                        it.remove() ;
                                        pool.currNum-- ;
                                        log.error("线程池(" + pool.poolName + ")中清除了一个超时崩溃(忙碌)线程!"); 
                                        t.destroy() ;
                                        t = null ;
                                    }
                                }
                            }
                        }//end synchronized (pool.synObj)
                    }catch(Exception e){
                        e.printStackTrace();
                    }finally{
                        continue ;
                    }
                }//end while(true)
            }
        }
        
        
    }
 
    /**
     * 池中线程实现
     * @author Administrator
     *
     */
    public class MyThread extends Thread{
 
        /**
         * 标示线程是活的
         */
        private boolean living = true ;
        
        /**
         * 线程忙碌或空闲记时器
         */
        protected long time ;
        
        /**
         * 当前线程所处的线程池
         */
        private MyThreadPool pool ;
        /**
         * 线程具体工作的回调类
         */
        private Job job ;
        
        /**
         * 指示线程可以工作
         */
        private Boolean canJob ;
        
        private Logger log = LogManager.getLogger(MyThread.class.getName()) ;
 
        protected MyThread(MyThreadPool pool) {
            super();
            this.pool = pool ;
            this.time = 0 ;
            this.canJob = false ;
        }
        
        /**
         * 设置线程工作对象
         * @param job 工作
         */
        protected void putJob(Job job) throws Exception {
            if(job == null){
                this.job = new Job(){
                    public void execute(){
                    }
                    public void destroy(){
                    }
                    public boolean isDestroy(){
                        return false ;
                    }
                };
            }
            synchronized (this) {
                this.job = job ;
                this.canJob = true;
                //忙碌记时开始
                this.time = System.currentTimeMillis() ;
                this.notify();
            }
        }
 
        /**
         * 
         */
        @SuppressWarnings("finally")
        @Override
        public void run(){
            while (living){
                synchronized (this){
                    while(!canJob){
                        //当不能工作时
                        try{
                            this.wait();
                        }catch (Exception e) {
                            log.error("线程池(" + pool.poolName + ")的工作线程等待可以工作的信号时发生等待异常:\n" + e.getMessage(),  e);
                            this.canJob = false ;
                            continue;
                        }
                    }
                    ///////////////////////
                    //被唤醒,有新工作了
                    try{
                        if(this.job != null){
                            this.job.execute() ;
                        }
                    }catch (Exception ee) {
                        log.error("线程池(" + pool.poolName + ")的工作线程在执行工作时发生异常:\n" + ee.getMessage(), ee);
                        //ee.printStackTrace() ;
                    }finally {
                        this.canJob = false ;
                        this.job = null ;
                        if(living){
                            //线程未被销毁
                            this.free() ;
                        }
                        continue;
                    }
                }// end synchronized(this)
            }// end while(living)
        }
        
        public void free(){
            try{
                //使本线程回归空闲线程池
                pool.freeThread(this);
                //空闲开始记时
                this.time = System.currentTimeMillis() ;
                // 没有可做的了
                this.canJob = false;
                log.debug("线程池(" + this.pool.poolName + ")中的线程回归空闲集合。");
            }catch (Exception e){
                log.error("线程池(" + pool.poolName + ")的工作线程释放回归时发生异常:\n" + e.getMessage(), e);
                e.printStackTrace();
            }
 
        }
 
        /**
         * 销毁工作线程
         */
        public void destroy() {
            //线程销毁前,首先把其所持有的工作销毁,否则线程停不下来,也销毁不了
            if(this.job != null){
                this.job.destroy();
            }
            this.living = false ;//使线程从run方法的长久while(living)中退出来,从线程自动销毁
            this.job = null ; 
        }
    }
}