1、通信中间件多个恒久任务由一个线程执行改为一个恒久任务一个线程执行,原因是单个恒久任务可能用时很长 ;
2、优化通信中间件代码
1个文件已添加
4个文件已修改
175 ■■■■■ 已修改文件
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/core/CoreConstantManage.java 43 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/core/CoreConstantThread.java 95 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/core/CoreUnit.java 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-accept/src/main/java/com/dy/aceMw/server/tasks/FromRtuConstantTask.java 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-accept/src/main/java/com/dy/aceMw/server/tasks/ToRtuConstantTask.java 17 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/core/CoreConstantManage.java
New file
@@ -0,0 +1,43 @@
package com.dy.common.mw.core;
import java.util.List;
import org.apache.logging.log4j.*;
public class CoreConstantManage {
    private final static Logger log = LogManager.getLogger(CoreConstantManage.class.getName()) ;
    private final static CoreConstantManage instance = new CoreConstantManage() ;
    private static Long sleepBigBusy = 100L ;//大忙时(除了恒久任务,还是其他任务),核心线程暂停间隔
    private static Long sleepSmallBusy = 500L ;//小忙时(只有恒久任务,无其他任务),核心线程暂停间隔
    private CoreConstantManage(){
    }
    public static CoreConstantManage getInstance(){
        return instance ;
    }
    /**
     * 设置暂停时长
     * @param sleepBigBusy 大忙时(除了恒久任务,还是其他任务),核心线程暂停间隔
     * @param sleepSmallBusy 小忙时(只有恒久任务,无其他任务),核心线程暂停间隔
     */
    public void setSleep(Long sleepBigBusy, Long sleepSmallBusy){
        CoreConstantManage.sleepBigBusy = sleepBigBusy ;
        CoreConstantManage.sleepSmallBusy = sleepSmallBusy ;
    }
    public void start(){
        //恒久任务
        List<CoreTask> constantTasks = CoreUnit.getAllConstantTasks();
        if (constantTasks != null && constantTasks.size() > 0) {
            for (CoreTask task : constantTasks) {
                new CoreConstantThread(sleepBigBusy, sleepSmallBusy, task).start();
            }
        }
    }
}
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/core/CoreConstantThread.java
@@ -1,64 +1,45 @@
package com.dy.common.mw.core;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.*;
/**
 * @Author liurunyu
 * @Date 2023/12/19 16:41
 * @LastEditTime 2023/12/19 16:41
 * @Description
 */
public class CoreConstantThread extends Thread {
public class CoreConstantThread extends Thread{
    private final static Logger log = LogManager.getLogger(CoreConstantThread.class.getName()) ;
    private final static CoreConstantThread instance = new CoreConstantThread() ;
    private final static Logger log = LogManager.getLogger(CoreConstantThread.class.getName()) ;
    @SuppressWarnings("unused")
    private static Long sleepBigBusy = 100L ;//大忙时(除了恒久任务,还是其他任务),核心线程暂停间隔
    private static Long sleepSmallBusy = 500L ;//小忙时(只有恒久任务,无其他任务),核心线程暂停间隔
    private CoreConstantThread(){
    }
    public static CoreConstantThread getInstance(){
        return instance ;
    }
    /**
     * 设置暂停时长
     * @param sleepBigBusy 大忙时(除了恒久任务,还是其他任务),核心线程暂停间隔
     * @param sleepSmallBusy 小忙时(只有恒久任务,无其他任务),核心线程暂停间隔
     */
    public void setSleep(Long sleepBigBusy, Long sleepSmallBusy){
        CoreConstantThread.sleepBigBusy = sleepBigBusy ;
        CoreConstantThread.sleepSmallBusy = sleepSmallBusy ;
    }
    private long sleepBigBusy ;
    private long sleepSmallBusy ;
    private CoreTask task ;
    /**
     * 核心单线程,执行所有的单线程任务
     */
    @Override
    @SuppressWarnings("InfiniteLoopStatement")
    public void run(){
        int count ;
        Integer temp ;
        while (true) {
            count = 0;
            try {
                //恒久任务
                List<CoreTask> constantTasks = CoreUnit.getAllConstantTasks();
                if (constantTasks != null && constantTasks.size() > 0) {
                    for (CoreTask task : constantTasks) {
                        temp = task.execute();
                        if (temp != null) {
                            count += temp;
                        }
                    }
                }
                if (count == 0) {
                    //小暂停一下
                    Thread.sleep(sleepSmallBusy);
                }
            } catch (Exception e) {
                log.error("核心线程发生异常" + (e.getMessage() == null ? "" : (":" + e.getMessage())), e);
            }
        }
    }
    public CoreConstantThread(long sleepBigBusy, long sleepSmallBusy, CoreTask task){
        this.sleepBigBusy = sleepBigBusy ;
        this.sleepSmallBusy = sleepSmallBusy ;
        this.task = task ;
    }
    @Override
    public void run() {
        if(task != null){
            int count ;
            while (true) {
                try {
                    count = task.execute();
                    if (count == 0) {
                        //小暂停一下
                        Thread.sleep(sleepBigBusy);
                    }else{
                        Thread.sleep(sleepSmallBusy);
                    }
                } catch (Exception e) {
                    log.error("恒久任务" + task.getClass().getName() + "执行时发生异常" + (e.getMessage() == null ? "" : (":" + e.getMessage())), e);
                }
            }
        }
    }
}
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/core/CoreUnit.java
@@ -71,9 +71,9 @@
            ct.setSleep(this.adapter.getConfig().sleepBigBusy, this.adapter.getConfig().sleepSmallBusy);
            ct.start(); 
            
            CoreConstantThread cct = CoreConstantThread.getInstance() ;
            cct.setSleep(this.adapter.getConfig().sleepBigBusy, this.adapter.getConfig().sleepSmallBusy);
            cct.start();
            CoreConstantManage ccm = CoreConstantManage.getInstance() ;
            ccm.setSleep(this.adapter.getConfig().sleepBigBusy, this.adapter.getConfig().sleepSmallBusy);
            ccm.start();
            
            if(adapter.getConfig().showStartInfo){
                System.out.println("核心模块成功启动,"
pipIrr-platform/pipIrr-mw/pipIrr-mw-accept/src/main/java/com/dy/aceMw/server/tasks/FromRtuConstantTask.java
@@ -45,10 +45,10 @@
     */
    private Node doDealRtuUpData(Node first, Node last){
        if(last != null){
            //在dealNode方法中,可能要把last从队列中移除,这时last.pre为空,所以提前把last.pre取出来
            Node pre = last.pre ;
            dealNode(last) ;
            if(first != last){
                //在dealNode方法中,可能要把last从队列中移除,这时last.pre为空,所以提前把last.pre取出来
                Node pre = last.pre ;
                dealNode(last) ;
                return pre ;
            }else{
                //停止
@@ -102,9 +102,11 @@
     * @param node 节点
     */
    private void dealNode(Node node){
        RtuDataNode obj = (RtuDataNode)node.obj ;
        obj.dealSelf() ;
        RtuDataCache.removeNode(node);
        if(node != null && node.obj != null){
            RtuDataNode obj = (RtuDataNode)node.obj ;
            obj.dealSelf() ;
            RtuDataCache.removeNode(node);
        }
    }
}
pipIrr-platform/pipIrr-mw/pipIrr-mw-accept/src/main/java/com/dy/aceMw/server/tasks/ToRtuConstantTask.java
@@ -26,6 +26,7 @@
            log.error("更新RTU会话上报数据时刻时发生集合操作异常,此异常并不影响系统正常运行", e);
        }
        try{
            //log.info("当前下行命令队列中还有结点数量:" + TcpDownCommandCache.size());
            dealDownCom() ;
        }catch(Exception e){
            log.error(e);
@@ -60,10 +61,10 @@
     */
    private Node doDealDownComm(Long now, Node first, Node last){
        if(last != null){
            //在dealNode方法中,可能要把last从队列中移除,这时last.pre为空,所以提前把last.pre取出来
            Node pre = last.pre ;
            dealNode(now, last) ;
            if(first != last){
                //在dealNode方法中,可能要把last从队列中移除,这时last.pre为空,所以提前把last.pre取出来
                Node pre = last.pre ;
                dealNode(now, last) ;
                return pre ;
            }else{
                //停止
@@ -142,10 +143,12 @@
     * @param node 节点
     */
    private void dealNode(Long now, Node node){
        TcpDownCommandObj obj = (TcpDownCommandObj)node.obj ;
        boolean removeNode = obj.dealSelf(now) ;
        if(removeNode){
            TcpDownCommandCache.removeNode(node);
        if(node != null && node.obj != null){
            TcpDownCommandObj obj = (TcpDownCommandObj)node.obj ;
            boolean removeNode = obj.dealSelf(now) ;
            if(removeNode){
                TcpDownCommandCache.removeNode(node);
            }
        }
    }