pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/tcp/DataCodecFactory.java
@@ -8,16 +8,32 @@ public class DataCodecFactory implements ProtocolCodecFactory { private TcpUnitAdapter adapter ; private boolean stop = false ;//为true时,停止TCP服务,并把已经TCP连接断连接 public DataCodecFactory(TcpUnitAdapter adapter) { this.adapter = adapter ; } public void stop(){ stop = true ; } public void recover(){ this.stop = false ; } public ProtocolEncoder getEncoder(IoSession ioSession) { if(stop){ ioSession.closeNow() ; } return new DataEncoder(); } public ProtocolDecoder getDecoder(IoSession ioSession) { if(stop){ ioSession.closeNow() ; } return new DataDecoder(adapter.newPrefixedDataAvailableHandle()); } } pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/tcp/TcpIoHandler.java
@@ -8,9 +8,18 @@ private TcpUnitAdapter adapter ; private boolean stop = false ;//为true时,停止TCP服务,并把已经TCP连接断连接 public TcpIoHandler(TcpUnitAdapter adapter){ this.adapter = adapter ; } public void stop(){ stop = true ; } public void recover(){ this.stop = false ; } /** @@ -18,14 +27,22 @@ */ @Override public void sessionCreated(IoSession session) throws Exception { super.sessionCreated(session); if(stop){ session.closeNow() ; }else{ super.sessionCreated(session); } } /** * 会话open时回调的方法 */ public void sessionOpened(IoSession session) throws Exception { this.adapter.newSessionEventCallback().sessionOpened(session); if(stop){ session.closeNow() ; }else{ this.adapter.newSessionEventCallback().sessionOpened(session); } } /** @@ -76,7 +93,11 @@ */ @Override public void messageReceived(IoSession session, Object message) throws Exception { this.adapter.newSessionEventCallback().messageReceived(session, message); if(stop){ session.closeNow() ; }else{ this.adapter.newSessionEventCallback().messageReceived(session, message); } } } pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/tcp/TcpUnit.java
@@ -22,6 +22,8 @@
    private static boolean started = false ;

    private TcpUnitAdapter adapter ;
    private TcpIoHandler tcpIoHandler ;
    private DataCodecFactory dataCodecFactory ;

    private TcpUnit(){} ;
@@ -94,7 +96,8 @@
        // obtain the filter chain for network communication data
        DefaultIoFilterChainBuilder chain = acceptor.getFilterChain() ;
        // encode/decode filter
        chain.addLast("protocol", new ProtocolCodecFilter(new DataCodecFactory(this.adapter)));
        dataCodecFactory = new DataCodecFactory(this.adapter) ;
        chain.addLast("protocol", new ProtocolCodecFilter(dataCodecFactory));
        /*
         * ExecutorFilter is normally placed after the ProtocolCodecFilter,
@@ -106,7 +109,8 @@
        chain.addLast("exceutor", new ExecutorFilter());
        // business-logic handler, responsible for network sessions and their input/output data
        acceptor.setHandler(new TcpIoHandler(this.adapter));
        tcpIoHandler = new TcpIoHandler(this.adapter) ;
        acceptor.setHandler(tcpIoHandler) ;
        boolean isException = false ;
        try {
@@ -129,8 +133,26 @@
        }
    }

    /**
     * Stops the module: the handler and codec factory are switched to the stopped state so
     * sessions reaching them are closed, then the adapter's stop callback runs and finally
     * the supplied callback is invoked with null.
     *
     * @param callback invoked with {@code null} once the stop steps have run
     * @throws Exception propagated from the callbacks
     */
    @Override
    public void stop(UnitStartedCallbackInterface callback) throws Exception {
        // Handler and factory are only assigned during start-up; guard so that stopping a
        // unit that was never started does not fail with a NullPointerException.
        if(this.tcpIoHandler != null){
            this.tcpIoHandler.stop();
        }
        if(this.dataCodecFactory != null){
            this.dataCodecFactory.stop();
        }
        this.adapter.newUnitStopCallback().callback();
        callback.call(null);
    }

    /**
     * Lifts the stopped state so the TCP service serves sessions again.
     *
     * @throws Exception declared for callers; nothing in this method throws a checked exception
     */
    public void recover() throws Exception {
        // Same guard as in stop(): nothing to recover when start-up has not happened yet.
        if(this.tcpIoHandler != null){
            this.tcpIoHandler.recover();
        }
        if(this.dataCodecFactory != null){
            this.dataCodecFactory.recover();
        }
    }
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/tcp/TcpUnitAdapter.java
@@ -22,5 +22,10 @@
     * @return
     */
    TcpIoSessionEventCallback newSessionEventCallback() ;

    /**
     * Supplies the callback to run when the module stops.
     *
     * @return the stop callback
     */
    TcpUnitStopCallback newUnitStopCallback() ;
}
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/tcp/TcpUnitStopCallback.java
New file
@@ -0,0 +1,10 @@
package com.dy.common.mw.channel.tcp;

/**
 * Hook with a single no-argument, no-result method, supplied through
 * TcpUnitAdapter.newUnitStopCallback() for use when the TCP module stops.
 *
 * @Author: liurunyu
 * @Date: 2024/7/29 9:25
 * @Description
 */
public interface TcpUnitStopCallback {
    /**
     * Performs the implementation's stop-time work.
     */
    void callback() ;
}
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/AdapterImp_TcpUnit.java
@@ -1,9 +1,6 @@
package com.dy.rtuMw.server;

import com.dy.common.mw.channel.tcp.PrefixedDataAvailableHandle;
import com.dy.common.mw.channel.tcp.TcpConfigVo;
import com.dy.common.mw.channel.tcp.TcpIoSessionEventCallback;
import com.dy.common.mw.channel.tcp.TcpUnitAdapter;
import com.dy.common.mw.channel.tcp.*;
import com.dy.common.mw.protocol.PrefixedDataAvailableHandleImp;
import com.dy.rtuMw.server.forTcp.TcpIoSessionCallback;
@@ -30,4 +27,11 @@
        return new TcpIoSessionCallback() ;
    }

    /**
     * Callback used when the module stops.
     *
     * @return a new instance of the RTU middleware's stop callback; the fully qualified name
     *         is spelled out because the return type shares the simple name
     *         TcpUnitStopCallback
     */
    @Override
    public TcpUnitStopCallback newUnitStopCallback() {
        return new com.dy.rtuMw.server.forTcp.TcpUnitStopCallback() ;
    }
}
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/forTcp/RtuStatusDealer.java
@@ -13,11 +13,11 @@ public static void onOffLine(RtuSessionStatus sta){ StOnOffLine st = new StOnOffLine() ; st.rtuAddr = sta.rtuAddr ; st.onOff_trueFalse = sta.onTrueOffLine ; st.ip = sta.ip ; st.port = sta.port ; // StOnOffLine st = new StOnOffLine() ; // st.rtuAddr = sta.rtuAddr ; // st.onOff_trueFalse = sta.onTrueOffLine ; // st.ip = sta.ip ; // st.port = sta.port ; // if(member == null){ // log.error("出错,未能得到处理RTU(地址为" + sta.rtuAddr + ")状态数据的数据中间件,可能数据中间件未启动或脱离的集群!"); @@ -28,11 +28,11 @@ } public static void onLine(String rtuAddr, String ip, Integer port){ StOnOffLine st = new StOnOffLine() ; st.rtuAddr = rtuAddr ; st.onOff_trueFalse = true ; st.ip = ip ; st.port = port ; // StOnOffLine st = new StOnOffLine() ; // st.rtuAddr = rtuAddr ; // st.onOff_trueFalse = true ; // st.ip = ip ; // st.port = port ; // Address member = JgUnit.getInstance().getJgMembers().getDataMwMemberByRtuAddr(rtuAddr) ; // if(member == null){ @@ -44,9 +44,9 @@ } public static void offLine(String rtuAddr){ StOnOffLine st = new StOnOffLine() ; st.rtuAddr = rtuAddr ; st.onOff_trueFalse = false ; // StOnOffLine st = new StOnOffLine() ; // st.rtuAddr = rtuAddr ; // st.onOff_trueFalse = false ; // Address member = JgUnit.getInstance().getJgMembers().getDataMwMemberByRtuAddr(rtuAddr) ; // if(member == null){ @@ -58,10 +58,10 @@ } public static void upData(String rtuAddr, Integer bufferLen){ StUpData st = new StUpData() ; st.rtuAddr = rtuAddr ; st.upBufferLen = bufferLen ; st.isReport = false ; // StUpData st = new StUpData() ; // st.rtuAddr = rtuAddr ; // st.upBufferLen = bufferLen ; // st.isReport = false ; // Address member = JgUnit.getInstance().getJgMembers().getDataMwMemberByRtuAddr(rtuAddr) ; // if(member == null){ @@ -73,10 +73,10 @@ } public static void upReport(String rtuAddr, Integer bufferLen){ StUpData st = new StUpData() ; st.rtuAddr = rtuAddr ; st.upBufferLen = bufferLen ; st.isReport = true ; // StUpData st = new StUpData() ; // st.rtuAddr = rtuAddr ; // st.upBufferLen = 
bufferLen ; // st.isReport = true ; // Address member = JgUnit.getInstance().getJgMembers().getDataMwMemberByRtuAddr(rtuAddr) ; // if(member == null){ @@ -88,9 +88,9 @@ } public static void downData(String rtuAddr, Integer bufferLen){ StDownData st = new StDownData() ; st.rtuAddr = rtuAddr ; st.downBufferLen = bufferLen ; // StDownData st = new StDownData() ; // st.rtuAddr = rtuAddr ; // st.downBufferLen = bufferLen ; // Address member = JgUnit.getInstance().getJgMembers().getDataMwMemberByRtuAddr(rtuAddr) ; // if(member == null){ @@ -102,9 +102,9 @@ } public static void commandSuccess(String rtuAddr){ StCommandResult st = new StCommandResult() ; st.rtuAddr = rtuAddr ; st.isSuccess = true ; // StCommandResult st = new StCommandResult() ; // st.rtuAddr = rtuAddr ; // st.isSuccess = true ; // Address member = JgUnit.getInstance().getJgMembers().getDataMwMemberByRtuAddr(rtuAddr) ; // if(member == null){ @@ -116,9 +116,9 @@ } public static void commandFailure(String rtuAddr){ StCommandResult st = new StCommandResult() ; st.rtuAddr = rtuAddr ; st.isSuccess = false ; // StCommandResult st = new StCommandResult() ; // st.rtuAddr = rtuAddr ; // st.isSuccess = false ; // Address member = JgUnit.getInstance().getJgMembers().getDataMwMemberByRtuAddr(rtuAddr) ; // if(member == null){ @@ -130,9 +130,9 @@ } public static void commandFail2Success(String rtuAddr){ StCommandResult st = new StCommandResult() ; st.rtuAddr = rtuAddr ; st.fail2Success = true ; // StCommandResult st = new StCommandResult() ; // st.rtuAddr = rtuAddr ; // st.fail2Success = true ; // Address member = JgUnit.getInstance().getJgMembers().getDataMwMemberByRtuAddr(rtuAddr) ; // if(member == null){ pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/forTcp/TcpSessionCache.java
@@ -23,7 +23,20 @@
     * Measured on 2023-12-19: Hashtable turned out not to be thread-safe, so HashMap plus synchronized is used instead
     */
    private static HashMap<String, TcpSession> sessionTable = new HashMap<String, TcpSession>() ;

    /**
     * Closes every cached network connection and empties the cache.
     *
     * <p>The cached sessions are copied and the table is cleared while holding the table
     * lock; the sessions are then closed after the lock is released, so code outside this
     * class never runs under the cache lock. Entries without a session object are skipped
     * instead of aborting the loop, so one bad entry cannot leave the remaining connections
     * open.
     */
    public static void closeAllSessions(){
        TcpSession[] snapshot ;
        synchronized (sessionTable){
            snapshot = sessionTable.values().toArray(new TcpSession[0]) ;
            sessionTable.clear();
        }
        for(TcpSession se : snapshot){
            if(se != null && se.ioSession != null){
                se.ioSession.closeNow() ;
            }
        }
    }

    /**
     * 加入新的IoSession
     * @param rtuAddr
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/forTcp/TcpUnitStopCallback.java
New file
@@ -0,0 +1,13 @@
package com.dy.rtuMw.server.forTcp;

/**
 * RTU middleware implementation of the common TCP stop callback.
 *
 * @Author: liurunyu
 * @Date: 2024/7/29 9:26
 * @Description
 */
public class TcpUnitStopCallback implements com.dy.common.mw.channel.tcp.TcpUnitStopCallback{
    /**
     * Delegates to TcpSessionCache.closeAllSessions().
     */
    @Override
    public void callback() {
        TcpSessionCache.closeAllSessions();
    }
}
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/CommandInnerDeaLer.java
@@ -1,5 +1,7 @@ package com.dy.rtuMw.server.local; import com.dy.common.mw.UnitStartedCallbackInterface; import com.dy.common.mw.channel.tcp.TcpUnit; import com.dy.common.mw.protocol.Command; import com.dy.rtuMw.server.local.localProtocol.*; @@ -23,6 +25,10 @@ return this.onLine(com) ; }else if(code.equals(CodeLocal.allProtocols)){ return this.allProtocols(com) ; }else if(code.equals(CodeLocal.stopTcpSv)){ return this.stopTcpSv(com) ; }else if(code.equals(CodeLocal.recoverTcpSv)){ return this.recoverTcpSv(com) ; } return ReturnCommand.errored("出错,收到内部命令的功能码不能识别!", com.getId(), com.getCode()) ; } @@ -54,4 +60,28 @@ return ReturnCommand.successed("查询所有通信协议配置", command.getId(), command.getCode(), mc) ; } /** * 停止TCP服务,不再接入新的TCP连接,已经TCP连接的全部断连接 * @throws Exception */ private Command stopTcpSv(Command command) throws Exception{ TcpUnit.getInstance().stop(new UnitStartedCallbackInterface(){ public void call(Object obj) throws Exception { } }); return ReturnCommand.successed("已经启动停止TCP服务", command.getId(), command.getCode(), null) ; } /** * 恢复TCP服务,接入新的TCP连接 * @throws Exception */ private Command recoverTcpSv(Command command) throws Exception{ TcpUnit.getInstance().recover(); return ReturnCommand.successed("已经启动恢复TCP服务", command.getId(), command.getCode(), null) ; } } pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/localProtocol/CodeLocal.java
@@ -8,4 +8,7 @@ public static final String allProtocols = "LCD0100" ;//查询所有协议配置 public static final String stopTcpSv = "LCD0110" ;//停止TCP服务,不再接入新的TCP连接,已经TCP连接的全部断连接 public static final String recoverTcpSv = "LCD0112" ;//重启TCP服务,接入新的TCP连接 } pipIrr-platform/pipIrr-web/pipIrr-mwTest-web/src/main/java/com/dy/pipIrrMwTestWeb/common/CodeLocal.java
@@ -8,4 +8,7 @@ public static final String allProtocols = "LCD0100" ;//查询所有协议配置 public static final String stopTcpSv = "LCD0110" ;//停止TCP服务,不再接入新的TCP连接,已经TCP连接的全部断连接 public static final String recoverTcpSv = "LCD0112" ;//恢复TCP服务,接入新的TCP连接 } pipIrr-platform/pipIrr-web/pipIrr-mwTest-web/src/main/java/com/dy/pipIrrMwTestWeb/p206V1_0_0/ComSupportP206V1_0_0.java
@@ -34,7 +34,22 @@ private RestTemplate restTemplate; protected Command command(String code, Object param, String comId){ protected Command commandLocal(String code, Object param, String comId){ Command com = new Command() ; com.id = comId==null?Command.defaultId:(comId.trim().equals("")?Command.defaultId:comId) ;//实际应用中,替换成数据库记录id com.protocol = ProtocolConstantV206V1_0_0.protocolName ; com.code = code ; com.rtuAddr = rtuAddr ; com.type = CommandType.innerCommand ; com.rtuResultSendWebUrl = rtuResultSendWebUrl ; com.param = param ; return com ; } protected Command commandOuter(String code, Object param, String comId){ Command com = new Command() ; com.id = comId==null?Command.defaultId:(comId.trim().equals("")?Command.defaultId:comId) ;//实际应用中,替换成数据库记录id com.protocol = ProtocolConstantV206V1_0_0.protocolName ; @@ -48,6 +63,7 @@ return com ; } /** * 连接通信中间件测试 * @return pipIrr-platform/pipIrr-web/pipIrr-mwTest-web/src/main/java/com/dy/pipIrrMwTestWeb/p206V1_0_0/CommandP206V1_0_0Ctrl.java
@@ -31,7 +31,11 @@ rt = this.connect() ;//连接通信中间件测试 }else{ try{ if(com.equals("10")){ if(com.equals(CodeLocal.stopTcpSv)){ rt = this.stopTcpSv() ; }else if(com.equals(CodeLocal.recoverTcpSv)){ rt = this.recoverTcpSv() ; }else if(com.equals("10")){ rt = this.cd10() ; }else if(com.equals("21")){ rt = this.cd21() ; @@ -96,89 +100,97 @@ return this.sendTest() ; } private BaseResponse stopTcpSv(){ return this.sendCom2Mw(this.commandLocal(CodeLocal.stopTcpSv, null, null)) ; } private BaseResponse recoverTcpSv(){ return this.sendCom2Mw(this.commandLocal(CodeLocal.recoverTcpSv, null, null)) ; } private BaseResponse cd10(){ Com10Vo comVo = new Com10Vo() ; comVo.rtuAddr = "532328059995" ;//前6位是行政区划码,后6位是序列号最大是065535 return this.sendCom2Mw(this.command(CodeV1_0_1.cd_10, comVo, null)) ; return this.sendCom2Mw(this.commandOuter(CodeV1_0_1.cd_10, comVo, null)) ; } private BaseResponse cd21(){ Com21Vo comVo = new Com21Vo() ; comVo.ip = "8.140.180.59" ;//IP(例如 125.235.35.89) comVo.port = 6001 ;///端口号(0~65536) return this.sendCom2Mw(this.command(CodeV1_0_1.cd_21, comVo, null)) ; return this.sendCom2Mw(this.commandOuter(CodeV1_0_1.cd_21, comVo, null)) ; } private BaseResponse cd37(){ Com37Vo comVo = new Com37Vo() ; comVo.seconds = 20 ;//单位秒 return this.sendCom2Mw(this.command(CodeV1_0_1.cd_37, comVo, null)) ; return this.sendCom2Mw(this.commandOuter(CodeV1_0_1.cd_37, comVo, null)) ; } private BaseResponse cd67(){ Com37Vo comVo = new Com37Vo() ; return this.sendCom2Mw(this.command(CodeV1_0_1.cd_67, comVo, null)) ; return this.sendCom2Mw(this.commandOuter(CodeV1_0_1.cd_67, comVo, null)) ; } private BaseResponse cd38(){ Com38Vo comVo = new Com38Vo() ; comVo.remainMoneyAlarm = 10.0 ;//用户余额报警值(大于1.0) return this.sendCom2Mw(this.command(CodeV1_0_1.cd_38, comVo, null)) ; return this.sendCom2Mw(this.commandOuter(CodeV1_0_1.cd_38, comVo, null)) ; } private BaseResponse cd68(){ return this.sendCom2Mw(this.command(CodeV1_0_1.cd_68, null, null)) ; return 
this.sendCom2Mw(this.commandOuter(CodeV1_0_1.cd_68, null, null)) ; } private BaseResponse cd39(){ Com39Vo comVo = new Com39Vo() ; comVo.batteryVoltAlarm = 7.0 ;//电池电压报警值(大于0.1) return this.sendCom2Mw(this.command(CodeV1_0_1.cd_39, comVo, null)) ; return this.sendCom2Mw(this.commandOuter(CodeV1_0_1.cd_39, comVo, null)) ; } private BaseResponse cd69(){ return this.sendCom2Mw(this.command(CodeV1_0_1.cd_69, null, null)) ; return this.sendCom2Mw(this.commandOuter(CodeV1_0_1.cd_69, null, null)) ; } private BaseResponse cd3A(){ Com3AVo comVo = new Com3AVo() ; comVo.current = 1.0 ;//阀门堵转电流(大于0) return this.sendCom2Mw(this.command(CodeV1_0_1.cd_3A, comVo, null)) ; return this.sendCom2Mw(this.commandOuter(CodeV1_0_1.cd_3A, comVo, null)) ; } private BaseResponse cd6A(){ return this.sendCom2Mw(this.command(CodeV1_0_1.cd_6A, null, null)) ; return this.sendCom2Mw(this.commandOuter(CodeV1_0_1.cd_6A, null, null)) ; } private BaseResponse cd3B(){ Com3BVo comVo = new Com3BVo() ; comVo.second = 120 ;//阀门超时时间(秒)(大于1) return this.sendCom2Mw(this.command(CodeV1_0_1.cd_3B, comVo, null)) ; return this.sendCom2Mw(this.commandOuter(CodeV1_0_1.cd_3B, comVo, null)) ; } private BaseResponse cd6B(){ return this.sendCom2Mw(this.command(CodeV1_0_1.cd_6B, null, null)) ; return this.sendCom2Mw(this.commandOuter(CodeV1_0_1.cd_6B, null, null)) ; } private BaseResponse cd3C(){ Com3CVo comVo = new Com3CVo() ; comVo.minute = 5 ; return this.sendCom2Mw(this.command(CodeV1_0_1.cd_3C, comVo, null)) ; return this.sendCom2Mw(this.commandOuter(CodeV1_0_1.cd_3C, comVo, null)) ; } private BaseResponse cd65(){ return this.sendCom2Mw(this.command(CodeV1_0_1.cd_65, null, null)) ; return this.sendCom2Mw(this.commandOuter(CodeV1_0_1.cd_65, null, null)) ; } private BaseResponse<String> cd91() { return this.sendCom2Mw(this.command(CodeV1_0_1.cd_91, null, null)) ; return this.sendCom2Mw(this.commandOuter(CodeV1_0_1.cd_91, null, null)) ; } private BaseResponse cd92(){ return 
this.sendCom2Mw(this.command(CodeV1_0_1.cd_92, null, null)) ; return this.sendCom2Mw(this.commandOuter(CodeV1_0_1.cd_92, null, null)) ; } private BaseResponse cd93(){ return this.sendCom2Mw(this.command(CodeV1_0_1.cd_93, null, null)) ; return this.sendCom2Mw(this.commandOuter(CodeV1_0_1.cd_93, null, null)) ; } //APP远程开阀 @@ -187,13 +199,13 @@ comVo.icCardNo = CommandP206V1_0_0Ctrl.vsIcCardNo; comVo.moneyRemain = 234.56 ; comVo.waterPrice = 1.2 ; return this.sendCom2Mw(this.command(CodeV1_0_1.cd_97, comVo, null)) ; return this.sendCom2Mw(this.commandOuter(CodeV1_0_1.cd_97, comVo, null)) ; } //APP远程关阀 private BaseResponse cd98(){ Com98Vo comVo = new Com98Vo() ; comVo.icCardNo = CommandP206V1_0_0Ctrl.vsIcCardNo; return this.sendCom2Mw(this.command(CodeV1_0_1.cd_98, comVo, null)) ; return this.sendCom2Mw(this.commandOuter(CodeV1_0_1.cd_98, comVo, null)) ; } //定时关阀开阀 @@ -203,7 +215,7 @@ comVo.moneyRemain = 234.56 ; comVo.waterPrice = 1.2 ; comVo.minutes = 3 ; return this.sendCom2Mw(this.command(CodeV1_0_1.cd_99, comVo, null)) ; return this.sendCom2Mw(this.commandOuter(CodeV1_0_1.cd_99, comVo, null)) ; } //定量关阀开阀 @@ -213,7 +225,7 @@ comVo.moneyRemain = 234.56 ; comVo.waterPrice = 1.2 ; comVo.waterAmount = 10 ; return this.sendCom2Mw(this.command(CodeV1_0_1.cd_A0, comVo, null)) ; return this.sendCom2Mw(this.commandOuter(CodeV1_0_1.cd_A0, comVo, null)) ; } @@ -231,7 +243,7 @@ comVo.day = nextDt[2] ; comVo.hour = nextDt[3] ; comVo.minute = nextDt[4] ; return this.sendCom2Mw(this.command(CodeV1_0_1.cd_A1, comVo, null)) ; return this.sendCom2Mw(this.commandOuter(CodeV1_0_1.cd_A1, comVo, null)) ; } //定量关阀计划开阀 @@ -248,12 +260,12 @@ comVo.day = nextDt[2] ; comVo.hour = nextDt[3] ; comVo.minute = nextDt[4] ; return this.sendCom2Mw(this.command(CodeV1_0_1.cd_A2, comVo, null)) ; return this.sendCom2Mw(this.commandOuter(CodeV1_0_1.cd_A2, comVo, null)) ; } private BaseResponse cdB0(){ return this.sendCom2Mw(this.command(CodeV1_0_1.cd_B0, null, null)) ; return 
this.sendCom2Mw(this.commandOuter(CodeV1_0_1.cd_B0, null, null)) ; } //在线情况