From 120448e8c6826b453d5b96e9076d61479a987677 Mon Sep 17 00:00:00 2001
From: wuzeyu <1223318623@qq.com>
Date: 星期六, 02 十二月 2023 10:42:18 +0800
Subject: [PATCH] Merge branch 'master' of http://8.140.179.55:20000/r/pipIrr-SV

---
 pipIrr-platform/pipIrr-mw/pipIrr-mwTest-client/src/main/java/com/dy/testClient/tcpClient/TcpClUnit.java |  181 +++++++++++++++++++++++++++++++++++++++------
 1 files changed, 157 insertions(+), 24 deletions(-)

diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mwTest-client/src/main/java/com/dy/testClient/tcpClient/TcpClUnit.java b/pipIrr-platform/pipIrr-mw/pipIrr-mwTest-client/src/main/java/com/dy/testClient/tcpClient/TcpClUnit.java
index b25512c..78a8b90 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mwTest-client/src/main/java/com/dy/testClient/tcpClient/TcpClUnit.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mwTest-client/src/main/java/com/dy/testClient/tcpClient/TcpClUnit.java
@@ -5,10 +5,16 @@
 import com.dy.common.mw.UnitStartedCallbackInterface;
 import com.dy.common.threadPool.ThreadPool;
 import com.dy.common.threadPool.TreadPoolFactory;
+import com.dy.common.util.Callback;
 import com.dy.testClient.ServerProperties;
 import com.dy.testClient.rmiClient.RmiClUnit;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.mina.core.session.IoSession;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
 
 public class TcpClUnit  implements UnitInterface {
 
@@ -20,8 +26,10 @@
     public static TcpClUnitConfigVo confVo ;
 
     private static ThreadPool.Pool pool ;
+    private static Map<String, MyThreadJob> jobMap = new HashMap<>() ;
 
-    private static Integer totalRunedClientCount = 0;
+    private static Integer totalRtuClientCount = 0;
+    private static Integer totalSendDataCount = 0;
     private static Integer totalOverClientCount = 0;
 
     private static Long startTime = 0L ;
@@ -66,22 +74,46 @@
                         if(!ServerProperties.startWork){
                             Thread.sleep(100L);
                         }else{
-                            startTime = System.currentTimeMillis() ;
-                            for(Long addr = ServerProperties.rtuAddrStart; addr <= ServerProperties.rtuAddrEnd; addr++){
-                                totalRunedClientCount++ ;
-                                startClient(addr) ;
-                            }
-                            while(true){
-                                if(totalOverClientCount.longValue() >= totalRunedClientCount.longValue()){
-                                    Long seconds = (System.currentTimeMillis() - startTime)/1000 ;
-                                    RmiClUnit.getInstance().reportHadReportOver(seconds) ;
-                                    System.out.println("鍏辩敤鏃�" + seconds + "绉�");
-                                    break ;
-                                }else{
-                                    Thread.sleep(100L);
+                            try{
+                                startTime = System.currentTimeMillis() ;
+                                for(Long addr = ServerProperties.rtuAddrStart; addr <= ServerProperties.rtuAddrEnd; addr++){
+                                    totalRtuClientCount++ ;
+                                    createImitate(addr) ;
                                 }
+                                log.info("鍏辨ā鎷熶簡" + totalRtuClientCount + "鍙癛TU");
+
+                                Collection<MyThreadJob> collection = jobMap.values() ;
+                                for(MyThreadJob job : collection){
+                                    connectServer(job) ;
+                                }
+                                log.info("鍚姩鎵�鏈塕TU杩炴帴閫氫俊涓棿浠�");
+
+                                while (true){
+                                    int noConnectedCount = checkConnected() ;
+                                    if(noConnectedCount > 0){
+                                        log.info("绛夊緟" + noConnectedCount + "鍙癛TU杩炴帴缃戠粶");
+                                        Thread.sleep(100L);
+                                    }else{
+                                        break ;
+                                    }
+                                }
+
+                                startJob() ;
+                                while(true){
+                                    if(totalOverClientCount.longValue() >= totalRtuClientCount.longValue()){
+                                        Long seconds = (System.currentTimeMillis() - startTime)/1000 ;
+                                        RmiClUnit.getInstance().allOver(seconds) ;
+                                        log.info("鍏辩敤鏃�" + seconds + "绉�");
+                                        break ;
+                                    }else{
+                                        Thread.sleep(100L);
+                                    }
+                                }
+                            }catch (Exception e){
+                                e.printStackTrace();
+                            }finally {
+                                break ;
                             }
-                            break;
                         }
                     }
                 } catch (Exception e) {
@@ -91,20 +123,121 @@
         }).start();
     }
 
-    private void startClient(Long rtuAddr){
-        try {
-            pool.putJob(new MyThreadJob("" + rtuAddr));
-        } catch (Exception e) {
-            log.error("TcpClUnit.startClient() ", e);
-        }
+    /**
+     * 鍒涘缓RTU妯℃嫙MyThreadJob
+     * @param rtuAddr rtu鍦板潃
+     */
+    private void createImitate(Long rtuAddr){
+        jobMap.put("" + rtuAddr, new MyThreadJob("" + rtuAddr, ServerProperties.tcpServerIp, ServerProperties.tcpServerPort)) ;
     }
 
 
+    private void connectServer(MyThreadJob job){
+        if(job.session == null){
+            try{
+                new TcpConnect().createSession(job.rtuAddr,
+                        job,
+                        job.serverIp,
+                        job.serverPort,
+                        job.connectTimeout,
+                        new TcpHandler(),
+                        new Callback() {
+                            @Override
+                            public void call(Object obj) {
+                                if(obj == null){
+                                    log.error("鍒涘缓缃戠粶浼氳瘽杩斿洖涓簄ull");
+                                }else{
+                                    job.session = (IoSession)obj ;
+                                }
+                            }
+                            @Override
+                            public void call(Object... objs) {
+                            }
+                            @Override
+                            public void exception(Exception e) {
+                            }
+                        }) ;
+            }catch (Exception e){
+                job.exceptionOnConnect = true ;
+                e.printStackTrace();
+            }
+        }
+    }
+
+    private int checkConnected(){
+        int noConnectedCount = 0 ;
+        Collection<MyThreadJob> collection = jobMap.values() ;
+        for(MyThreadJob job : collection){
+            if(job.session == null && !job.exceptionOnConnect){
+                noConnectedCount++ ;
+            }
+        }
+        return noConnectedCount;
+    }
+
+    private void startJob(){
+        new Thread(new Runnable(){
+            @Override
+            public void run() {
+                try {
+                    int notOverCount;
+                    while(true){
+                        notOverCount = 0 ;
+                        Collection<MyThreadJob> collection = jobMap.values() ;
+                        for(MyThreadJob job : collection){
+                            if(!job.isOver){
+                                notOverCount++ ;
+                                pool.putJob(job);
+                            }
+                        }
+                        if(notOverCount > 0){
+                            log.info("褰撳墠杩樻湁" + notOverCount + "鍙癛TU鏈畬鎴愪换鍔�");
+                            Thread.sleep(ServerProperties.sendInterval * 1000);
+                        }else{
+                            break ;
+                        }
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }).start();
+    }
+
+
+    public static synchronized void clientSendData(){
+        totalSendDataCount++;
+        if(totalOverClientCount.longValue() >= totalRtuClientCount.longValue()){
+            RmiClUnit.getInstance().reportHadReportCount(totalSendDataCount);
+            System.out.println("宸茬粡鍙戦��" + totalSendDataCount + "鏉℃暟鎹�");
+        }else{
+            if(totalRtuClientCount > 100){
+                if(totalSendDataCount % 100 == 0){
+                    RmiClUnit.getInstance().reportHadReportCount(totalSendDataCount);
+                    System.out.println("宸茬粡鍙戦��" + totalSendDataCount + "鏉℃暟鎹�");
+                }
+            }else{
+                RmiClUnit.getInstance().reportHadReportCount(totalSendDataCount);
+                System.out.println("宸茬粡鍙戦��" + totalSendDataCount + "鏉℃暟鎹�");
+            }
+        }
+    }
+
     public static synchronized void clientOver(){
         totalOverClientCount++;
-        if(totalOverClientCount % 100 == 0){
-            RmiClUnit.getInstance().reportHadReportCount(totalOverClientCount);
-            System.out.println("宸茬粡鍙戦��" + totalOverClientCount * + "鏉℃暟鎹�");
+        if(totalOverClientCount.longValue() >= totalRtuClientCount.longValue()){
+            RmiClUnit.getInstance().reportHadReportOver(totalOverClientCount);
+            System.out.println("宸叉湁" + totalOverClientCount + "涓猂TU瀹屾垚浜嗕换鍔�");
+        }else{
+            if(totalRtuClientCount > 100) {
+                if (totalOverClientCount % 100 == 0) {
+                    RmiClUnit.getInstance().reportHadReportOver(totalOverClientCount);
+                    System.out.println("宸叉湁" + totalOverClientCount + "涓猂TU瀹屾垚浜嗕换鍔�");
+                }
+            }else{
+                RmiClUnit.getInstance().reportHadReportOver(totalOverClientCount);
+                System.out.println("宸叉湁" + totalOverClientCount + "涓猂TU瀹屾垚浜嗕换鍔�");
+            }
         }
     }
 }

--
Gitblit v1.8.0