From 9ad6b0179be4d351105f1507779d4f41a3a41953 Mon Sep 17 00:00:00 2001
From: liurunyu <lry9898@163.com>
Date: 星期六, 07 九月 2024 16:57:40 +0800
Subject: [PATCH] 1、完善代码; 2、江海协议时标(tp)实现中的bug修改。

---
 pipIrr-platform/pipIrr-mw/pipIrr-mwTest-client/src/main/java/com/dy/testClient/tcpClient/TcpClUnit.java |  197 ++++++++++++++++++++++++++++++++++++++++++-------
 1 files changed, 169 insertions(+), 28 deletions(-)

diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mwTest-client/src/main/java/com/dy/testClient/tcpClient/TcpClUnit.java b/pipIrr-platform/pipIrr-mw/pipIrr-mwTest-client/src/main/java/com/dy/testClient/tcpClient/TcpClUnit.java
index 1189f3b..4537b10 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mwTest-client/src/main/java/com/dy/testClient/tcpClient/TcpClUnit.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mwTest-client/src/main/java/com/dy/testClient/tcpClient/TcpClUnit.java
@@ -2,13 +2,19 @@
 
 import com.dy.common.mw.UnitAdapterInterface;
 import com.dy.common.mw.UnitInterface;
-import com.dy.common.mw.UnitStartedCallbackInterface;
+import com.dy.common.mw.UnitCallbackInterface;
 import com.dy.common.threadPool.ThreadPool;
 import com.dy.common.threadPool.TreadPoolFactory;
+import com.dy.common.util.Callback;
 import com.dy.testClient.ServerProperties;
 import com.dy.testClient.rmiClient.RmiClUnit;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.mina.core.session.IoSession;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
 
 public class TcpClUnit  implements UnitInterface {
 
@@ -20,8 +26,10 @@
     public static TcpClUnitConfigVo confVo ;
 
     private static ThreadPool.Pool pool ;
+    private static Map<String, MyThreadJob> jobMap = new HashMap<>() ;
 
-    private static Integer totalRunedClientCount = 0;
+    private static Integer totalRtuClientCount = 0;
+    private static Integer totalSendDataCount = 0;
     private static Integer totalOverClientCount = 0;
 
     private static Long startTime = 0L ;
@@ -45,7 +53,7 @@
     }
 
     @Override
-    public void start(UnitStartedCallbackInterface callback) throws Exception {
+    public void start(UnitCallbackInterface callback) throws Exception {
         pool = TreadPoolFactory.getThreadPoolLong() ;
         System.out.println("Tcp Client妯″潡鎴愬姛鍚姩");
         this.doStart();
@@ -53,7 +61,7 @@
     }
 
     @Override
-    public void stop(UnitStartedCallbackInterface callback) throws Exception {
+    public void stop(UnitCallbackInterface callback) throws Exception {
         callback.call(null);
     }
 
@@ -63,25 +71,60 @@
             public void run() {
                 try {
                     while(true){
-                        if(!ServerProperties.startWork){
+                        if(!ServerProperties.startTcpConnectWork){
                             Thread.sleep(100L);
                         }else{
-                            startTime = System.currentTimeMillis() ;
-                            for(Long addr = ServerProperties.rtuAddrStart; addr <= ServerProperties.rtuAddrEnd; addr++){
-                                totalRunedClientCount++ ;
-                                startClient(addr) ;
-                            }
-                            while(true){
-                                if(totalOverClientCount.longValue() >= totalRunedClientCount.longValue()){
-                                    Long seconds = (System.currentTimeMillis() - startTime)/1000 ;
-                                    RmiClUnit.getInstance().reportHadReportOver(seconds) ;
-                                    System.out.println("鍏辩敤鏃�" + seconds + "绉�");
-                                    break ;
-                                }else{
-                                    Thread.sleep(100L);
+                            try{
+                                startTime = System.currentTimeMillis() ;
+                                for(Long addr = ServerProperties.rtuAddrStart; addr <= ServerProperties.rtuAddrEnd; addr++){
+                                    totalRtuClientCount++ ;
+                                    createImitate(addr) ;
                                 }
+                                log.info("鍏辨ā鎷熶簡" + totalRtuClientCount + "鍙癛TU");
+
+                                Collection<MyThreadJob> collection = jobMap.values() ;
+                                int connectedCount = 0 ;
+                                for(MyThreadJob job : collection){
+                                    connectServer(job) ;
+                                    connectedCount++ ;
+                                    log.info("褰撳墠寤虹珛涓庨�氫俊涓棿浠惰繛鎺ョ殑RTU鏁伴噺涓猴細" + connectedCount);
+                                }
+                                log.info("鎵�鏈塕TU宸蹭笌閫氫俊涓棿浠跺缓绔嬭繛鎺�");
+
+                                while (true){
+                                    int noConnectedCount = checkConnected() ;
+                                    if(noConnectedCount > 0){
+                                        log.info("绛夊緟" + noConnectedCount + "鍙癛TU杩炴帴缃戠粶");
+                                        Thread.sleep(100L);
+                                    }else{
+                                        break ;
+                                    }
+                                }
+
+                                while (true){
+                                    if(!ServerProperties.startRtuReportWork){
+                                        Thread.sleep(100L);
+                                    }else{
+                                        startJob() ;
+                                        break ;
+                                    }
+                                }
+
+                                while(true){
+                                    if(totalOverClientCount.longValue() >= totalRtuClientCount.longValue()){
+                                        Long seconds = (System.currentTimeMillis() - startTime)/1000 ;
+                                        RmiClUnit.getInstance().allOver(seconds) ;
+                                        log.info("鍏辩敤鏃�" + seconds + "绉�");
+                                        break ;
+                                    }else{
+                                        Thread.sleep(100L);
+                                    }
+                                }
+                            }catch (Exception e){
+                                e.printStackTrace();
+                            }finally {
+                                break ;
                             }
-                            break;
                         }
                     }
                 } catch (Exception e) {
@@ -91,20 +134,118 @@
         }).start();
     }
 
-    private void startClient(Long rtuAddr){
-        try {
-            pool.putJob(new MyThreadJob("" + rtuAddr));
-        } catch (Exception e) {
-            log.error("TcpClUnit.startClient() ", e);
-        }
+    /**
+     * 鍒涘缓RTU妯℃嫙MyThreadJob
+     * @param rtuAddr rtu鍦板潃
+     */
+    private void createImitate(Long rtuAddr){
+        jobMap.put("" + rtuAddr, new MyThreadJob("" + rtuAddr, ServerProperties.tcpServerIp, ServerProperties.tcpServerPort)) ;
     }
 
 
+    private void connectServer(MyThreadJob job){
+        if(job.session == null){
+            try{
+                new TcpConnect().createSession(job.rtuAddr,
+                        job,
+                        job.serverIp,
+                        job.serverPort,
+                        job.connectTimeout,
+                        new TcpHandler(),
+                        new Callback() {
+                            @Override
+                            public void call(Object obj) {
+                                if(obj == null){
+                                    log.error("鍒涘缓缃戠粶浼氳瘽杩斿洖涓簄ull");
+                                }else{
+                                    job.session = (IoSession)obj ;
+                                }
+                            }
+                            @Override
+                            public void call(Object... objs) {
+                            }
+                            @Override
+                            public void exception(Exception e) {
+                            }
+                        }) ;
+            }catch (Exception e){
+                job.exceptionOnConnect = true ;
+                e.printStackTrace();
+            }
+        }
+    }
+
+    private int checkConnected(){
+        int noConnectedCount = 0 ;
+        Collection<MyThreadJob> collection = jobMap.values() ;
+        for(MyThreadJob job : collection){
+            if(job.session == null && !job.exceptionOnConnect){
+                noConnectedCount++ ;
+            }
+        }
+        return noConnectedCount;
+    }
+
+    private void startJob(){
+        new Thread(() -> {
+            try {
+                int notOverCount;
+                while(true){
+                    notOverCount = 0 ;
+                    Collection<MyThreadJob> collection = jobMap.values() ;
+                    for(MyThreadJob job : collection){
+                        if(!job.isOver){
+                            notOverCount++ ;
+                            pool.putJob(job);
+                        }
+                    }
+                    if(notOverCount > 0){
+                        log.info("褰撳墠杩樻湁" + notOverCount + "鍙癛TU鏈畬鎴愪换鍔�");
+                        Thread.sleep(ServerProperties.sendInterval * 1000);
+                    }else{
+                        break ;
+                    }
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }).start();
+    }
+
+
+    public static synchronized void clientSendData(){
+        totalSendDataCount++;
+        if(totalOverClientCount.longValue() >= totalRtuClientCount.longValue()){
+            RmiClUnit.getInstance().reportHadReportCount(totalSendDataCount);
+            System.out.println("宸茬粡鍙戦��" + totalSendDataCount + "鏉℃暟鎹紙蹇冭烦鍜屼笂鎶ワ級");
+        }else{
+            if(totalRtuClientCount > 100){
+                if(totalSendDataCount % 100 == 0){
+                    RmiClUnit.getInstance().reportHadReportCount(totalSendDataCount);
+                    System.out.println("宸茬粡鍙戦��" + totalSendDataCount + "鏉℃暟鎹紙蹇冭烦鍜屼笂鎶ワ級");
+                }
+            }else{
+                RmiClUnit.getInstance().reportHadReportCount(totalSendDataCount);
+                System.out.println("宸茬粡鍙戦��" + totalSendDataCount + "鏉℃暟鎹紙蹇冭烦鍜屼笂鎶ワ級");
+            }
+        }
+    }
+
     public static synchronized void clientOver(){
         totalOverClientCount++;
-        if(totalOverClientCount % 100 == 0){
-            RmiClUnit.getInstance().reportHadReportCount(totalOverClientCount);
-            System.out.println("宸茬粡鍙戦��" + totalOverClientCount + "鏉℃暟鎹�");
+        if(totalOverClientCount.longValue() >= totalRtuClientCount.longValue()){
+            RmiClUnit.getInstance().reportHadReportOver(totalOverClientCount);
+            System.out.println("宸叉湁" + totalOverClientCount + "涓猂TU瀹屾垚浜嗕换鍔�");
+        }else{
+            if(totalRtuClientCount > 100) {
+                if (totalOverClientCount % 100 == 0) {
+                    RmiClUnit.getInstance().reportHadReportOver(totalOverClientCount);
+                    System.out.println("宸叉湁" + totalOverClientCount + "涓猂TU瀹屾垚浜嗕换鍔�");
+                }
+            }else{
+                RmiClUnit.getInstance().reportHadReportOver(totalOverClientCount);
+                System.out.println("宸叉湁" + totalOverClientCount + "涓猂TU瀹屾垚浜嗕换鍔�");
+            }
         }
     }
 }

--
Gitblit v1.8.0