pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/threadPool/ThreadPool.java
@@ -10,6 +10,13 @@ */ public interface Pool{ /** * 线程池中线程个数 * @return */ public Integer size() ; public Integer maxThread() ; public Integer minThread() ; /** * 把所要执行的工作对象实例放入线程池中 * @param job ThreadJob 工作对象实例 * @throws Exception pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/threadPool/ThreadPoolImp.java
@@ -131,6 +131,22 @@ this.monitorThread.start() ; } } /** * 线程池中线程个数 * @return */ @Override public Integer size() { return currNum ; } @Override public Integer maxThread() { return maxNum ; } @Override public Integer minThread() { return minNum ; } /** * 把所要执行的工作对象实例放入线程池中 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/threadPool/TreadPoolFactory.java
@@ -6,6 +6,26 @@ private static ThreadPool.Pool pool_short ;//短工作任务线程池,线程工作用时较短 private static ThreadPool.Pool pool_long ;//长工作任务线程池,线程工作用时较长 public final static Integer[] pool_short_state(){ Integer shortCurThread = 0 ;//短线程池当前线程数 Integer shortMaxThread = 0 ;//短线程池最大线程数 Integer shortMinThread = 0 ;//短线程池最小线程数 shortCurThread = pool_short.size() ; shortMaxThread = pool_short.maxThread() ; shortMinThread = pool_short.minThread() ; return new Integer[]{shortCurThread, shortMaxThread, shortMinThread} ; } public final static Integer[] pool_long_state(){ Integer longCurThread = 0 ;//短线程池当前线程数 Integer longMaxThread = 0 ;//短线程池最大线程数 Integer longMinThread = 0 ;//短线程池最小线程数 longCurThread = pool_long.size() ; longMaxThread = pool_long.maxThread() ; longMinThread = pool_long.minThread() ; return new Integer[]{longCurThread, longMaxThread, longMinThread} ; } /** * 初始化线程池 * @param poolName 线程池和线程名称 pipIrr-platform/pipIrr-global/src/main/java/com/dy/pipIrrGlobal/voSt/VoClientAmountStatistics.java
@@ -5,7 +5,7 @@ /** * @Author: liurunyu * @Date: 2024/7/24 17:09 * @Description * @Description 以用水户统计用水量 */ @Data public class VoClientAmountStatistics { pipIrr-platform/pipIrr-global/src/main/java/com/dy/pipIrrGlobal/voSt/VoIntakeAmountStatistics.java
@@ -5,7 +5,7 @@ /** * @Author: liurunyu * @Date: 2024/7/24 14:16 * @Description 以取水口统计漏损 * @Description 以取水口统计取水量 */ @Data public class VoIntakeAmountStatistics { pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/forTcp/TcpDownCommandCache.java
@@ -25,6 +25,29 @@ return instance ; } public static Integer[] info(){ Integer comTotalDown = 0 ;//缓存的下行命令总数 Integer comNoResTotalDownByOnLine = 0 ;//RTU在线,但对其下行命令未收到应答的总数 Integer comNoResTotalDownByOffLine = 0 ;//RTU离线,但对其下行命令未收到应答的总数 MidResultToRtu res ; TcpDownCommandObj obj ; Node node = cacheQueue.getFirstNode() ; while(node != null && node.obj != null){ obj = (TcpDownCommandObj)node.obj; res = obj.result ; if(!obj.onceReceivedResult){ comTotalDown ++ ; if(TcpSessionCache.isConnect(res.rtuAddr)){ comNoResTotalDownByOnLine ++ ; }else{ comNoResTotalDownByOffLine ++ ; } } } return new Integer[]{comTotalDown, comNoResTotalDownByOnLine, comNoResTotalDownByOffLine} ; } /** * 缓存命令 * @param result pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/forTcp/TcpSessionCache.java
@@ -24,6 +24,29 @@ */ private static HashMap<String, TcpSession> sessionTable = new HashMap<String, TcpSession>() ; /** * 得到信息 * @return */ public static Integer[] info(){ Integer rtuTotalConnect = 0 ;//已经连接过中间件的RTU总数(包括在线与离线的) Integer rtuTotalOnLine = 0 ;//在线RTU总数 Integer rtuTotalOffLine = 0 ;//离线RTU总数 synchronized (sessionTable){ rtuTotalConnect = sessionTable.size() ; Collection<TcpSession> col = sessionTable.values() ; for(TcpSession se : col){ if(se.ioSession.isConnected()){ rtuTotalOnLine ++ ; }else{ rtuTotalOffLine ++ ; } } } return new Integer[] {rtuTotalConnect, rtuTotalOnLine, rtuTotalOffLine} ; } /** * 关闭所有网络连接 */ pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/CommandInnerDeaLer.java
@@ -29,6 +29,8 @@ return this.stopTcpSv(com) ; }else if(code.equals(CodeLocal.recoverTcpSv)){ return this.recoverTcpSv(com) ; }else if(code.equals(CodeLocal.mwState)){ return this.mwInfo(com) ; } return ReturnCommand.errored("出错,收到内部命令的功能码不能识别!", com.getId(), com.getCode()) ; } @@ -84,4 +86,14 @@ /** * 查询通信中间件运行情况 * @throws Exception */ private Command mwInfo(Command command) throws Exception{ MwInfoVo mwInfo = new MwInfoDeal().deal() ; return ReturnCommand.successed("查询通信中间件运行情况", command.getId(), command.getCode(), mwInfo) ; } } pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/localProtocol/CodeLocal.java
@@ -9,6 +9,9 @@ public static final String allProtocols = "LCD0100" ;//查询所有协议配置 public static final String stopTcpSv = "LCD0110" ;//停止TCP服务,不再接入新的TCP连接,已经TCP连接的全部断连接 public static final String recoverTcpSv = "LCD0112" ;//重启TCP服务,接入新的TCP连接 public static final String mwState = "LCD0200" ;//得到通信中间件运行信息 } pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/localProtocol/MwInfoDeal.java
New file @@ -0,0 +1,44 @@ package com.dy.rtuMw.server.local.localProtocol; import com.dy.common.threadPool.TreadPoolFactory; import com.dy.rtuMw.server.forTcp.TcpDownCommandCache; import com.dy.rtuMw.server.forTcp.TcpSessionCache; import com.dy.rtuMw.server.rtuData.RtuDataCache; import com.dy.rtuMw.server.rtuData.TaskPool; /** * @Author: liurunyu * @Date: 2024/7/29 11:07 * @Description */ public class MwInfoDeal { public MwInfoVo deal(){ MwInfoVo vo = new MwInfoVo() ; Integer[] info = TcpSessionCache.info() ; vo.rtuTotalConnect = info[0] ; vo.rtuTotalOnLine = info[1] ; vo.rtuTotalOffLine = info[2] ; info = TcpDownCommandCache.info() ; vo.comTotalDown = info[0] ; vo.comNoResTotalDownByOnLine = info[1] ; vo.comNoResTotalDownByOffLine = info[2] ; vo.dataTotalUp = RtuDataCache.size() ; info = TreadPoolFactory.pool_short_state() ; vo.shortCurThread = info[0] ; vo.shortMaxThread = info[1] ; vo.shortMinThread = info[2] ; info = TreadPoolFactory.pool_long_state() ; vo.longCurThread = info[0] ; vo.longMaxThread = info[1] ; vo.longMinThread = info[2] ; vo.taskTreePoolTotalInstance = TaskPool.totalTasks() ; return vo ; } } pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/localProtocol/MwInfoVo.java
New file @@ -0,0 +1,52 @@ package com.dy.rtuMw.server.local.localProtocol; import lombok.Data; /** * @Author: liurunyu * @Date: 2024/7/29 10:54 * @Description */ @Data public class MwInfoVo { public Integer rtuTotalConnect ;//已经连接过中间件的RTU总数(包括在线与离线的) public Integer rtuTotalOnLine ;//在线RTU总数 public Integer rtuTotalOffLine ;//离线RTU总数 public Integer comTotalDown ;//缓存的下行命令总数 public Integer comNoResTotalDownByOnLine;//RTU在线,但对其下行命令未收到应答的总数 public Integer comNoResTotalDownByOffLine;//RTU离线,但对其下行命令未收到应答的总数 public Integer dataTotalUp ;//上行数据在缓存中还未处理的总数 public Integer shortCurThread = 0 ;//短线程池当前线程数 public Integer shortMaxThread = 0 ;//短线程池最大线程数 public Integer shortMinThread = 0 ;//短线程池最小线程数 public Integer longCurThread = 0 ;//长线程池当前线程数 public Integer longMaxThread = 0 ;//长线程池最大线程数 public Integer longMinThread = 0 ;//长线程池最小线程数 public Integer taskTreePoolTotalInstance = 0 ;//任务树池任务实例总数 public String toString(){ StringBuilder sb = new StringBuilder() ; sb.append("通信中间件运行信息:\n"); sb.append(" 连接RTU总数:" + rtuTotalConnect + "\n"); sb.append(" 在线RTU总数:" + rtuTotalOnLine + "\n"); sb.append(" 离线RTU总数:" + rtuTotalOffLine + "\n"); sb.append(" 下行未应答命令总数:" + comTotalDown + "\n"); sb.append(" 在线RTU未应答命令总数:" + comNoResTotalDownByOnLine + "\n"); sb.append(" 离线RTU未应答命令总数:" + comNoResTotalDownByOffLine + "\n"); sb.append(" 上行未处理数据总数:" + dataTotalUp + "\n"); sb.append(" 短线程池当前线程数:" + shortCurThread + "\n"); sb.append(" 短线程池配置最大线程数:" + shortMaxThread + "\n"); sb.append(" 短线程池配置最小线程数:" + shortMinThread + "\n"); sb.append(" 长线程池当前线程数:" + longCurThread + "\n"); sb.append(" 长线程池配置最大线程数:" + longMaxThread + "\n"); sb.append(" 长线程池配置最小线程数:" + longMinThread + "\n"); sb.append(" 任务树池任务实例总数:" + taskTreePoolTotalInstance + "\n"); sb.append("\n"); return sb.toString() ; } } pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/TaskPool.java
@@ -13,11 +13,21 @@ private static List<TaskSurpport> tasks = new ArrayList<TaskSurpport>() ; private static Integer taskTotal = 0 ; private static TreeConfig taskTreeConf ; public static void setTaskTreeCofig(TreeConfig conf){ taskTreeConf = conf ; } /** * 得到任务树实例总数 * @return */ public static Integer totalTasks() { return taskTotal ; } @@ -39,6 +49,8 @@ } finally { if(t == null){ log.error("实例化上行数据处理任务对象失败!" ); }else{ taskTotal++ ; } } } pipIrr-platform/pipIrr-web/pipIrr-mwTest-rtu/src/main/resources/Config.xml
@@ -4,5 +4,5 @@ <base rtuAddr="532328059995" onlyOneHeartBeat="true" icCardAddr="3B7D1E1A" icCardNo="61181622830147821"></base> <!-- --> <tcpCl mwServerIp="192.168.40.182" mwServerPort="60000" connectTimeout="3000" /> <tcpCl mwServerIp="127.0.0.1" mwServerPort="60000" connectTimeout="3000" /> </config> pipIrr-platform/pipIrr-web/pipIrr-mwTest-web/src/main/java/com/dy/pipIrrMwTestWeb/common/CodeLocal.java
@@ -9,6 +9,9 @@ public static final String allProtocols = "LCD0100" ;//查询所有协议配置 public static final String stopTcpSv = "LCD0110" ;//停止TCP服务,不再接入新的TCP连接,已经TCP连接的全部断连接 public static final String recoverTcpSv = "LCD0112" ;//恢复TCP服务,接入新的TCP连接 public static final String mwState = "LCD0200" ;//得到通信中间件运行信息 } pipIrr-platform/pipIrr-web/pipIrr-mwTest-web/src/main/java/com/dy/pipIrrMwTestWeb/p206V1_0_0/CommandP206V1_0_0Ctrl.java
@@ -35,6 +35,8 @@ rt = this.stopTcpSv() ; }else if(com.equals(CodeLocal.recoverTcpSv)){ rt = this.recoverTcpSv() ; }else if(com.equals(CodeLocal.mwState)){ rt = this.mwState() ; }else if(com.equals("10")){ rt = this.cd10() ; }else if(com.equals("21")){ @@ -108,6 +110,10 @@ return this.sendCom2Mw(this.commandLocal(CodeLocal.recoverTcpSv, null, null)) ; } private BaseResponse mwState(){ return this.sendCom2Mw(this.commandLocal(CodeLocal.mwState, null, null)) ; } private BaseResponse cd10(){ Com10Vo comVo = new Com10Vo() ; comVo.rtuAddr = "532328059995" ;//前6位是行政区划码,后6位是序列号最大是065535