pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/queue/Node.java
New file @@ -0,0 +1,8 @@ package com.dy.common.queue; public class Node { public Node pre ; public Node next ; public NodeObj obj ; } pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/queue/NodeObj.java
New file @@ -0,0 +1,4 @@ package com.dy.common.queue; public interface NodeObj { } pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/queue/Queue.java
New file @@ -0,0 +1,186 @@ package com.dy.common.queue; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; /** * 队列,先进先出 */ public class Queue { @SuppressWarnings("unfinal") private final Logger log = LogManager.getLogger(Queue.class.getName()) ; private final Object synObj ; private String queueName ; private final Node head ; private final Node tail ; private int size ; //最大限制,超过这个限制,队列将拒绝push调用,并抛出异常 private int maxSize ; //警告限制,超过这个限制,队列接收push后,将记录warn日志 private int warnSize ;// public Queue(String queueName){ synObj = new Object() ; this.queueName = queueName ; if(this.queueName == null){ this.queueName = Queue.class.getName() ; } this.head = new Node() ; this.tail = new Node() ; this.head.next = this.tail ; this.tail.pre = this.head ; this.size = 0 ; this.warnSize = 1000000 ; this.maxSize = Integer.MAX_VALUE ; } public void setLimit(int warnSize, int maxSize){ this.warnSize = warnSize ; this.maxSize = maxSize ; if(this.warnSize <= 0){ this.warnSize = 1 ; } if(this.warnSize == Integer.MAX_VALUE){ this.warnSize = Integer.MAX_VALUE - 1000 ; } if(this.maxSize <= 0){ this.maxSize = 1 ; } } /** * 此方法在多线程中执行,同步方式,防止其他线程操作其他方法时不同步 * 在队列尾部压入一节点 * 在多线程中应用,加同步 * @param obj 入列的对象 */ @SuppressWarnings("unused") public void pushHead(NodeObj obj)throws Exception{ synchronized(synObj){ if(obj == null){ return ; } if(this.size >= this.maxSize){ throw new Exception("队列(" + queueName + ")缓存节点数" + this.size + " 超出最大限制(" + this.maxSize + "),拒绝接收新数据。"); } Node node = new Node(); node.obj = obj ; node.next = this.head.next ; node.pre = this.head ; this.head.next.pre = node ; this.head.next = node ; this.size ++ ; if(this.size >= this.warnSize){ log.warn("队列(" + queueName + ")缓存节点数 " + this.size + " 超限(" + this.warnSize + ")警告。"); } } } /** * 此方法在多线程中执行,同步方式,防止其他线程操作其他方法时不同步 * 在队列尾部压入一节点 * 在多线程中应用,加同步 * @param obj 入列的对象 */ public void pushTail(NodeObj obj)throws Exception{ synchronized(synObj){ if(obj == null){ return ; } if(this.size >= this.maxSize){ throw new Exception("队列(" + queueName + ")缓存节点数" + this.size + " 超出最大限制(" + this.maxSize + "),拒绝接收新数据。"); } Node node = new Node(); node.obj = obj ; node.next = this.tail ; node.pre = this.tail.pre ; this.tail.pre.next = node ; this.tail.pre = node ; this.size ++ ; if(this.size >= this.warnSize){ log.warn("队列(" + queueName + ")缓存节点数 " + this.size + " 超限(" + this.warnSize + ")警告。"); } } } /** * 此方法在单线程中执行,但可能受调用push的多线程影响,所以也用到了同步 * 弹出第一个节点 * 在多线程中应用,加同步 * @return 出列对象 */ public NodeObj pop(){ synchronized(synObj){ NodeObj obj = null ; if(this.size > 0){ Node node = this.head.next ; if(node != this.tail){ obj = node.obj ; this.head.next = this.head.next.next ; this.head.next.pre = this.head ; node.next = null ; node.pre = null ; this.size -- ; } } return obj ; } } /** * 得到第一个节点,但不把节点从队列中清除 * @return 第一个对象 */ @SuppressWarnings("unused") public Node getFirstNode(){ Node node = this.head.next ; if(node != this.tail){ return node ; } return null ; } /** * 得到最后一个节点,但不把节点从队列中清除 * @return 最后一个对象 */ @SuppressWarnings("unused") public Node getLastNode(){ Node node = this.tail.pre ; if(node != this.head){ return node ; } return null ; } /** * 清除一个节点 * @param node 删除一个节点 */ @SuppressWarnings("unused") public void remove(Node node){ synchronized(synObj){ if(node != null && node != this.head && node != this.tail){ node.pre.next = node.next ; node.next.pre = node.pre ; node.next = null ; node.pre = null ; this.size -- ; } } } /** * 得到队列中有效节点个数 * @return 队列内节点数量 */ public int size(){ return this.size ; } } pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/schedulerTask/SchedulerTaskFactory.java
New file @@ -0,0 +1,85 @@ package com.dy.common.schedulerTask; import java.util.Properties; import org.apache.logging.log4j.*; import org.quartz.Scheduler; import org.quartz.SchedulerException; import org.quartz.impl.StdSchedulerFactory; public class SchedulerTaskFactory { private static Logger log = LogManager.getLogger(SchedulerTaskFactory.class.getName()) ; private static Scheduler scheduler ; private static Properties pro ; static{ pro = new Properties() ; //以下属性从quartz-all-2.1.7.jar得到,而且下面所有项目都要配置 pro.put("org.quartz.scheduler.instanceName", "DefaultQuartzScheduler"); pro.put("org.quartz.scheduler.rmi.export", "false"); pro.put("org.quartz.scheduler.rmi.proxy", "false"); pro.put("org.quartz.scheduler.wrapJobExecutionInUserTransaction", "false"); pro.put("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool"); pro.put("org.quartz.threadPool.threadCount", "10"); pro.put("org.quartz.threadPool.threadPriority", "5"); pro.put("org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread", "true"); pro.put("org.quartz.jobStore.misfireThreshold", "60000"); pro.put("org.quartz.jobStore.class", "org.quartz.simpl.RAMJobStore"); } /** * 得到调度器唯一实例 * @return */ public static Scheduler getSingleScheduler(){ if(scheduler == null){ try { scheduler = new StdSchedulerFactory(pro).getScheduler(); } catch (SchedulerException e) { log.error(e) ; } } return scheduler ; } /** * 得到调度器唯一实例 * @param threadPoolMaxCount * @param threadPoolPriority * @return */ public static Scheduler getSingleScheduler(Integer threadPoolMaxCount, Integer threadPoolPriority){ if(scheduler == null){ try { if(threadPoolMaxCount != null && threadPoolMaxCount.intValue() >= 0){ pro.put("org.quartz.threadPool.threadCount", "" + (threadPoolMaxCount==null?10:(threadPoolMaxCount<=0?10:threadPoolMaxCount))); } if(threadPoolPriority != null && threadPoolPriority.intValue() >= 0){ pro.put("org.quartz.threadPool.threadPriority", "" + (threadPoolPriority==null?5:(threadPoolPriority<=0?5:threadPoolPriority))); } if(threadPoolMaxCount != null && threadPoolPriority != null){ if(threadPoolMaxCount.intValue() < threadPoolPriority.intValue()){ throw new SchedulerException("threadPoolMaxCount必须大于等于threadPoolPriority") ; } } scheduler = new StdSchedulerFactory(pro).getScheduler(); } catch (SchedulerException e) { log.error(e) ; } } return scheduler ; } /** * 关闭调度器 * @throws SchedulerException */ public static void shutdownScheduler() throws SchedulerException{ scheduler.shutdown() ; } } pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/schedulerTask/SchedulerTaskSupport.java
New file @@ -0,0 +1,464 @@ package com.dy.common.schedulerTask; import java.util.*; import org.quartz.*; public class SchedulerTaskSupport { private static Integer threadPoolMaxCount; private static Integer threadPoolPriority; public static void setThreadPoolPro(Integer threadPoolMaxCount, Integer threadPoolPriority){ if(SchedulerTaskSupport.threadPoolMaxCount == null){ SchedulerTaskSupport.threadPoolMaxCount = threadPoolMaxCount; } if(SchedulerTaskSupport.threadPoolPriority == null){ SchedulerTaskSupport.threadPoolPriority = threadPoolPriority; } } /** * 添加每X秒钟重复工作一次的工作任务, * @param jobName 工作名称 * @param jobGroupName 工作组名称 * @param jobClass 工作类 * @param futureSecond 延迟到将来时长(单位秒)以开始工作 * @param intervalInSeconds 重复工作间隔时长(秒) * @param repeatCount 重复工作次数(如果小于0,将一直重复执行下去 , 如果是0执行一次,如果是1执行2次,依次类推) * @return 工作任务Key * @throws Exception */ @SuppressWarnings({"unchecked","rawtypes"}) public static String addSecondlyJob( String jobName, String jobGroupName, Class jobClass , HashMap<String , Object> jobDataMap , int futureSecond, int intervalInSeconds , int repeatCount) throws Exception { SchedulerTaskSupport.checkDelayStart(futureSecond) ; if(intervalInSeconds < 1){ throw new Exception("重复工作间隔时长(单位秒)不能小于1秒)!") ; } Scheduler sched = SchedulerTaskFactory.getSingleScheduler(threadPoolMaxCount, threadPoolPriority) ; Date futureDate = DateBuilder.futureDate(futureSecond, DateBuilder.IntervalUnit.SECOND) ; Date startTime = DateBuilder.evenSecondDate(futureDate); JobDetail job = JobBuilder.newJob(jobClass) .withIdentity(jobName, jobGroupName) .build(); SchedulerTaskSupport.setDataMap(job, jobDataMap) ; Trigger trigger = TriggerBuilder.newTrigger() .withIdentity(jobName + "_trigger", jobGroupName + "_trigger") .startAt(startTime) .withSchedule(SimpleScheduleBuilder.simpleSchedule() .withIntervalInSeconds(intervalInSeconds) .withRepeatCount(repeatCount)) .build(); sched.scheduleJob(job, trigger); sched.start(); return job.getKey().toString() ; } /** * 添加每X分钟重复工作一次的工作任务, * @param jobName 工作名称 * @param jobGroupName 工作组名称 * @param jobClass 工作类 * @param futureSecond 延迟到将来时长(单位秒)以开始工作 * @param intervalInMinutes 重复工作间隔时长(分钟) * @param repeatCount 重复工作次数(如果小于0,将一直重复执行下去 , 如果是0执行一次,如果是1执行2次,依次类推) * @throws Exception */ @SuppressWarnings({"unchecked","rawtypes"}) public static String addMinutelyJob( String jobName, String jobGroupName, Class jobClass , HashMap<String , Object> jobDataMap , int futureSecond, int intervalInMinutes , int repeatCount) throws Exception { SchedulerTaskSupport.checkDelayStart(futureSecond) ; if(intervalInMinutes < 1){ throw new Exception("重复工作间隔时长(单位分钟)不能小于1分钟)!") ; } Scheduler sched = SchedulerTaskFactory.getSingleScheduler(threadPoolMaxCount, threadPoolPriority) ; Date futureDate = DateBuilder.futureDate(futureSecond, DateBuilder.IntervalUnit.SECOND) ; Date startTime = DateBuilder.evenSecondDate(futureDate); JobDetail job = JobBuilder.newJob(jobClass) .withIdentity(jobName, jobGroupName) .build(); SchedulerTaskSupport.setDataMap(job, jobDataMap) ; Trigger trigger = TriggerBuilder.newTrigger() .withIdentity(jobName + "_trigger", jobGroupName + "_trigger") .startAt(startTime) .withSchedule(SimpleScheduleBuilder.simpleSchedule() .withIntervalInMinutes(intervalInMinutes) .withRepeatCount(repeatCount)) .build(); sched.scheduleJob(job, trigger); sched.start(); return job.getKey().toString() ; } /** * 添加每X小时重复工作一次的工作任务, * @param jobName 工作名称 * @param jobGroupName 工作组名称 * @param jobClass 工作类 * @param futureSecond 延迟到将来时长(单位秒)以开始工作 * @param intervalInHour 重复工作间隔时长(小时) * @param repeatCount 重复工作次数(如果小于0,将一直重复执行下去 , 如果是0执行一次,如果是1执行2次,依次类推) * @throws Exception */ @SuppressWarnings({"unchecked","rawtypes"}) public static String addHourlyJob( String jobName, String jobGroupName, Class jobClass , HashMap<String , Object> jobDataMap , int futureSecond, int intervalInHour , int repeatCount) throws Exception { SchedulerTaskSupport.checkDelayStart(futureSecond) ; if(intervalInHour < 1){ throw new Exception("重复工作间隔时长(单位小时)不能小于1小时)!") ; } Scheduler sched = SchedulerTaskFactory.getSingleScheduler(threadPoolMaxCount, threadPoolPriority) ; Date futureDate = DateBuilder.futureDate(futureSecond, DateBuilder.IntervalUnit.SECOND) ; Date startTime = DateBuilder.evenSecondDate(futureDate); JobDetail job = JobBuilder.newJob(jobClass) .withIdentity(jobName, jobGroupName) .build(); SchedulerTaskSupport.setDataMap(job, jobDataMap) ; Trigger trigger = TriggerBuilder.newTrigger() .withIdentity(jobName + "_trigger", jobGroupName + "_trigger") .startAt(startTime) .withSchedule(SimpleScheduleBuilder.simpleSchedule() .withIntervalInHours(intervalInHour) .withRepeatCount(repeatCount)) .build(); sched.scheduleJob(job, trigger); sched.start(); return job.getKey().toString() ; } /** * 添加每天某时某分重复工作一次的工作任务, * @param jobName 工作名称 * @param jobGroupName 工作组名称 * @param jobClass 工作类 * @param hour 某时(0-23之间(包括0与23)) * @param minute 某分(0-59之间(包括0与59)) * @throws Exception */ // 表达式 意义 // "0 0 12 * * ?" 每天中午12点触发 // "0 15 10 ? * *" 每天上午10:15触发 // "0 15 10 * * ?" 每天上午10:15触发 // "0 15 10 * * ? *" 每天上午10:15触发 // "0 15 10 * * ? 2005" 2005年的每天上午10:15触发 // "0 * 14 * * ?" 在每天下午2点到下午2:59期间的每1分钟触发 // "0 0/5 14 * * ?" 在每天下午2点到下午2:55期间的每5分钟触发 // "0 0/5 14,18 * * ?" 在每天下午2点到2:55期间和下午6点到6:55期间的每5分钟触发 // "0 0-5 14 * * ?" 在每天下午2点到下午2:05期间的每1分钟触发 // "0 10,44 14 ? 3 WED" 每年三月的星期三的下午2:10和2:44触发 // "0 15 10 ? * MON-FRI" 周一至周五的上午10:15触发 // "0 15 10 15 * ?" 每月15日上午10:15触发 // "0 15 10 L * ?" 每月最后一日的上午10:15触发 // "0 15 10 ? * 6L" 每月的最后一个星期五上午10:15触发 // "0 15 10 ? * 6L 2002-2005" 2002年至2005年的每月的最后一个星期五上午10:15触发 // "0 15 10 ? * 6#3" 每月的第三个星期五上午10:15触发 @SuppressWarnings({"unchecked","rawtypes"}) public static String addDailyJob( String jobName, String jobGroupName, Class jobClass , HashMap<String , Object> jobDataMap , int hour , int minute) throws Exception { SchedulerTaskSupport.checkHour(hour) ; SchedulerTaskSupport.checkMinute(minute) ; Scheduler sched = SchedulerTaskFactory.getSingleScheduler(threadPoolMaxCount, threadPoolPriority) ; JobDetail job = JobBuilder.newJob(jobClass) .withIdentity(jobName, jobGroupName) .build(); SchedulerTaskSupport.setDataMap(job, jobDataMap) ; Trigger trigger = TriggerBuilder.newTrigger() .withIdentity(jobName + "_trigger", jobGroupName + "_trigger") .withSchedule(CronScheduleBuilder.cronSchedule("0 " + minute + " " + hour + " * * ? *")) .build(); sched.scheduleJob(job, trigger); sched.start(); return job.getKey().toString() ; } /** * 添加每周一、周二、周三、周四、周五各重复工作一次的工作任务, * @param jobName 工作名称 * @param jobGroupName 工作组名称 * @param jobClass 工作类 * @param hour 某时(0-23之间(包括0与23)) * @param minute 某分(0-59之间(包括0与59)) * @throws Exception */ @SuppressWarnings({"unchecked","rawtypes"}) public static String addWorkingDayInWeekJob( String jobName, String jobGroupName, Class jobClass , HashMap<String , Object> jobDataMap , int hour , int minute) throws Exception { SchedulerTaskSupport.checkHour(hour) ; SchedulerTaskSupport.checkMinute(minute) ; Scheduler sched = SchedulerTaskFactory.getSingleScheduler(threadPoolMaxCount, threadPoolPriority) ; JobDetail job = JobBuilder.newJob(jobClass) .withIdentity(jobName, jobGroupName) .build(); SchedulerTaskSupport.setDataMap(job, jobDataMap) ; Trigger trigger = TriggerBuilder.newTrigger() .withIdentity(jobName + "_trigger", jobGroupName + "_trigger") .withSchedule(CronScheduleBuilder.cronSchedule("0 " + minute + " " + hour + " ? * MON-FRI" )) .build(); sched.scheduleJob(job, trigger); sched.start(); return job.getKey().toString() ; } /** * 添加每一周的某一天某时某分重复工作一次的工作任务, * @param jobName 工作名称 * @param jobGroupName 工作组名称 * @param jobClass 工作类 * @param dayOfWeek 一周的某一天(1代表周一,依次类推)(取值1-7(包括1与7)) * @param hour 某时(0-23之间(包括0与23)) * @param minute 某分(0-59之间(包括0与59)) * @throws Exception */ @SuppressWarnings({"unchecked","rawtypes"}) public static String addWeeklyJob( String jobName, String jobGroupName, Class jobClass , HashMap<String , Object> jobDataMap , int dayOfWeek, int hour , int minute) throws Exception { SchedulerTaskSupport.checkHour(hour) ; SchedulerTaskSupport.checkMinute(minute) ; dayOfWeek = SchedulerTaskSupport.checkDayOfWeek(dayOfWeek); Scheduler sched = SchedulerTaskFactory.getSingleScheduler(threadPoolMaxCount, threadPoolPriority) ; JobDetail job = JobBuilder.newJob(jobClass) .withIdentity(jobName, jobGroupName) .build(); SchedulerTaskSupport.setDataMap(job, jobDataMap) ; Trigger trigger = TriggerBuilder.newTrigger() .withIdentity(jobName + "_trigger", jobGroupName + "_trigger") .withSchedule(CronScheduleBuilder.cronSchedule("0 " + minute + " " + hour + " ? * " + dayOfWeek )) .build(); sched.scheduleJob(job, trigger); sched.start(); return job.getKey().toString() ; } /** * 添加每月的某一天某时某分重复工作一次的工作任务, * @param jobName 工作名称 * @param jobGroupName 工作组名称 * @param jobClass 工作类 * @param dayOfMonth 一月的某一天(1-31之间(包括1与31)) * @param hour 某时(0-23之间(包括0与23)) * @param minute 某分(0-59之间(包括0与59)) * @throws Exception */ @SuppressWarnings({"unchecked","rawtypes"}) public static String addMonthlyJob( String jobName, String jobGroupName, Class jobClass , HashMap<String , Object> jobDataMap , int dayOfMonth, int hour , int minute) throws Exception { SchedulerTaskSupport.checkHour(hour) ; SchedulerTaskSupport.checkMinute(minute) ; SchedulerTaskSupport.checkDayOfMonth(dayOfMonth); Scheduler sched = SchedulerTaskFactory.getSingleScheduler(threadPoolMaxCount, threadPoolPriority) ; JobDetail job = JobBuilder.newJob(jobClass) .withIdentity(jobName, jobGroupName) .build(); SchedulerTaskSupport.setDataMap(job, jobDataMap) ; Trigger trigger = TriggerBuilder.newTrigger() .withIdentity(jobName + "_trigger", jobGroupName + "_trigger") .withSchedule(CronScheduleBuilder.cronSchedule("0 " + minute + " " + hour + " " + dayOfMonth + " * ?")) .build(); sched.scheduleJob(job, trigger); sched.start(); return job.getKey().toString() ; } /** * 添加每月的最后一天某时某分重复工作一次的工作任务, * @param jobName 工作名称 * @param jobGroupName 工作组名称 * @param jobClass 工作类 * @param hour 某时(0-23之间(包括0与23)) * @param minute 某分(0-59之间(包括0与59)) * @throws Exception */ @SuppressWarnings({"unchecked","rawtypes"}) public static String addLastDateOfMonthJob( String jobName, String jobGroupName, Class jobClass , HashMap<String , Object> jobDataMap , int hour , int minute) throws Exception { SchedulerTaskSupport.checkHour(hour) ; SchedulerTaskSupport.checkMinute(minute) ; Scheduler sched = SchedulerTaskFactory.getSingleScheduler(threadPoolMaxCount, threadPoolPriority) ; JobDetail job = JobBuilder.newJob(jobClass) .withIdentity(jobName, jobGroupName) .build(); SchedulerTaskSupport.setDataMap(job, jobDataMap) ; Trigger trigger = TriggerBuilder.newTrigger() .withIdentity(jobName + "_trigger", jobGroupName + "_trigger") .withSchedule(CronScheduleBuilder.cronSchedule("0 " + minute + " " + hour + " L * ?")) .build(); sched.scheduleJob(job, trigger); sched.start(); return job.getKey().toString() ; } /** * 设置参数数据 */ private static void setDataMap(JobDetail job , HashMap<String , Object> jobDataMap){ if(jobDataMap != null && jobDataMap.size() > 0){ JobDataMap map = job.getJobDataMap(); Set<Map.Entry<String , Object>> set = jobDataMap.entrySet() ; Iterator<Map.Entry<String , Object>> it = set.iterator() ; Map.Entry<String , Object> entry = null ; String key = null ; Object value = null ; while(it.hasNext()){ entry = it.next() ; key = entry.getKey() ; value = entry.getValue() ; map.put(key, value); } } } /** * * @param futureSecond */ private static void checkDelayStart(int futureSecond)throws Exception { if(futureSecond < 0){ throw new Exception("延迟开始工作时长(单位秒)不能小于0!") ; } } /** * * @param hour * @throws Exception */ private static void checkHour(int hour)throws Exception { if(hour < 0 || hour >= 24){ throw new Exception("重复工作的小时时间点取值必须在0-23之间(包括0与23)!") ; } } /** * * @param minute * @throws Exception */ private static void checkMinute(int minute)throws Exception { if(minute < 0 || minute >= 60){ throw new Exception("重复工作的分钟时间点取值必须在0-59之间(包括0与59)!") ; } } /** * * @param dayOfWeek * @throws Exception */ private static int checkDayOfWeek(int dayOfWeek)throws Exception { if(dayOfWeek < 1 || dayOfWeek > 7){ throw new Exception("重复工作的一星期的某一天取值必须在1(周一)-7(周日)间(包括1与7)!") ; } int d = 0 ; if(dayOfWeek == 1){ d = DateBuilder.MONDAY ; }else if(dayOfWeek == 2){ d = DateBuilder.TUESDAY ; }else if(dayOfWeek == 3){ d = DateBuilder.WEDNESDAY ; }else if(dayOfWeek == 4){ d = DateBuilder.THURSDAY ; }else if(dayOfWeek == 5){ d = DateBuilder.FRIDAY ; }else if(dayOfWeek == 6){ d = DateBuilder.SATURDAY ; }else if(dayOfWeek == 7){ d = DateBuilder.SUNDAY ; } return d ; } /** * * @param dayOfMonth * @throws Exception */ private static void checkDayOfMonth(int dayOfMonth)throws Exception { if(dayOfMonth < 1 || dayOfMonth > 31){ throw new Exception("重复工作一月某一天取值必须在1-31之间(包括1与31)!") ; } } } pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/schedulerTask/TaskJob.java
New file @@ -0,0 +1,16 @@ package com.dy.common.schedulerTask; import org.quartz.Job; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; public abstract class TaskJob implements Job { /** * 定时调度的工作(带参数)。 * @throws JobExecutionException 异常 */ public abstract void execute(JobExecutionContext ctx) throws JobExecutionException; } pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/schedulerTask/Test.java
New file @@ -0,0 +1,36 @@ package com.dy.common.schedulerTask; import java.util.HashMap; import org.apache.logging.log4j.*; import org.quartz.*; public class Test extends TaskJob{ private static Logger log = LogManager.getLogger(Test.class.getName()) ; // // public static void main(String[] args) throws Exception{ //// SchedulerTaskSupport.addSecondlyJob("test", "testGroup", Test.class, null, 10, 1, 2) ; //// SchedulerTaskSupport.addMinutelyJob("test", "testGroup", Test.class, null, 1, 1, -1) ; //// SchedulerTaskSupport.addDailyJob("test", "testGroup", Test.class, null, 11, 16) ; //// SchedulerTaskSupport.addWeeklyJob("test", "testGroup", Test.class, null, 6, 11, 34) ; //// SchedulerTaskSupport.addWorkingDayInWeekJob("test", "testGroup", Test.class, null, 11, 41) ; //// SchedulerTaskSupport.addMonthlyJob("test", "testGroup", Test.class, null, 25, 11, 44) ; // // HashMap<String , Object> jobDataMap = new HashMap<String , Object>() ; // jobDataMap.put("123", "test123") ; // // SchedulerTaskSupport.addSecondlyJob("test", "testGroup", Test.class, jobDataMap, 10, 1, 2) ; // } @Override public void execute(JobExecutionContext ctx) throws JobExecutionException { log.info("1_" + System.currentTimeMillis() ) ; JobDataMap jobDataMap = ctx.getJobDetail().getJobDataMap() ; if(jobDataMap != null){ Object o = jobDataMap.get("123") ; log.info("1_" + o ) ; } } } pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/threadPool/Test.java
New file @@ -0,0 +1,157 @@ package com.dy.common.threadPool; public class Test { // // public static void main(String args[])throws Exception { //// Test t = new Test(); //// t.testShort1(); //// t.testShort2(); //// t.testLong(); // } /** * 测试增加线程线程 * @throws Exception * */ @SuppressWarnings("unused") private void testShort1() throws Exception { TreadPoolFactory.initThreadPoolShort("测试线程组", 2, 1, 10, 20); ThreadPool.Pool pool = TreadPoolFactory.getThreadPoolShort() ; try { int n = 1 ; Thread.sleep(1000); while (n < 4) { System.out.println("加入短工作任务(" + n + ")"); pool.putJob(new ThreadPool.Job() { public void execute() { System.out.println("!!!!!开始短工作" + this.hashCode()); try { Thread.sleep(2000); } catch (Exception e) { ; } System.out.println("!!!!!结束短工作" + this.hashCode()); } @Override public void destroy(){ } @Override public boolean isDestroy(){ return false ; } }); n++ ; Thread.sleep(100); } } catch (Exception e) { } } /** * 测试增加线程线程 * @throws Exception * */ @SuppressWarnings("unused") private void testShort2() throws Exception { TreadPoolFactory.initThreadPoolShort("测试短工作线程组", 2, 1, 5, 6); ThreadPool.Pool pool = TreadPoolFactory.getThreadPoolShort() ; try { int n = 1 ; Thread.sleep(1000); while (n < 2) { System.out.println("加入短工作任务(" + n + ")"); pool.putJob(new ThreadPool.Job() { @Override public void execute() { Long times = 0L ; System.out.println("!!!!!开始短工作" + this.hashCode()); while(true){ if(this.isDestroy == null || !this.isDestroy.booleanValue()){ times++ ; System.out.println("!!!!!执行短工作" + this.hashCode() + " 次数:" + times); try { Thread.sleep(2000); } catch (Exception e) { ; } // System.out.println("!!!!!结束长工作" + this.hashCode()); } } } @Override public void destroy(){ this.isDestroy = true ; } @Override public boolean isDestroy(){ if(this.isDestroy == null){ return false ; } return this.isDestroy.booleanValue() ; } private Boolean isDestroy; }); n++ ; Thread.sleep(100); } } catch (Exception e) { } }/** * 测试增加线程线程 * @throws Exception * */ @SuppressWarnings("unused") private void testLong() throws Exception { TreadPoolFactory.initThreadPoolLong("测试长工作线程组", -1, 1, 5, -1); ThreadPool.Pool pool = TreadPoolFactory.getThreadPoolLong() ; try { int n = 1 ; Thread.sleep(1000); while (n < 2) { System.out.println("加入长工作任务(" + n + ")"); pool.putJob(new ThreadPool.Job() { @Override public void execute() { Long times = 0L ; System.out.println("!!!!!开始长工作" + this.hashCode()); while(true){ if(this.isDestroy == null || !this.isDestroy.booleanValue()){ times++ ; System.out.println("!!!!!执行长工作" + this.hashCode() + " 次数:" + times); try { Thread.sleep(2000); } catch (Exception e) { ; } // System.out.println("!!!!!结束长工作" + this.hashCode()); } } } @Override public void destroy(){ this.isDestroy = true ; } @Override public boolean isDestroy(){ if(this.isDestroy == null){ return false ; } return this.isDestroy.booleanValue() ; } private Boolean isDestroy; }); n++ ; Thread.sleep(100); } } catch (Exception e) { } } } pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/threadPool/ThreadPool.java
New file @@ -0,0 +1,73 @@ package com.dy.common.threadPool; public interface ThreadPool { /** * 线程池 * @author Administrator * */ public interface Pool{ /** * 把所要执行的工作对象实例放入线程池中 * @param job ThreadJob 工作对象实例 * @throws Exception */ public void putJob(Job job) throws Exception ; } /** * 线程池工作任务 * @author Administrator * */ public interface Job{ /** * 线程池工作类的回调方法。 * 注意: * 1、这个方法内,尽量不使用try catch语句,如果确实需要用, * 要一定用try catch finnaly语句,而且finnaly内不要 * 再有可能产生异常。否则线程得不到结束,不能回归空闲线程池。 * 2、如果在短线程池中,这个方法内不能执行永循环工作,例如有while(true)这样 * 的工作任务,否则至超时时间,系统强制停止此工作。 * 3、如果在短线程池中,这个方法内只能执行短时间即成完成的工作,工作完成后即退出本方法体。 * @throws Exception */ public void execute() throws Exception ; /** * 外部调用,强制销毁工作 * 主要应用: * 如果在execute()方法中有while(true){}死循环,这种情况下,外部调用者没法叫线程停下来的, * 如果外部调用者需要停止此工作,Job接口设计了destroy方法,外部调用者用job对象调此方法完成停止工作。 * 这时需要在while(true){}死循环中判断是否外部调用者要求停止执行job * 示例代码: * public void execute(){ * while(true){ * if(this.isDestroy){ * break; * } * ... ... * } * } * public void destroy(){ * this.isDestroy = true ; * } * * 当然有while(true){}死循环的工作job一定在长线程池中工作,如果在短线程池中工作,线程池监控线程就会 * 以忙碌超时原因把线程强制销毁(线程池监控线程调用工作线程的destroy方法),这里的销毁实际上销毁不了 * 的,只不把该线程清了线程池,这个线程仍然是活着的, 线程里的job仍然被执行,所以在线程的destroy方法 * 中调用了job.destroy(),使job停止下来,线程也自然执行完毕而得以停止,以上是代码逻辑存在while(true){} * 死循环,如果不是代码逻辑,而是程序bug造成的死循环,或程序抛出了异常且未抓住异常,这时无论如何也停止 * 不了工作job,也销毁不了这个线程的 */ public void destroy() ; /** * 判断,工作是否被外部强制销毁,销毁后,持有本job的线程就能施放回归 */ public boolean isDestroy() ; } } pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/threadPool/ThreadPoolImp.java
New file @@ -0,0 +1,442 @@ package com.dy.common.threadPool; import java.util.*; import org.apache.logging.log4j.*; import com.dy.common.threadPool.ThreadPool.Job; public class ThreadPoolImp { /** * 线程池实现 * @author Administrator * */ public class MyThreadPool implements ThreadPool.Pool{ /** * 线程池唯一实例 */ public MyThreadPool myPool ; /** * 线程池名称; */ private String poolName ; /** * 空闲线程集合 */ private List<MyThread> freeThreads; /** * 工作中的线程集合 */ private List<MyThread> busiThreads; /** * 线程池最大线程数 , 若maxNum <= 0,则maxNum = 1 。 */ private int maxNum; /** * 最小线程数,若minNum > maxNum,则minNum = maxNum 。 */ private int minNum; /** * 当前线程池中的线程数。 */ private int currNum; /** * 空闲线程超时的时长(秒),超过这个时间,就清除多余线程, * 使空闲线程最多是minNum个 */ private long freeTimeout ; /** * 忙碌线程超时的时长(秒),超过这个时间,就认为是崩溃线程,将被清除。 */ private long busyTimeout ; /** * 同步对象 */ private Object synObj = new Object(); /** * 内部启动的空闲和忙碌超时线程的线程 */ private Thread monitorThread ; /** * 日志 */ private Logger log = LogManager.getLogger(MonitorThread.class.getName()); /** * 得到默认唯一实例 * @return */ // public MyThreadPool getDefaultInstance(){ // if(myPool == null){ // myPool = new MyThreadPool(null, null, null, null, null) ; // } // return myPool ; // } /** * 得到唯一实例 * @param poolName * @param maxNum * @param minNum * @param freeTimeout * @param busyTimeout * @return */ // public MyThreadPool getInstance( // String poolName , // Integer maxNum , // Integer minNum , // Long freeTimeout , // Long busyTimeout){ // if(myPool == null){ // myPool = new MyThreadPool(poolName, maxNum, minNum, freeTimeout, busyTimeout) ; // } // return myPool ; // } /** * 线程池构造方法 * @param poolName 线程池和线程名称 * @param maxNum 线程池最大线程数,若为-1,不受限制 * @param minNum 线程池最小线程数,或初始线程数 * @param freeTimeout 空闲线程超时时长(秒) * @param busyTimeout 忙碌线程超时时长(秒),若为-1,不受限制 */ public MyThreadPool( String poolName , Integer maxNum , Integer minNum , Long freeTimeout , Long busyTimeout) { if(poolName == null){ poolName="线程池" ; } this.poolName = poolName ; if(maxNum == null || maxNum.intValue() <= 0){ maxNum = -1 ; } if(minNum == null || minNum.intValue() < 0){ minNum = 0 ; } if(minNum > maxNum){ minNum = maxNum ; } this.maxNum = maxNum ; this.minNum = minNum ; this.currNum = 0; if(freeTimeout == null || freeTimeout.longValue() <= 0){ freeTimeout = 60000L ; } this.freeTimeout = freeTimeout ; if(busyTimeout == null || busyTimeout.longValue() <= 0){ this.busyTimeout = -1L ; }else{ this.busyTimeout = busyTimeout ; } this.busiThreads = new ArrayList<MyThread>(); this.freeThreads = new ArrayList<MyThread>(); //最小化线程池 for (int i = 0; i < this.minNum ; i++) { MyThread t = new MyThread(this); t.start(); this.freeThreads.add(t); this.currNum++; } this.monitorThread = new MonitorThread(this) ; this.monitorThread.start() ; } /** * 把所要执行的工作对象实例放入线程池中 * @param job ThreadJob 工作对象实例 * @throws Exception */ @Override public void putJob(Job job) throws Exception { synchronized(this.synObj) { //log.debug("工作任务分配到线程池中。") ; MyThread t = null ; if (this.freeThreads.size() == 0) { //当前没有空闲线程 if (this.maxNum == -1 || this.currNum < this.maxNum) { //当前线程数未达到最大值 , 增加新的线程 t = new MyThread(this); t.start(); this.currNum++; } else { //当前线程达到最大数了,等待回归线程 while (freeThreads.size() == 0) { //如果没有空闲的线程,等待工作线程工作完成回来 try { //log.warn("线程池(" + this.poolName + ")中线程数达到上限,新工作任务等待释放线程!"); synObj.wait(); } catch (Exception e) { log.error("'" + this.poolName + "'线程池中线程等待释放线时发生等待异常!", e); } t = (MyThread) freeThreads.get(0); if (t != null) { //说明得到了释放回来的线程 freeThreads.remove(0); break; }else{ //说明没有得到释放回来的线程,可以其他线程 continue; } }//end while }//end else }//end if(freeThreads.size() == 0) else { t = (MyThread) freeThreads.get(0); freeThreads.remove(0); } busiThreads.add(t); t.putJob(job); } } /** * 线程工作完成,从busiThreads回归freeThreads */ protected void freeThread(MyThread t) { synchronized (synObj) { busiThreads.remove(t); freeThreads.add(t); synObj.notify(); } } /** * 监控超时线程的线程 * @author Administrator * */ protected class MonitorThread extends Thread { private MyThreadPool pool ; private MyThread t ; private Iterator<?> it ; /** * * @param pool */ public MonitorThread(MyThreadPool pool){ this.pool = pool ; } /** * */ @SuppressWarnings("finally") public void run(){ long interval = pool.freeTimeout ; if(pool.busyTimeout > 0 && pool.busyTimeout < pool.freeTimeout){ interval = pool.busyTimeout ; } boolean isException = false ; while(true){ t = null ; it = null ; try{ MonitorThread.sleep(interval) ; }catch(Exception e){ isException = true ; }finally{ if(isException){ isException = false ; continue ; } } try{ synchronized (pool.synObj) { if(pool.freeThreads.size() > pool.minNum){ //如果空闲线程大于最小线程数,则清理空闲线程 int num = pool.freeThreads.size() - pool.minNum ; int count = 0 ; it = pool.freeThreads.iterator() ; while(it.hasNext()){ if(count == num) { break ; } count ++ ; t = (MyThread)it.next() ; if((System.currentTimeMillis() - t.time) >= pool.freeTimeout){ it.remove() ; pool.currNum-- ; //log.debug("线程池(" + pool.poolName + ")中清除了一个超时空闲线程!"); t.destroy() ; t = null ; } } }//end if if(pool.busyTimeout != -1 && pool.busyTimeout > 0){ it = pool.busiThreads.iterator() ; while(it.hasNext()){ t = (MyThread)it.next() ; if((System.currentTimeMillis() - t.time) >= pool.busyTimeout){ it.remove() ; pool.currNum-- ; log.error("线程池(" + pool.poolName + ")中清除了一个超时崩溃(忙碌)线程!"); t.destroy() ; t = null ; } } } }//end synchronized (pool.synObj) }catch(Exception e){ }finally{ continue ; } }//end while(true) } } } /** * 池中线程实现 * @author Administrator * */ public class MyThread extends Thread{ /** * 标示线程是活的 */ private boolean living = true ; /** * 线程忙碌或空闲记时器 */ protected long time ; /** * 当前线程所处的线程池 */ private MyThreadPool pool ; /** * 线程具体工作的回调类 */ private Job job ; /** * 指示线程可以工作 */ private Boolean canJob ; private Logger log = LogManager.getLogger(MyThread.class.getName()) ; protected MyThread(MyThreadPool pool) { super(); this.pool = pool ; this.time = 0 ; this.canJob = false ; } /** * 设置线程工作对象 * @param job */ protected void putJob(Job job) throws Exception { if(job == null){ this.job = new Job(){ public void execute(){ } public void destroy(){ } public boolean isDestroy(){ return false ; } }; } synchronized (this) { this.job = job ; this.canJob = true; //忙碌记时开始 this.time = System.currentTimeMillis() ; this.notify(); } } /** * */ @SuppressWarnings("finally") @Override public void run(){ while (living){ synchronized (this){ while(!canJob){ //当不能工作时 try{ this.wait(); }catch (Exception e) { log.error("线程池(" + pool.poolName + ")的工作线程等待可以工作的信号时发生等待异常:\n" + e.getMessage(), e); this.canJob = false ; continue; } } /////////////////////// //被唤醒,有新工作了 try{ if(this.job != null){ this.job.execute() ; } }catch (Exception ee) { log.error("线程池(" + pool.poolName + ")的工作线程在执行工作时发生异常:\n" + ee.getMessage(), ee); //ee.printStackTrace() ; }finally { this.canJob = false ; this.job = null ; if(living){ //线程未被销毁 this.free() ; } continue; } }// end synchronized(this) }// end while(living) } public void free(){ //使本线程回归空闲线程池 pool.freeThread(this); //空闲开始记时 this.time = System.currentTimeMillis() ; // 没有可做的了 this.canJob = false; log.debug("线程池(" + this.pool.poolName + ")中的线程回归空闲集合。"); } /** * 销毁工作线程 */ public void destroy() { //线程销毁前,首先把其所持有的工作销毁,否则线程停不下来,也销毁不了 if(this.job != null){ this.job.destroy(); } this.living = false ;//使线程从run方法的长久while(living)中退出来,从线程自动销毁 this.job = null ; } } } pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/threadPool/TreadPoolFactory.java
New file @@ -0,0 +1,79 @@ package com.dy.common.threadPool; public class TreadPoolFactory { private static ThreadPool.Pool pool_short ;//短工作任务线程池,线程工作用时较短 private static ThreadPool.Pool pool_long ;//长工作任务线程池,线程工作用时较长 /** * 初始化线程池 * @param threadPoolName 线程池和线程名称 * @param maxThreadNum 线程池最大线程数 ,若为-1,不受限制 * @param minThreadNum 线程池最小线程数,或初始线程数 * @param freeTimeout 空闲线程超时时长(秒) * @param busyTimeout 忙碌线程超时时长(秒),若为-1,不受限制 * @return 线程池实例 */ public final static void initThreadPoolShort(String poolName, int maxNum, int minNum, long freeTimeout, long busyTimeout) throws Exception { if(pool_short!= null){ throw new Exception("线程池不能重复初始化!"); } if(pool_short == null){ pool_short = new ThreadPoolImp().new MyThreadPool(poolName, maxNum, minNum, freeTimeout, busyTimeout); } } /** * 初始化线程池 * @param threadPoolName 线程池和线程名称 * @param maxThreadNum 线程池最大线程数,若为-1,不受限制 * @param minThreadNum 线程池最小线程数,或初始线程数 * @param freeTimeout 空闲线程超时时长(秒) * @param busyTimeout 忙碌线程超时时长(秒),若为-1,不受限制 * @return 线程池实例 */ public final static void initThreadPoolLong(String poolName, int maxNum, int minNum, long freeTimeout, long busyTimeout) throws Exception { if(pool_long!= null){ throw new Exception("线程池不能重复初始化!"); } if(pool_long == null){ pool_long = new ThreadPoolImp().new MyThreadPool(poolName, maxNum, minNum, freeTimeout, busyTimeout); } } /** * 得到唯一线程池实例 * @param dataSourceName * @return * @throws Exception */ public final static ThreadPool.Pool getThreadPoolShort() throws Exception { if (pool_short == null) { throw new Exception("得到线程池前首先必须初始化!"); } return pool_short ; } /** * 得到唯一线程池实例 * @param dataSourceName * @return * @throws Exception */ public final static ThreadPool.Pool getThreadPoolLong() throws Exception { if (pool_long == null) { throw new Exception("得到线程池前首先必须初始化!"); } return pool_long ; } } pipIrr-platform/pipIrr-global/src/main/resources/mapper/BaClientMapper.xml
@@ -19,8 +19,8 @@ <result column="district" jdbcType="VARCHAR" property="district" /> <result column="address" jdbcType="VARCHAR" property="address" /> <result column="remark" jdbcType="VARCHAR" property="remark" /> <result column="disabled" jdbcType="TINYINT" property="disabled" /> <result column="deleted" jdbcType="TINYINT" property="deleted" /> <result property="disabled" column="disabled" typeHandler="com.dy.common.mybatis.envm.EnumCodeTypeHandler" javaType="com.dy.common.mybatis.envm.Disabled"/> <result property="deleted" column="deleted" typeHandler="com.dy.common.mybatis.envm.EnumCodeTypeHandler" javaType="com.dy.common.mybatis.envm.Deleted"/> </resultMap> <sql id="Base_Column_List"> <!--@mbg.generated--> @@ -29,8 +29,8 @@ </sql> <sql id="part_Column_List"> <!--@mbg.generated--> id, countyId, townId, villageId, blockId, divideId, typeId, `name`, num, phone, idCard, area, district,address, remark, disabled ${alias}.id, ${alias}.countyId, ${alias}.townId, ${alias}.villageId, ${alias}.blockId, ${alias}.divideId, ${alias}.typeId, ${alias}.name, ${alias}.num, ${alias}.phone, ${alias}.idCard, ${alias}.area, ${alias}.district,${alias}.address, ${alias}.remark, ${alias}.disabled </sql> <select id="selectByPrimaryKey" parameterType="java.lang.Long" resultMap="BaseResultMap"> @@ -49,38 +49,46 @@ <select id="selectTotal" parameterType="java.util.Map" resultType="java.lang.Long"> select count(*) from ba_client where deleted != 1 from ba_client bc left join ba_block bbk on bc.blockId = bbk.id left join ba_client_type bct on bc.typeId = bct.id where bc.deleted != 1 <trim prefix="and" suffixOverrides="and"> <if test="countyId != null"> countyId = #{countyId,jdbcType=BIGINT} and bc.countyId = #{countyId,jdbcType=BIGINT} and </if> <if test="townId != null"> townId = #{townId,jdbcType=BIGINT} and bc.townId = #{townId,jdbcType=BIGINT} and </if> <if test="villageId != null"> villageId = #{villageId,jdbcType=BIGINT} and bc.villageId = #{villageId,jdbcType=BIGINT} and </if> <if test="blockId != null"> blockId = #{blockId,jdbcType=BIGINT} and bc.blockId = #{blockId,jdbcType=BIGINT} and </if> <if test="name != null"> name like concat('%', #{name}, '%') and bc.name like concat('%', #{name}, '%') and </if> <if test="num != null"> num = '#{num,jdbcType=VARCHAR}' and bc.num = '#{num,jdbcType=VARCHAR}' and </if> <if test="phone != null"> phone = '#{phone,jdbcType=VARCHAR}' and bc.phone = '#{phone,jdbcType=VARCHAR}' and </if> </trim> </select> <select id="selectSome" parameterType="java.util.Map" resultMap="BaseResultMap"> <!--@mbg.generated--> select <include refid="part_Column_List" /> from ba_client where deleted != 1 <include refid="part_Column_List" > <property name="alias" value="bc"/> </include>, bbk.name blockName, bct.name typeName from ba_client bc left join ba_block bbk on bc.blockId = bbk.id left join ba_client_type bct on bc.typeId = bct.id where bc.deleted != 1 <trim prefix="and" suffixOverrides="and"> <if test="countyId != null"> countyId = #{countyId,jdbcType=BIGINT} and @@ -95,16 +103,16 @@ blockId = #{blockId,jdbcType=BIGINT} and </if> <if test="name != null"> name like concat('%', #{name}, '%') and bc.name like concat('%', #{name}, '%') and </if> <if test="num != null"> num = '#{num,jdbcType=VARCHAR}' and bc.num = '#{num,jdbcType=VARCHAR}' and </if> <if test="phone != null"> phone = '#{phone,jdbcType=VARCHAR}' and bc.phone = '#{phone,jdbcType=VARCHAR}' and </if> </trim> order by id DESC order by bc.id DESC <trim prefix="limit " > <if test="start != null and count != null"> #{start}, #{count}