From f767b17d6cc79fbfa1ce6bd554ef4cedd53b82a5 Mon Sep 17 00:00:00 2001
From: wuzeyu <1223318623@qq.com>
Date: 星期一, 25 三月 2024 16:09:36 +0800
Subject: [PATCH] 修改查询控制器 增加显示在线状态字段 具体数据待对接
---
pipIrr-platform/pipIrr-mw/pipIrr-mwTest-client/src/main/java/com/dy/testClient/tcpClient/TcpClUnit.java | 191 +++++++++++++++++++++++++++++++++++++++++------
1 files changed, 166 insertions(+), 25 deletions(-)
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mwTest-client/src/main/java/com/dy/testClient/tcpClient/TcpClUnit.java b/pipIrr-platform/pipIrr-mw/pipIrr-mwTest-client/src/main/java/com/dy/testClient/tcpClient/TcpClUnit.java
index 1189f3b..e2f8660 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mwTest-client/src/main/java/com/dy/testClient/tcpClient/TcpClUnit.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mwTest-client/src/main/java/com/dy/testClient/tcpClient/TcpClUnit.java
@@ -5,10 +5,16 @@
import com.dy.common.mw.UnitStartedCallbackInterface;
import com.dy.common.threadPool.ThreadPool;
import com.dy.common.threadPool.TreadPoolFactory;
+import com.dy.common.util.Callback;
import com.dy.testClient.ServerProperties;
import com.dy.testClient.rmiClient.RmiClUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import org.apache.mina.core.session.IoSession;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
public class TcpClUnit implements UnitInterface {
@@ -20,8 +26,10 @@
public static TcpClUnitConfigVo confVo ;
private static ThreadPool.Pool pool ;
+ private static Map<String, MyThreadJob> jobMap = new HashMap<>() ;
- private static Integer totalRunedClientCount = 0;
+ private static Integer totalRtuClientCount = 0;
+ private static Integer totalSendDataCount = 0;
private static Integer totalOverClientCount = 0;
private static Long startTime = 0L ;
@@ -63,25 +71,60 @@
public void run() {
try {
while(true){
- if(!ServerProperties.startWork){
+ if(!ServerProperties.startTcpConnectWork){
Thread.sleep(100L);
}else{
- startTime = System.currentTimeMillis() ;
- for(Long addr = ServerProperties.rtuAddrStart; addr <= ServerProperties.rtuAddrEnd; addr++){
- totalRunedClientCount++ ;
- startClient(addr) ;
- }
- while(true){
- if(totalOverClientCount.longValue() >= totalRunedClientCount.longValue()){
- Long seconds = (System.currentTimeMillis() - startTime)/1000 ;
- RmiClUnit.getInstance().reportHadReportOver(seconds) ;
- System.out.println("鍏辩敤鏃�" + seconds + "绉�");
- break ;
- }else{
- Thread.sleep(100L);
+ try{
+ startTime = System.currentTimeMillis() ;
+ for(Long addr = ServerProperties.rtuAddrStart; addr <= ServerProperties.rtuAddrEnd; addr++){
+ totalRtuClientCount++ ;
+ createImitate(addr) ;
}
+ log.info("鍏辨ā鎷熶簡" + totalRtuClientCount + "鍙癛TU");
+
+ Collection<MyThreadJob> collection = jobMap.values() ;
+ int connectedCount = 0 ;
+ for(MyThreadJob job : collection){
+ connectServer(job) ;
+ connectedCount++ ;
+ log.info("褰撳墠寤虹珛涓庨�氫俊涓棿浠惰繛鎺ョ殑RTU鏁伴噺涓猴細" + connectedCount);
+ }
+ log.info("鎵�鏈塕TU宸蹭笌閫氫俊涓棿浠跺缓绔嬭繛鎺�");
+
+ while (true){
+ int noConnectedCount = checkConnected() ;
+ if(noConnectedCount > 0){
+ log.info("绛夊緟" + noConnectedCount + "鍙癛TU杩炴帴缃戠粶");
+ Thread.sleep(100L);
+ }else{
+ break ;
+ }
+ }
+
+ while (true){
+ if(!ServerProperties.startRtuReportWork){
+ Thread.sleep(100L);
+ }else{
+ startJob() ;
+ break ;
+ }
+ }
+
+ while(true){
+ if(totalOverClientCount.longValue() >= totalRtuClientCount.longValue()){
+ Long seconds = (System.currentTimeMillis() - startTime)/1000 ;
+ RmiClUnit.getInstance().allOver(seconds) ;
+ log.info("鍏辩敤鏃�" + seconds + "绉�");
+ break ;
+ }else{
+ Thread.sleep(100L);
+ }
+ }
+ }catch (Exception e){
+ e.printStackTrace();
+ }finally {
+ break ;
}
- break;
}
}
} catch (Exception e) {
@@ -91,20 +134,118 @@
}).start();
}
- private void startClient(Long rtuAddr){
- try {
- pool.putJob(new MyThreadJob("" + rtuAddr));
- } catch (Exception e) {
- log.error("TcpClUnit.startClient() ", e);
- }
+ /**
+ * 鍒涘缓RTU妯℃嫙MyThreadJob
+ * @param rtuAddr rtu鍦板潃
+ */
+ private void createImitate(Long rtuAddr){
+ jobMap.put("" + rtuAddr, new MyThreadJob("" + rtuAddr, ServerProperties.tcpServerIp, ServerProperties.tcpServerPort)) ;
}
+ private void connectServer(MyThreadJob job){
+ if(job.session == null){
+ try{
+ new TcpConnect().createSession(job.rtuAddr,
+ job,
+ job.serverIp,
+ job.serverPort,
+ job.connectTimeout,
+ new TcpHandler(),
+ new Callback() {
+ @Override
+ public void call(Object obj) {
+ if(obj == null){
+ log.error("鍒涘缓缃戠粶浼氳瘽杩斿洖涓簄ull");
+ }else{
+ job.session = (IoSession)obj ;
+ }
+ }
+ @Override
+ public void call(Object... objs) {
+ }
+ @Override
+ public void exception(Exception e) {
+ }
+ }) ;
+ }catch (Exception e){
+ job.exceptionOnConnect = true ;
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private int checkConnected(){
+ int noConnectedCount = 0 ;
+ Collection<MyThreadJob> collection = jobMap.values() ;
+ for(MyThreadJob job : collection){
+ if(job.session == null && !job.exceptionOnConnect){
+ noConnectedCount++ ;
+ }
+ }
+ return noConnectedCount;
+ }
+
+ private void startJob(){
+ new Thread(() -> {
+ try {
+ int notOverCount;
+ while(true){
+ notOverCount = 0 ;
+ Collection<MyThreadJob> collection = jobMap.values() ;
+ for(MyThreadJob job : collection){
+ if(!job.isOver){
+ notOverCount++ ;
+ pool.putJob(job);
+ }
+ }
+ if(notOverCount > 0){
+ log.info("褰撳墠杩樻湁" + notOverCount + "鍙癛TU鏈畬鎴愪换鍔�");
+ Thread.sleep(ServerProperties.sendInterval * 1000);
+ }else{
+ break ;
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }).start();
+ }
+
+
+ public static synchronized void clientSendData(){
+ totalSendDataCount++;
+ if(totalOverClientCount.longValue() >= totalRtuClientCount.longValue()){
+ RmiClUnit.getInstance().reportHadReportCount(totalSendDataCount);
+ System.out.println("宸茬粡鍙戦��" + totalSendDataCount + "鏉℃暟鎹紙蹇冭烦鍜屼笂鎶ワ級");
+ }else{
+ if(totalRtuClientCount > 100){
+ if(totalSendDataCount % 100 == 0){
+ RmiClUnit.getInstance().reportHadReportCount(totalSendDataCount);
+ System.out.println("宸茬粡鍙戦��" + totalSendDataCount + "鏉℃暟鎹紙蹇冭烦鍜屼笂鎶ワ級");
+ }
+ }else{
+ RmiClUnit.getInstance().reportHadReportCount(totalSendDataCount);
+ System.out.println("宸茬粡鍙戦��" + totalSendDataCount + "鏉℃暟鎹紙蹇冭烦鍜屼笂鎶ワ級");
+ }
+ }
+ }
+
public static synchronized void clientOver(){
totalOverClientCount++;
- if(totalOverClientCount % 100 == 0){
- RmiClUnit.getInstance().reportHadReportCount(totalOverClientCount);
- System.out.println("宸茬粡鍙戦��" + totalOverClientCount + "鏉℃暟鎹�");
+ if(totalOverClientCount.longValue() >= totalRtuClientCount.longValue()){
+ RmiClUnit.getInstance().reportHadReportOver(totalOverClientCount);
+ System.out.println("宸叉湁" + totalOverClientCount + "涓猂TU瀹屾垚浜嗕换鍔�");
+ }else{
+ if(totalRtuClientCount > 100) {
+ if (totalOverClientCount % 100 == 0) {
+ RmiClUnit.getInstance().reportHadReportOver(totalOverClientCount);
+ System.out.println("宸叉湁" + totalOverClientCount + "涓猂TU瀹屾垚浜嗕换鍔�");
+ }
+ }else{
+ RmiClUnit.getInstance().reportHadReportOver(totalOverClientCount);
+ System.out.println("宸叉湁" + totalOverClientCount + "涓猂TU瀹屾垚浜嗕换鍔�");
+ }
}
}
}
--
Gitblit v1.8.0