pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/multiDataSource/DataSourceAspect.java
@@ -14,7 +14,12 @@ import java.lang.reflect.Method; import java.util.Objects; /** * 多数据源应用中,通过注解确定某个数据源 * 例如在Services层方法上注解: * @DataSource("test") * 表示应用数据源test */ @Slf4j @Aspect @Order(Constant.AspectOrderDataSource) @@ -32,7 +37,7 @@ DataSource dataSource = method.getAnnotation(DataSource.class); if (Objects.nonNull(dataSource) && !StringUtils.isNullOrEmpty(dataSource.value())) { log.info("切换数据源为" + dataSource.value()); //log.info("切换数据源为" + dataSource.value()); //强制转成方法上配置的数据源,替换掉DataSourceContext中保存的数据源 DataSourceContext.set(dataSource.value()); }else{ pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/multiDataSource/DataSourceContext.java
@@ -1,5 +1,8 @@ package com.dy.common.multiDataSource; /** * 线程安全的数据源持有者,持有当前访问所应用的数据源名称 */ public class DataSourceContext { private final static ThreadLocal<String> LOCAL_DATASOURCE = new ThreadLocal<>(); pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/multiDataSource/DataSourceSingle.java
@@ -5,7 +5,7 @@ import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** * 针对一个数据源(中间件应用),多数据源应用中实际只用了一个数据源 * 针对一个数据源(通信中间件应用),多数据源应用中实际只用了一个数据源 * 多数据源应用中,通过注解确定某个数据源。 * 确定数据源有两种方法: * 1、注解明确了数据源: pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/multiDataSource/DataSourceSingleAspect.java
@@ -33,24 +33,24 @@ DataSourceSingle dataSource = method.getAnnotation(DataSourceSingle.class); if (Objects.nonNull(dataSource) && !StringUtils.isNullOrEmpty(dataSource.value())) { log.info("数据源指定为" + dataSource.value()); //log.info("数据源指定为" + dataSource.value()); //强制转成方法上配置的数据源,替换掉DataSourceContext中保存的数据源 DataSourceContext.set(dataSource.value()); }else{ String datasourceName = SpringContextUtil.getApplicationContext().getEnvironment().getProperty("spring.datasource.names") ; if(!StringUtils.isNullOrEmpty(datasourceName)){ log.info("根据配置数据源为" + datasourceName); //log.info("根据配置数据源为" + datasourceName); DataSourceContext.set(datasourceName); }else{ log.error("数据源未指定"); } } try { log.info("数据库操作开始" + dataSource.value()); //log.info("数据库操作开始" + dataSource.value()); return point.proceed(); } finally { // 销毁数据源 在执行方法之后 log.info("数据源操作完毕" + dataSource.value()); //log.info("数据源操作完毕" + dataSource.value()); DataSourceContext.remove(); } } pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/core/CoreConstantThread.java
@@ -46,7 +46,7 @@ List<CoreTask> constantTasks = CoreUnit.getAllConstantTasks(); if (constantTasks != null && constantTasks.size() > 0) { for (CoreTask task : constantTasks) { temp = task.excute(); temp = task.execute(); if (temp != null) { count += temp; } pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/core/CoreTask.java
@@ -8,5 +8,5 @@ * 任务自我执行方法 * @return 执行任务的数量,如果调用者不需要数量时,可以返回null */ public abstract Integer excute() ; public abstract Integer execute() ; } pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/core/CoreThread.java
@@ -22,8 +22,8 @@ /** * 设置暂停时长 * @param sleepWorking * @param sleepIdel * @param sleepBigBusy 大忙时睡眠时长 * @param sleepSmallBusy 小忙时睡眠时长 */ public void setSleep(Long sleepBigBusy, Long sleepSmallBusy){ CoreThread.sleepBigBusy = sleepBigBusy ; @@ -52,7 +52,7 @@ while(n < count){ CoreTask task = (CoreTask)coreQueue.pop() ; if(task != null){ task.excute(); task.execute(); } n ++ ; } pipIrr-platform/pipIrr-mw/pipIrr-mw-accept/src/main/java/com/dy/aceMw/server/forTcp/TcpDownCommandObj.java
@@ -50,8 +50,8 @@ return removeNodeFromCach ; } TcpSession tcpSe = TcpSessionCach.getTcpSession(this.result.rtuAddr) ; Boolean flag = TcpSessionCach.isConnect(this.result.rtuAddr) ; TcpSession tcpSe = TcpSessionCache.getTcpSession(this.result.rtuAddr) ; Boolean flag = TcpSessionCache.isConnect(this.result.rtuAddr) ; if(tcpSe == null || flag == null || !flag.booleanValue()){ //未曾上线或不在线 if(!this.result.isCachForOffLine){ pipIrr-platform/pipIrr-mw/pipIrr-mw-accept/src/main/java/com/dy/aceMw/server/forTcp/TcpSessionCache.java
New file @@ -0,0 +1,216 @@ package com.dy.aceMw.server.forTcp; import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.HashMap; import java.util.Hashtable; import java.util.Iterator; import java.util.List; import java.util.Map.Entry; import org.apache.mina.core.session.IoSession; import com.dy.aceMw.server.ServerProperties; public class TcpSessionCache { /** * 用Hashtable而不用HashMap原因: * Hashtable线程安全的 * HashMap线程不安全的 * 多线程对sessionTable读出或存入,可能产生异常 * TcpSessionCache是在多线程环境下运行 * * 2023-12-19实测,发现Hashtable并不线程安全,所以应用了HashMap和synchronized */ private static HashMap<String, TcpSession> sessionTable = new HashMap<String, TcpSession>() ; /** * 加入新的IoSession * @param rtuAddr * @param protocolName * @param ioSession */ public static void putNewTcpSession(String rtuAddr, String protocolName, IoSession ioSession){ synchronized (sessionTable){ TcpSession tcpSe = sessionTable.get(rtuAddr) ; if(tcpSe == null){ tcpSe = new TcpSession() ; tcpSe.protocolName = protocolName ; tcpSe.ioSession = ioSession ; sessionTable.put(rtuAddr, tcpSe) ; }else{ tcpSe.ioSession = ioSession ; } } } /** * 更新IoSession对应的rtuAddr * @param oldRtuAddr * @param newRtuAddr * @param protocolName * @param ioSession */ public static void changeRtuAddr(String oldRtuAddr, String newRtuAddr, String protocolName, IoSession ioSession){ if(oldRtuAddr != null && newRtuAddr != null && !oldRtuAddr.equals(newRtuAddr)){ synchronized (sessionTable){ TcpSession tcpSe = sessionTable.get(oldRtuAddr) ; if(tcpSe == null){ putNewTcpSession(newRtuAddr, protocolName, ioSession) ; }else{ sessionTable.remove(oldRtuAddr) ; sessionTable.put(newRtuAddr, tcpSe) ; } } } } /** * 得到TcpSession * @param rtuAddr * @return */ public static TcpSession getTcpSession(String rtuAddr){ return sessionTable.get(rtuAddr) ; } /** * 得到Tcp通信协议名称 * @param rtuAddr * @return */ public static String getTcpProtocolName(String rtuAddr){ TcpSession tcpSe = sessionTable.get(rtuAddr) ; if(tcpSe != null){ return tcpSe.protocolName ; }else{ return null ; } } /** * 得到所有在线情况 * @return */ public static HashMap<String, Boolean> allOnLine(){ synchronized (sessionTable){ HashMap<String, Boolean> map = new HashMap<String, Boolean>(); Iterator<Entry<String, TcpSession>> it = sessionTable.entrySet().iterator() ; Entry<String, TcpSession> entry = null ; while(it.hasNext()){ entry = it.next() ; map.put(entry.getKey(), entry.getValue().ioSession.isConnected()) ; } return map ; } } /** * 得到所有RTU连接状态情况 * @return */ public static List<RtuSessionStatus> allConnectStatus(){ synchronized (sessionTable){ List<RtuSessionStatus> list = new ArrayList<RtuSessionStatus>(); Iterator<Entry<String, TcpSession>> it = sessionTable.entrySet().iterator() ; Entry<String, TcpSession> entry = null ; while(it.hasNext()){ entry = it.next() ; RtuSessionStatus vo = new RtuSessionStatus() ; vo.rtuAddr = entry.getKey() ; IoSession se = entry.getValue().ioSession ; vo.onTrueOffLine = se.isConnected() ; InetSocketAddress sa = (InetSocketAddress)se.getRemoteAddress() ; if(sa != null){ InetAddress inetAddr = sa.getAddress() ; if(inetAddr != null){ vo.ip = inetAddr.getHostAddress() ; vo.port = sa.getPort() ; } } list.add(vo) ; } return list ; } } /** * 得到IoSession * @param rtuAddr * @return */ // public IoSession getIoSession(String rtuAddr){ // TcpSession tcpSe = sessionMap.get(rtuAddr) ; // if(tcpSe != null){ // return tcpSe.ioSession ; // } // return null ; // } /** * 网络是否连接 * @param rtuAddr * @return */ public static Boolean isConnect(String rtuAddr){ TcpSession tcpSe = sessionTable.get(rtuAddr) ; if(tcpSe != null){ return tcpSe.ioSession.isConnected() ; } return null ; } /** * 通过IoSession输出数据 * @param rtuAddr * @param data * @throws Exception */ public static void write(String rtuAddr, byte[] data) throws Exception{ TcpSession tcpSe = sessionTable.get(rtuAddr) ; if(tcpSe != null){ if(tcpSe.ioSession.isConnected()){ tcpSe.ioSession.write(data) ; }else{ throw new Exception("Rtu连接已经关闭!") ; } }else{ throw new Exception("Rtu未曾上线!") ; } } /** * 设置上行数据时刻 * @param rtuAddr */ public static void cacheUpDataTime(String rtuAddr){ TcpSession tcpSe = sessionTable.get(rtuAddr) ; if(tcpSe != null){ tcpSe.lastUpDataTime = System.currentTimeMillis() ; } } /** * 更新上行数据时刻 * 当上行数据时刻已经过去一定时长,上行数据时刻清空 */ public static void updateUpDataTime(Long now){ synchronized (sessionTable){ Iterator<TcpSession> it = sessionTable.values().iterator() ; TcpSession tcpSe ; while(it.hasNext()){ tcpSe = it.next() ; if(tcpSe.lastUpDataTime != null){ if(now - tcpSe.lastUpDataTime > ServerProperties.lastUpDataTimeLive){ tcpSe.lastUpDataTime = null ; } } } } } } pipIrr-platform/pipIrr-mw/pipIrr-mw-accept/src/main/java/com/dy/aceMw/server/forTcp/TcpSessionCacheBk.java
File was renamed from pipIrr-platform/pipIrr-mw/pipIrr-mw-accept/src/main/java/com/dy/aceMw/server/forTcp/TcpSessionCach.java @@ -1,27 +1,24 @@ package com.dy.aceMw.server.forTcp; import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.HashMap; import java.util.Hashtable; import java.util.Iterator; import java.util.List; import java.util.Map.Entry; import com.dy.aceMw.server.ServerProperties; import org.apache.mina.core.session.IoSession; import com.dy.aceMw.server.ServerProperties; import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.*; import java.util.Map.Entry; public class TcpSessionCach { /** * 用Hashtable实现,但实测发现Hashtable并不线程安全 */ public class TcpSessionCacheBk { /** * 用Hashtable而不用HashMap原因: * Hashtable线程安全的 * HashMap线程不安全的 * 多线程对sessionTable读出或存入,可能产生异常 * 虽然是单个主线程,但在Jgroups web成员查询在线情况时,Jgroups的线程直接侵入,从而形成多线程环境,多线程遍历时,Map成员数量有变化就会产生异常 * TcpSessionCache是在多线程环境下运行 */ private static Hashtable<String, TcpSession> sessionTable = new Hashtable<String, TcpSession>() ; @@ -51,12 +48,14 @@ * @param ioSession */ public static void changeRtuAddr(String oldRtuAddr, String newRtuAddr, String protocolName, IoSession ioSession){ TcpSession tcpSe = sessionTable.get(oldRtuAddr) ; if(tcpSe == null){ putNewTcpSession(newRtuAddr, protocolName, ioSession) ; }else{ sessionTable.remove(oldRtuAddr) ; sessionTable.put(newRtuAddr, tcpSe) ; if(oldRtuAddr != null && newRtuAddr != null && !oldRtuAddr.equals(newRtuAddr)){ TcpSession tcpSe = sessionTable.get(oldRtuAddr) ; if(tcpSe == null){ putNewTcpSession(newRtuAddr, protocolName, ioSession) ; }else{ sessionTable.remove(oldRtuAddr) ; sessionTable.put(newRtuAddr, tcpSe) ; } } } @@ -101,7 +100,7 @@ * 得到所有RTU连接状态情况 * @return */ public static List<RtuSessionStatus> allConnectStauts(){ public static List<RtuSessionStatus> allConnectStatus(){ List<RtuSessionStatus> list = new ArrayList<RtuSessionStatus>(); Iterator<Entry<String, TcpSession>> it = sessionTable.entrySet().iterator() ; Entry<String, TcpSession> entry = null ; @@ -174,7 +173,7 @@ * 设置上行数据时刻 * @param rtuAddr */ public static void cachUpDataTime(String rtuAddr){ public static void cacheUpDataTime(String rtuAddr){ TcpSession tcpSe = sessionTable.get(rtuAddr) ; if(tcpSe != null){ tcpSe.lastUpDataTime = System.currentTimeMillis() ; pipIrr-platform/pipIrr-mw/pipIrr-mw-accept/src/main/java/com/dy/aceMw/server/rtuData/p206V1_0_0/TkPreGenObjs.java
@@ -22,7 +22,7 @@ public void execute(Object data) { Data d = (Data)data ; String rtuAddr = d.getRtuAddr() ; log.info("RTU" + rtuAddr + "数据到此,还未实现处理:" + data.toString()); log.info("RTU" + rtuAddr + "数据到此,进行数据库存储测试"); RtuSv sv = (RtuSv)SpringContextUtil.getBean(RtuSv.class) ; sv.save(d) ; pipIrr-platform/pipIrr-mw/pipIrr-mw-accept/src/main/java/com/dy/aceMw/server/tasks/FromRtuConstantTask.java
@@ -17,24 +17,63 @@ * 在单线程环境中运行 */ @Override public Integer excute() { public Integer execute() { try{ return dealRtuUpData() ; dealRtuUpData() ; }catch(Exception e){ log.error(e); } return null ; return RtuDataCache.size()>0?0:1 ; } /** * 处理上行数据 */ public Integer dealRtuUpData() { public void dealRtuUpData() { Node first = RtuDataCache.getFirstQueueNode() ; if(first != null){ Node last = RtuDataCache.getLastQueueNode() ; while (last != null){ last = this.doDealRtuUpData(first, last); } } } /** * 处理缓存的上行数据节点 * @param first 第一个节点 * @param last 最后一个节点 */ private Node doDealRtuUpData(Node first, Node last){ if(last != null){ if(first != last){ //在dealNode方法中,可能要把last从队列中移除,这时last.pre为空,所以提前把last.pre取出来 Node pre = last.pre ; dealNode(last) ; return pre ; }else{ //停止 return null ; } }else{ return null ; } } //////////////////////////////////////////////// // //以下实现,采用了递归调用,当队列缓存结点很多时,会产生栈溢出异常 // //////////////////////////////////////////////// /** * 处理上行数据 */ public Integer dealRtuUpData_() { Node first = RtuDataCache.getFirstQueueNode() ; if(first != null){ Integer count = RtuDataCache.size() ; Node last = RtuDataCache.getLastQueueNode() ; this.doDealRtuUpData(first, last); this.doDealRtuUpData_(first, last); return count ; } return null ; @@ -45,18 +84,15 @@ * @param first 第一个节点 * @param last 最后一个节点 */ private void doDealRtuUpData(Node first, Node last){ private void doDealRtuUpData_(Node first, Node last){ if(last != null){ //在dealNode方法中,可能要把last从队列中移除,这时last.pre为空,所以提前把last.pre取出来 Node pre = last.pre ; dealNode(last) ; if(first != null && first != last){ if(first != last){ doDealRtuUpData(first, pre) ; }else if(first != null && first == last){ }else{ //停止 }else if(first == null){ //这种情况不会存在 doDealRtuUpData(null, pre) ; } } } pipIrr-platform/pipIrr-mw/pipIrr-mw-accept/src/main/java/com/dy/aceMw/server/tasks/RtuDownTask.java
@@ -9,7 +9,7 @@ import com.dy.common.mw.protocol.Driver; import com.dy.common.mw.protocol.ProtocolCache; import com.dy.aceMw.server.ServerProperties; import com.dy.aceMw.server.forTcp.TcpSessionCach; import com.dy.aceMw.server.forTcp.TcpSessionCache; /** * 从web业务系统发向RTU的命令任务 @@ -21,7 +21,7 @@ private static Logger log = LogManager.getLogger(RtuDownTask.class.getName()); @Override public Integer excute() { public Integer execute() { Command com = (Command)this.data ; try { log.info("下发远程命令" + com.getCode() + "的核心任务开始执行"); @@ -41,7 +41,7 @@ String rtuAddr = com.getRtuAddr() ; //前面已经判断rtuAddr为空情况,至此其不为空 Driver dri = null ; String protocolName = TcpSessionCach.getTcpProtocolName(rtuAddr) ; String protocolName = TcpSessionCache.getTcpProtocolName(rtuAddr) ; if(protocolName == null){ //RTU未曾上线 int count = ProtocolCache.driverCount() ; pipIrr-platform/pipIrr-mw/pipIrr-mw-accept/src/main/java/com/dy/aceMw/server/tasks/RtuUpTask.java
@@ -18,7 +18,7 @@ import com.dy.aceMw.server.ServerProperties; import com.dy.aceMw.server.forTcp.RtuLogDealer; import com.dy.aceMw.server.forTcp.RtuStatusDealer; import com.dy.aceMw.server.forTcp.TcpSessionCach; import com.dy.aceMw.server.forTcp.TcpSessionCache; import com.dy.common.util.ByteUtil; public class RtuUpTask extends CoreTask { @@ -26,7 +26,7 @@ private static final Logger log = LogManager.getLogger(RtuUpTask.class.getName()); @Override public Integer excute() { public Integer execute() { Object[] os = (Object[])this.data ; IoSession session = (IoSession)os[0] ; byte[] upBuf = (byte[])os[1] ; @@ -72,7 +72,7 @@ //设置session的属性ID TcpUnit.getInstance().setIoSessionArrId(session, rtuAddr); //缓存session TcpSessionCach.putNewTcpSession(rtuAddr, protocolName, session); TcpSessionCache.putNewTcpSession(rtuAddr, protocolName, session); log.info("RTU(地址:" + rtuAddr + ")上线了。") ; } @@ -81,11 +81,11 @@ if(rtuAddr != null){ if(protocolName == null){ protocolName = TcpSessionCach.getTcpProtocolName(rtuAddr) ; protocolName = TcpSessionCache.getTcpProtocolName(rtuAddr) ; } //设置收到数据时刻 TcpSessionCach.cachUpDataTime(rtuAddr); TcpSessionCache.cacheUpDataTime(rtuAddr); if(protocolName != null){ //对上行数据进行处理 @@ -154,7 +154,7 @@ //更新终端状态 if(rtuAddrInData != null && !rtuAddrInData.equals(rtuAddrAtHead)){ //数据头中的RTU地址与数据中的RTU地址不一致,更换成数据中的RTU地址 TcpSessionCach.changeRtuAddr(rtuAddrAtHead, rtuAddrInData, protocolName, session); TcpSessionCache.changeRtuAddr(rtuAddrAtHead, rtuAddrInData, protocolName, session); session.setAttribute(TcpIoSessionAttrIdIsRtuAddr.sessionArrIdKey, rtuAddrInData) ; } pipIrr-platform/pipIrr-mw/pipIrr-mw-accept/src/main/java/com/dy/aceMw/server/tasks/ToRtuConstantTask.java
@@ -7,7 +7,7 @@ import com.dy.common.mw.core.CoreTask; import com.dy.aceMw.server.forTcp.TcpDownCommandCache; import com.dy.aceMw.server.forTcp.TcpDownCommandObj; import com.dy.aceMw.server.forTcp.TcpSessionCach; import com.dy.aceMw.server.forTcp.TcpSessionCache; /** * 处理RTU下行命令数据的恒久任务 @@ -19,26 +19,70 @@ * 在单线程环境中运行 */ @Override public Integer excute() { public Integer execute() { try{ Long now = System.currentTimeMillis() ; dealTcpSession(now) ; return dealDownCommand(now) ; dealTcpSession() ; }catch(Exception e){ log.error("更新RTU会话上报数据时刻时发生集合操作异常,此异常并不影响系统正常运行", e); } try{ dealDownCom() ; }catch(Exception e){ log.error(e); } return null ; return TcpDownCommandCache.size()>0?0:1 ; } /** * 处理TCP缓存中的各个TCP Session的上行数据时刻 */ private void dealTcpSession(Long now){ TcpSessionCach.updateUpDataTime(now) ; private void dealTcpSession(){ TcpSessionCache.updateUpDataTime(System.currentTimeMillis() ) ; } /** * 处理下行命令 */ public void dealDownCom() { Node first = TcpDownCommandCache.getFirstQueueNode() ; if(first != null){ Node last = TcpDownCommandCache.getLastQueueNode() ; while (last != null){ last = this.doDealDownComm(System.currentTimeMillis(), first, last); } } } /** * 处理缓存的下行命令节点 * @param now 当前时刻 * @param first 第一个节点 * @param last 最后一个节点 */ private Node doDealDownComm(Long now, Node first, Node last){ if(last != null){ if(first != last){ //在dealNode方法中,可能要把last从队列中移除,这时last.pre为空,所以提前把last.pre取出来 Node pre = last.pre ; dealNode(now, last) ; return pre ; }else{ //停止 return null ; } }else{ return null ; } } //////////////////////////////////////////////// // //以下实现,采用了递归调用,当队列缓存结点很多时,会产生栈溢出异常 // //////////////////////////////////////////////// /** * 处理下行命令 public Integer dealDownCommand(Long now) { Node first = TcpDownCommandCache.getFirstQueueNode() ; if(first != null){ @@ -49,12 +93,12 @@ } return null ; } */ /** * 处理缓存的下行命令节点 * @param now 当前时刻 * @param first 第一个节点 * @param last 最后一个节点 */ private void doDealDownCommand1(Long now, Node first, Node last){ if(first != null){ //在dealNode方法中,可能要把first从队列中移除,这时first.next为空,所以提前把first.next取出来 @@ -70,13 +114,12 @@ } } } */ /** * 处理缓存的下行命令节点 * @param now 当前时刻 * @param first 第一个节点 * @param last 最后一个节点 */ private void doDealDownCommand(Long now, Node first, Node last){ if(last != null){ //在dealNode方法中,可能要把last从队列中移除,这时last.pre为空,所以提前把last.pre取出来 @@ -92,7 +135,7 @@ } } } */ /** * 处理一个节点 * @param now 现在时刻 pipIrr-platform/pipIrr-mw/pipIrr-mwTest-client/src/main/java/com/dy/testClient/ServerProperties.java
@@ -17,6 +17,8 @@ //发送数据次数 public static Integer sendTimes = 0 ; //mwTestServer控制是否启动 public static boolean startWork = false ; public static boolean startTcpConnectWork = false ; //mwTestServer public static boolean startRtuReportWork = false ; } pipIrr-platform/pipIrr-mw/pipIrr-mwTest-client/src/main/java/com/dy/testClient/rmiClient/Code.java
@@ -6,12 +6,14 @@ public static String cd2 = "002" ;//请求sepTest的配置 public static String cd3 = "003" ;//请求开始工作 public static String cd4 = "004" ;//向服务端上报信息 public static String cd3 = "003" ;//请求开始建立网络连接 public static String cd5 = "005" ;//向服务端上报 完成任务的数量 public static String cd4 = "004" ;//请求开始RTU上报数据 public static String cd6 = "006" ;//向服务端上报 全部任务完成 public static String cd5 = "005" ;//向服务端上报信息 public static String cd6 = "006" ;//向服务端上报 完成任务的数量 public static String cd7 = "007" ;//向服务端上报 全部任务完成 } pipIrr-platform/pipIrr-mw/pipIrr-mwTest-client/src/main/java/com/dy/testClient/rmiClient/ResStartRtuReportVo.javacopy from pipIrr-platform/pipIrr-mw/pipIrr-mwTest-client/src/main/java/com/dy/testClient/rmiClient/ResStartVo.java copy to pipIrr-platform/pipIrr-mw/pipIrr-mwTest-client/src/main/java/com/dy/testClient/rmiClient/ResStartRtuReportVo.java
File was copied from pipIrr-platform/pipIrr-mw/pipIrr-mwTest-client/src/main/java/com/dy/testClient/rmiClient/ResStartVo.java @@ -3,9 +3,9 @@ import com.alibaba.fastjson2.JSON; public class ResStartVo { public class ResStartRtuReportVo { public boolean start ; public boolean report ; /** @@ -27,21 +27,20 @@ * @return 对象 * @throws Exception 异常 */ public static ResStartVo jsonToObject(String json)throws Exception{ public static ResStartRtuReportVo jsonToObject(String json)throws Exception{ try{ return JSON.parseObject(json, ResStartVo.class) ; return JSON.parseObject(json, ResStartRtuReportVo.class) ; //return new JSONDeserializer<ResStartVo>().deserialize(json, ResStartVo.class) ; }catch(Exception e){ throw new Exception(e.getMessage() , e ) ; } } public boolean isStart() { return start; public boolean isReport() { return report; } public void setStart(boolean start) { this.start = start; public void setReport(boolean report) { this.report = report; } } pipIrr-platform/pipIrr-mw/pipIrr-mwTest-client/src/main/java/com/dy/testClient/rmiClient/ResStartTcpConnectVo.java
File was renamed from pipIrr-platform/pipIrr-mw/pipIrr-mwTest-client/src/main/java/com/dy/testClient/rmiClient/ResStartVo.java @@ -3,7 +3,7 @@ import com.alibaba.fastjson2.JSON; public class ResStartVo { public class ResStartTcpConnectVo { public boolean start ; @@ -27,9 +27,9 @@ * @return 对象 * @throws Exception 异常 */ public static ResStartVo jsonToObject(String json)throws Exception{ public static ResStartTcpConnectVo jsonToObject(String json)throws Exception{ try{ return JSON.parseObject(json, ResStartVo.class) ; return JSON.parseObject(json, ResStartTcpConnectVo.class) ; //return new JSONDeserializer<ResStartVo>().deserialize(json, ResStartVo.class) ; }catch(Exception e){ throw new Exception(e.getMessage() , e ) ; pipIrr-platform/pipIrr-mw/pipIrr-mwTest-client/src/main/java/com/dy/testClient/rmiClient/RmiClUnit.java
@@ -86,7 +86,7 @@ RmiRequestVo rqVo = new RmiRequestVo() ; rqVo.id = id ; rqVo.token = token ; rqVo.code = Code.cd4 ; rqVo.code = Code.cd5; rqVo.count = count ; String json = rqVo.toJson() ; frmWork.syncRequest(json) ; @@ -102,7 +102,7 @@ RmiRequestVo rqVo = new RmiRequestVo() ; rqVo.id = id ; rqVo.token = token ; rqVo.code = Code.cd5 ; rqVo.code = Code.cd6; rqVo.overCount = count ; String json = rqVo.toJson() ; frmWork.syncRequest(json) ; @@ -119,7 +119,7 @@ RmiRequestVo rqVo = new RmiRequestVo() ; rqVo.id = id ; rqVo.token = token ; rqVo.code = Code.cd6 ; rqVo.code = Code.cd7; rqVo.over = true ; rqVo.seconds = seconds ; String json = rqVo.toJson() ; @@ -238,13 +238,14 @@ } } if(!error){ getStart(frmWork) ; getStartTcpConnect(frmWork) ; } } //从mwTestServer得到开始上报数据的请允许 private void getStart(RmiFrameWork frmWork){ log.info("等待服务端允许上报数据"); //从mwTestServer得到开始TCP连接的请允许 private void getStartTcpConnect(RmiFrameWork frmWork){ log.info("等待服务端允许网络连接"); boolean error = false ; while(true){ try { Thread.sleep(100L); @@ -255,37 +256,93 @@ String json = rqVo.toJson() ; Object rObj = frmWork.syncRequest(json) ; if(rObj != null){ RmiResponseVo rspVo = RmiResponseVo.jsonToObject(String.valueOf(rObj), ResStartVo.class) ; RmiResponseVo rspVo = RmiResponseVo.jsonToObject(String.valueOf(rObj), ResStartTcpConnectVo.class) ; if(rspVo != null){ if(rspVo.success){ if(rspVo.obj != null && rspVo.obj instanceof ResStartVo){ ResStartVo rVo = (ResStartVo)rspVo.obj ; if(rspVo.obj != null && rspVo.obj instanceof ResStartTcpConnectVo){ ResStartTcpConnectVo rVo = (ResStartTcpConnectVo)rspVo.obj ; if(rVo != null){ if(rVo.start){ ServerProperties.startWork = true ; log.info("允许上报数据工作了( ^_^ )"); ServerProperties.startTcpConnectWork = true ; log.info("允许TCP网络连接了( ^_^ )"); error = false ; break ; } }else{ log.error("rmi请求启动失败:json转ResStartVo为null"); error = true ; log.error("rmi请求TCP网络连接失败:json转ResStartTcpConnectVo为null"); } }else{ log.error("rmi请求启动失败:服务端返回ResStartVo为null"); error = true ; log.error("rmi请求TCP网络连接失败:服务端返回ResStartTcpConnectVo为null"); } }else{ log.error("rmi请求启动失败:服务端返回错误:" + rspVo.errorInfo); error = true ; log.error("rmi请求TCP网络连接失败:服务端返回错误:" + rspVo.errorInfo); } }else{ log.error("rmi请求启动失败:服务端返回的RmiResponseVo为null"); error = true ; log.error("rmi请求TCP网络连接失败:服务端返回的RmiResponseVo为null"); } }else{ log.error("rmi请求启动失败:服务端返回json为null"); error = true ; log.error("rmi请求TCP网络连接失败:服务端返回json为null"); } } catch (Exception e) { log.error("rmi请求启动失败" + e.getMessage()); error = true ; log.error("rmi请求TCP网络连接失败" + e.getMessage()); continue ; } } if(!error){ getStartRtuReport(frmWork) ; } } //从mwTestServer得到开始RTU上报数据的请允许 private void getStartRtuReport(RmiFrameWork frmWork){ log.info("等待服务端允许上报数据"); while(true){ try { Thread.sleep(100L); RmiRequestVo rqVo = new RmiRequestVo() ; rqVo.id = id ; rqVo.token = token ; rqVo.code = Code.cd4 ; String json = rqVo.toJson() ; Object rObj = frmWork.syncRequest(json) ; if(rObj != null){ RmiResponseVo rspVo = RmiResponseVo.jsonToObject(String.valueOf(rObj), ResStartRtuReportVo.class) ; if(rspVo != null){ if(rspVo.success){ if(rspVo.obj != null && rspVo.obj instanceof ResStartRtuReportVo){ ResStartRtuReportVo rVo = (ResStartRtuReportVo)rspVo.obj ; if(rVo != null){ if(rVo.report){ ServerProperties.startRtuReportWork = true ; log.info("允许RTU上报数据工作了( ^_^ )"); break ; } }else{ log.error("rmi请求Rtu上报数据失败:json转ResStartRtuReportVo为null"); } }else{ log.error("rmi请求Rtu上报数据失败:服务端返回ResStartRtuReportVo为null"); } }else{ log.error("rmi请求Rtu上报数据失败:服务端返回错误:" + rspVo.errorInfo); } }else{ log.error("rmi请求Rtu上报数据失败:服务端返回的RmiResponseVo为null"); } }else{ log.error("rmi请求Rtu上报数据失败:服务端返回json为null"); } } catch (Exception e) { log.error("rmi请求Rtu上报数据失败" + e.getMessage()); continue ; } } } } pipIrr-platform/pipIrr-mw/pipIrr-mwTest-client/src/main/java/com/dy/testClient/tcpClient/MyThreadJob.java
@@ -23,9 +23,10 @@ public static final int connectTimeout = 3000 ; public int sendTimes = 0 ;//发送数据次数 public int heartbeatTimes = 0 ;//上报心跳次数 public int sendTimes = 1 ;//发送数据次数 public int heartbeatTimes = 1 ;//上报心跳次数 public long overStart = 0L; public boolean isOver = false ; public MyThreadJob(){ @@ -38,21 +39,29 @@ @Override public void execute() throws Exception { log.info("RTU" + rtuAddr + "开始任务"); if(session != null){ log.info("RTU" + rtuAddr + "将要执行" + ServerProperties.sendTimes + "轮次任务,当前轮次是" + sendTimes); if(sendTimes <= ServerProperties.sendTimes){ log.info("RTU" + rtuAddr + "开始任务"); log.info("RTU" + rtuAddr + "将要执行" + ServerProperties.sendTimes + "轮次任务,当前轮次是" + sendTimes); sendDataOfP206V1_0_0() ; }else{ this.jobOver() ; log.info("RTU" + rtuAddr + "等待一会,以接收通信中间件下行数据"); if(overStart == 0){ overStart = System.currentTimeMillis() ; }else{ long now = System.currentTimeMillis() ; if(now - overStart >= 30 * 1000){ this.jobOver() ; } } } } } private void sendDataOfP206V1_0_0(){ try{ if(heartbeatTimes >= ServerProperties.heartbeatTimes){ heartbeatTimes = 0 ; if(heartbeatTimes > ServerProperties.heartbeatTimes){ heartbeatTimes = 1 ; this.sendReportData() ; TcpClUnit.clientSendData(); sendTimes++ ; pipIrr-platform/pipIrr-mw/pipIrr-mwTest-client/src/main/java/com/dy/testClient/tcpClient/TcpClUnit.java
@@ -71,7 +71,7 @@ public void run() { try { while(true){ if(!ServerProperties.startWork){ if(!ServerProperties.startTcpConnectWork){ Thread.sleep(100L); }else{ try{ @@ -101,7 +101,14 @@ } } startJob() ; while (true){ if(!ServerProperties.startRtuReportWork){ Thread.sleep(100L); }else{ startJob() ; break ; } } while(true){ if(totalOverClientCount.longValue() >= totalRtuClientCount.longValue()){ @@ -210,16 +217,16 @@ totalSendDataCount++; if(totalOverClientCount.longValue() >= totalRtuClientCount.longValue()){ RmiClUnit.getInstance().reportHadReportCount(totalSendDataCount); System.out.println("已经发送" + totalSendDataCount + "条数据"); System.out.println("已经发送" + totalSendDataCount + "条数据(心跳和上报)"); }else{ if(totalRtuClientCount > 100){ if(totalSendDataCount % 100 == 0){ RmiClUnit.getInstance().reportHadReportCount(totalSendDataCount); System.out.println("已经发送" + totalSendDataCount + "条数据"); System.out.println("已经发送" + totalSendDataCount + "条数据(心跳和上报)"); } }else{ RmiClUnit.getInstance().reportHadReportCount(totalSendDataCount); System.out.println("已经发送" + totalSendDataCount + "条数据"); System.out.println("已经发送" + totalSendDataCount + "条数据(心跳和上报)"); } } } pipIrr-platform/pipIrr-mw/pipIrr-mwTest-server/src/main/java/com/dy/testServer/console/Command.java
@@ -20,7 +20,8 @@ commands = new String[]{ "config 查看配置信息", "show 显示mwTest情况", "start 启动mwTest上报数据", "start 启动mwTest建立TCP连接数据", "report 启动mwTest RTU上报数据", "exit 退出", }; } @@ -35,7 +36,9 @@ } else if (command.equals("show")) { show(prtWrt); } else if (command.equals("start")) { start(prtWrt); startNetConnect(prtWrt); } else if (command.equals("report")) { startRtuReport(prtWrt); } else if(command.equals("exit")){ exit = true ; } else { @@ -101,9 +104,14 @@ prtWrt.println(" rtuAddr范围:" + sta.confVo.rtuAddrStart + "--" + sta.confVo.rtuAddrEnd); } if(sta.startVo != null){ prtWrt.println(" 上报数据:" + (sta.startVo.start?"已经开始":"未开始")); prtWrt.println(" TCP连接:" + (sta.startVo.start?"已经开始":"未开始")); }else{ prtWrt.println(" 上报数据:未开始") ; prtWrt.println(" TCP连接:未开始") ; } if(sta.reportVo != null){ prtWrt.println(" RTU上报数据:" + (sta.reportVo.report?"已经开始":"未开始")); }else{ prtWrt.println(" RTU上报数据:未开始") ; } if(sta.count != null){ prtWrt.println(" 已经上报数据:" + sta.count + "条"); @@ -120,11 +128,18 @@ prtWrt.println(""); } private static void start(PrintWriter prtWrt){ private static void startNetConnect(PrintWriter prtWrt){ prtWrt.println(""); prtWrt.println(" 已经允许mwTest上报数据了"); prtWrt.println(" 已经允许mwTest建立TCP连接"); prtWrt.println(""); Manager.enablemwTestStart = true ; Manager.enablemwTestStartTcpConnect = true ; } private static void startRtuReport(PrintWriter prtWrt){ prtWrt.println(""); prtWrt.println(" 已经允许mwTest RTU上报数据了"); prtWrt.println(""); Manager.enablemwTestStartRtuReport = true ; } pipIrr-platform/pipIrr-mw/pipIrr-mwTest-server/src/main/java/com/dy/testServer/forRmi/Code.java
@@ -3,14 +3,16 @@ public class Code { public static String cd1 = "001" ;//注册 public static String cd2 = "002" ;//请求mwClient的配置 public static String cd3 = "003" ;//请求开始工作 public static String cd4 = "004" ;//向服务端上报信息 public static String cd5 = "005" ;//向服务端上报 完成任务的数量 public static String cd2 = "002" ;//请求sepTest的配置 public static String cd6 = "006" ;//向服务端上报 全部任务完成 public static String cd3 = "003" ;//请求开始建立网络连接 public static String cd4 = "004" ;//请求开始RTU上报数据 public static String cd5 = "005" ;//向服务端上报信息 public static String cd6 = "006" ;//向服务端上报 完成任务的数量 public static String cd7 = "007" ;//向服务端上报 全部任务完成 } pipIrr-platform/pipIrr-mw/pipIrr-mwTest-server/src/main/java/com/dy/testServer/forRmi/Manager.java
@@ -8,9 +8,11 @@ import com.dy.testServer.ServerProperties; public class Manager { public static boolean enablemwTestStart = false ; public static boolean enablemwTestStartTcpConnect = false ; public static boolean enablemwTestStartRtuReport = false ; public static int clientId = 1 ; public static long maxClient = 0 ; @@ -79,10 +81,12 @@ }else if(rqVo.code.equals(Code.cd3)){ resVo.obj = doDealGetStart(rqVo) ; }else if(rqVo.code.equals(Code.cd4)){ doDealReportCount(rqVo) ; resVo.obj = doDealGetReport(rqVo) ; }else if(rqVo.code.equals(Code.cd5)){ doDealReportOver(rqVo) ; doDealReportCount(rqVo) ; }else if(rqVo.code.equals(Code.cd6)){ doDealReportOver(rqVo) ; }else if(rqVo.code.equals(Code.cd7)){ doDealAllOver(rqVo) ; } return resVo ; @@ -127,9 +131,9 @@ } return conVo ; } private static ResStartVo doDealGetStart(RmiRequestVo rqVo){ ResStartVo rvo = new ResStartVo() ; rvo.start = enablemwTestStart ; private static ResStartTcpConnectVo doDealGetStart(RmiRequestVo rqVo){ ResStartTcpConnectVo rvo = new ResStartTcpConnectVo() ; rvo.start = enablemwTestStartTcpConnect ; int token = Integer.parseInt(rqVo.token) ; MwTestClientStatus sta = token2ClientMap.get("" + token); if(sta == null){ @@ -141,7 +145,21 @@ } return rvo ; } private static ResStartRtuReportVo doDealGetReport(RmiRequestVo rqVo){ ResStartRtuReportVo rvo = new ResStartRtuReportVo() ; rvo.report = enablemwTestStartRtuReport ; int token = Integer.parseInt(rqVo.token) ; MwTestClientStatus sta = token2ClientMap.get("" + token); if(sta == null){ sta = new MwTestClientStatus() ; sta.reportVo = rvo ; token2ClientMap.put("" + token, sta); }else{ sta.reportVo = rvo ; } return rvo ; } private static void doDealReportCount(RmiRequestVo rqVo){ int token = Integer.parseInt(rqVo.token) ; pipIrr-platform/pipIrr-mw/pipIrr-mwTest-server/src/main/java/com/dy/testServer/forRmi/MwTestClientStatus.java
@@ -4,7 +4,9 @@ public MwConfigVo confVo ; public ResStartVo startVo ; public ResStartTcpConnectVo startVo ; public ResStartRtuReportVo reportVo ; public Integer count ;//mwTest上报数据数量 @@ -22,11 +24,11 @@ this.confVo = confVo; } public ResStartVo getStartVo() { public ResStartTcpConnectVo getStartVo() { return startVo; } public void setStartVo(ResStartVo startVo) { public void setStartVo(ResStartTcpConnectVo startVo) { this.startVo = startVo; } @@ -53,6 +55,20 @@ public void setSeconds(Long seconds) { this.seconds = seconds; } public ResStartRtuReportVo getReportVo() { return reportVo; } public void setReportVo(ResStartRtuReportVo reportVo) { this.reportVo = reportVo; } public Integer getOverCount() { return overCount; } public void setOverCount(Integer overCount) { this.overCount = overCount; } } pipIrr-platform/pipIrr-mw/pipIrr-mwTest-server/src/main/java/com/dy/testServer/forRmi/ResStartRtuReportVo.javacopy from pipIrr-platform/pipIrr-mw/pipIrr-mwTest-server/src/main/java/com/dy/testServer/forRmi/ResStartVo.java copy to pipIrr-platform/pipIrr-mw/pipIrr-mwTest-server/src/main/java/com/dy/testServer/forRmi/ResStartRtuReportVo.java
File was copied from pipIrr-platform/pipIrr-mw/pipIrr-mwTest-server/src/main/java/com/dy/testServer/forRmi/ResStartVo.java @@ -3,13 +3,13 @@ import com.alibaba.fastjson2.JSON; public class ResStartVo { public class ResStartRtuReportVo { public boolean success = true ; public String errorInfo ; public boolean start ; public boolean report; /** @@ -31,9 +31,9 @@ * @return 对象 * @throws Exception 异常 */ public static ResStartVo jsonToObject(String json)throws Exception{ public static ResStartRtuReportVo jsonToObject(String json)throws Exception{ try{ return JSON.parseObject(json, ResStartVo.class) ; return JSON.parseObject(json, ResStartRtuReportVo.class) ; //return new JSONDeserializer<ResStartVo>().deserialize(json, ResStartVo.class) ; }catch(Exception e){ throw new Exception(e.getMessage() , e ) ; @@ -52,12 +52,12 @@ public void setErrorInfo(String errorInfo) { this.errorInfo = errorInfo; } public boolean isStart() { return start; public boolean isReport() { return report; } public void setStart(boolean start) { this.start = start; public void setReport(boolean report) { this.report = report; } } pipIrr-platform/pipIrr-mw/pipIrr-mwTest-server/src/main/java/com/dy/testServer/forRmi/ResStartTcpConnectVo.java
File was renamed from pipIrr-platform/pipIrr-mw/pipIrr-mwTest-server/src/main/java/com/dy/testServer/forRmi/ResStartVo.java @@ -3,7 +3,7 @@ import com.alibaba.fastjson2.JSON; public class ResStartVo { public class ResStartTcpConnectVo { public boolean success = true ; @@ -31,9 +31,9 @@ * @return 对象 * @throws Exception 异常 */ public static ResStartVo jsonToObject(String json)throws Exception{ public static ResStartTcpConnectVo jsonToObject(String json)throws Exception{ try{ return JSON.parseObject(json, ResStartVo.class) ; return JSON.parseObject(json, ResStartTcpConnectVo.class) ; //return new JSONDeserializer<ResStartVo>().deserialize(json, ResStartVo.class) ; }catch(Exception e){ throw new Exception(e.getMessage() , e ) ;