From 70aeac289c132085e91d59dc82bc1facd41889fa Mon Sep 17 00:00:00 2001
From: liurunyu <lry9898@163.com>
Date: 星期一, 24 二月 2025 15:33:53 +0800
Subject: [PATCH] 有关表阀一体机(王江海制定)通信协议: 1、根据新版本协议,解析部分更新; 2、根据测试一体阀协议数据处理需求与逻辑,进行表阀一体机数据处理。
---
 pipIrr-platform/pipIrr-mw/pipIrr-mwTest-client/src/main/java/com/dy/testClient/tcpClient/TcpClUnit.java |  176 +++++++++++++++++++++++++++++++++++++++++++++++-----------
 1 files changed, 141 insertions(+), 35 deletions(-)
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mwTest-client/src/main/java/com/dy/testClient/tcpClient/TcpClUnit.java b/pipIrr-platform/pipIrr-mw/pipIrr-mwTest-client/src/main/java/com/dy/testClient/tcpClient/TcpClUnit.java
index 167adde..4537b10 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mwTest-client/src/main/java/com/dy/testClient/tcpClient/TcpClUnit.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mwTest-client/src/main/java/com/dy/testClient/tcpClient/TcpClUnit.java
@@ -2,13 +2,15 @@
 
 import com.dy.common.mw.UnitAdapterInterface;
 import com.dy.common.mw.UnitInterface;
-import com.dy.common.mw.UnitStartedCallbackInterface;
+import com.dy.common.mw.UnitCallbackInterface;
 import com.dy.common.threadPool.ThreadPool;
 import com.dy.common.threadPool.TreadPoolFactory;
+import com.dy.common.util.Callback;
 import com.dy.testClient.ServerProperties;
 import com.dy.testClient.rmiClient.RmiClUnit;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.mina.core.session.IoSession;
 
 import java.util.Collection;
 import java.util.HashMap;
@@ -26,7 +28,7 @@
     private static ThreadPool.Pool pool ;
     private static Map<String, MyThreadJob> jobMap = new HashMap<>() ;
 
-    private static Integer totalRunedClientCount = 0;
+    private static Integer totalRtuClientCount = 0;
     private static Integer totalSendDataCount = 0;
     private static Integer totalOverClientCount = 0;
 
@@ -51,7 +53,7 @@
     }
 
     @Override
-    public void start(UnitStartedCallbackInterface callback) throws Exception {
+    public void start(UnitCallbackInterface callback) throws Exception {
         pool = TreadPoolFactory.getThreadPoolLong() ;
         System.out.println("Tcp Client妯″潡鎴愬姛鍚姩");
         this.doStart();
@@ -59,7 +61,7 @@
     }
 
     @Override
-    public void stop(UnitStartedCallbackInterface callback) throws Exception {
+    public void stop(UnitCallbackInterface callback) throws Exception {
         callback.call(null);
     }
 
@@ -69,26 +71,60 @@
             public void run() {
                 try {
                     while(true){
-                        if(!ServerProperties.startWork){
+                        if(!ServerProperties.startTcpConnectWork){
                             Thread.sleep(100L);
                         }else{
-                            startTime = System.currentTimeMillis() ;
-                            for(Long addr = ServerProperties.rtuAddrStart; addr <= ServerProperties.rtuAddrEnd; addr++){
-                                totalRunedClientCount++ ;
-                                createImitate(addr) ;
-                            }
-                            startJob() ;
-                            while(true){
-                                if(totalOverClientCount.longValue() >= totalRunedClientCount.longValue()){
-                                    Long seconds = (System.currentTimeMillis() - startTime)/1000 ;
-                                    RmiClUnit.getInstance().allOver(seconds) ;
-                                    System.out.println("鍏辩敤鏃�" + seconds + "绉�");
-                                    break ;
-                                }else{
-                                    Thread.sleep(100L);
+                            try{
+                                startTime = System.currentTimeMillis() ;
+                                for(Long addr = ServerProperties.rtuAddrStart; addr <= ServerProperties.rtuAddrEnd; addr++){
+                                    totalRtuClientCount++ ;
+                                    createImitate(addr) ;
                                 }
+                                log.info("鍏辨ā鎷熶簡" + totalRtuClientCount + "鍙癛TU");
+
+                                Collection<MyThreadJob> collection = jobMap.values() ;
+                                int connectedCount = 0 ;
+                                for(MyThreadJob job : collection){
+                                    connectServer(job) ;
+                                    connectedCount++ ;
+                                    log.info("褰撳墠寤虹珛涓庨�氫俊涓棿浠惰繛鎺ョ殑RTU鏁伴噺涓猴細" + connectedCount);
+                                }
+                                log.info("鎵�鏈塕TU宸蹭笌閫氫俊涓棿浠跺缓绔嬭繛鎺�");
+
+                                while (true){
+                                    int noConnectedCount = checkConnected() ;
+                                    if(noConnectedCount > 0){
+                                        log.info("绛夊緟" + noConnectedCount + "鍙癛TU杩炴帴缃戠粶");
+                                        Thread.sleep(100L);
+                                    }else{
+                                        break ;
+                                    }
+                                }
+
+                                while (true){
+                                    if(!ServerProperties.startRtuReportWork){
+                                        Thread.sleep(100L);
+                                    }else{
+                                        startJob() ;
+                                        break ;
+                                    }
+                                }
+
+                                while(true){
+                                    if(totalOverClientCount.longValue() >= totalRtuClientCount.longValue()){
+                                        Long seconds = (System.currentTimeMillis() - startTime)/1000 ;
+                                        RmiClUnit.getInstance().allOver(seconds) ;
+                                        log.info("鍏辩敤鏃�" + seconds + "绉�");
+                                        break ;
+                                    }else{
+                                        Thread.sleep(100L);
+                                    }
+                                }
+                            }catch (Exception e){
+                                e.printStackTrace();
+                            }finally {
+                                break ;
                             }
-                            break;
                         }
                     }
                 } catch (Exception e) {
@@ -106,22 +142,72 @@
         jobMap.put("" + rtuAddr, new MyThreadJob("" + rtuAddr, ServerProperties.tcpServerIp, ServerProperties.tcpServerPort)) ;
     }
 
+
+    private void connectServer(MyThreadJob job){
+        if(job.session == null){
+            try{
+                new TcpConnect().createSession(job.rtuAddr,
+                        job,
+                        job.serverIp,
+                        job.serverPort,
+                        job.connectTimeout,
+                        new TcpHandler(),
+                        new Callback() {
+                            @Override
+                            public void call(Object obj) {
+                                if(obj == null){
+                                    log.error("鍒涘缓缃戠粶浼氳瘽杩斿洖涓簄ull");
+                                }else{
+                                    job.session = (IoSession)obj ;
+                                }
+                            }
+                            @Override
+                            public void call(Object... objs) {
+                            }
+                            @Override
+                            public void exception(Exception e) {
+                            }
+                        }) ;
+            }catch (Exception e){
+                job.exceptionOnConnect = true ;
+                e.printStackTrace();
+            }
+        }
+    }
+
+    private int checkConnected(){
+        int noConnectedCount = 0 ;
+        Collection<MyThreadJob> collection = jobMap.values() ;
+        for(MyThreadJob job : collection){
+            if(job.session == null && !job.exceptionOnConnect){
+                noConnectedCount++ ;
+            }
+        }
+        return noConnectedCount;
+    }
+
     private void startJob(){
-        new Thread(new Runnable(){
-            @Override
-            public void run() {
-                try {
-                    Thread.sleep(1000L);
-                    while(true){
-                        Collection<MyThreadJob> collection = jobMap.values() ;
-                        for(MyThreadJob job : collection){
+        new Thread(() -> {
+            try {
+                int notOverCount;
+                while(true){
+                    notOverCount = 0 ;
+                    Collection<MyThreadJob> collection = jobMap.values() ;
+                    for(MyThreadJob job : collection){
+                        if(!job.isOver){
+                            notOverCount++ ;
                             pool.putJob(job);
                         }
-                        Thread.sleep(ServerProperties.sendInterval * 1000);
                     }
-                } catch (Exception e) {
-                    e.printStackTrace();
+                    if(notOverCount > 0){
+                        log.info("褰撳墠杩樻湁" + notOverCount + "鍙癛TU鏈畬鎴愪换鍔�");
+                        Thread.sleep(ServerProperties.sendInterval * 1000);
+                    }else{
+                        break ;
+                    }
                 }
+            } catch (Exception e) {
+                e.printStackTrace();
             }
         }).start();
     }
@@ -129,17 +215,37 @@
 
     public static synchronized void clientSendData(){
         totalSendDataCount++;
-        if(totalSendDataCount % 100 == 0){
+        if(totalOverClientCount.longValue() >= totalRtuClientCount.longValue()){
             RmiClUnit.getInstance().reportHadReportCount(totalSendDataCount);
-            System.out.println("宸茬粡鍙戦��" + totalSendDataCount + "鏉℃暟鎹�");
+            System.out.println("宸茬粡鍙戦��" + totalSendDataCount + "鏉℃暟鎹紙蹇冭烦鍜屼笂鎶ワ級");
+        }else{
+            if(totalRtuClientCount > 100){
+                if(totalSendDataCount % 100 == 0){
+                    RmiClUnit.getInstance().reportHadReportCount(totalSendDataCount);
+                    System.out.println("宸茬粡鍙戦��" + totalSendDataCount + "鏉℃暟鎹紙蹇冭烦鍜屼笂鎶ワ級");
+                }
+            }else{
+                RmiClUnit.getInstance().reportHadReportCount(totalSendDataCount);
+                System.out.println("宸茬粡鍙戦��" + totalSendDataCount + "鏉℃暟鎹紙蹇冭烦鍜屼笂鎶ワ級");
+            }
         }
     }
 
     public static synchronized void clientOver(){
         totalOverClientCount++;
-        if(totalOverClientCount % 100 == 0){
+        if(totalOverClientCount.longValue() >= totalRtuClientCount.longValue()){
             RmiClUnit.getInstance().reportHadReportOver(totalOverClientCount);
-            System.out.println("宸叉湁" + totalOverClientCount + "瀹屾垚浜嗕换鍔�");
+            System.out.println("宸叉湁" + totalOverClientCount + "涓猂TU瀹屾垚浜嗕换鍔�");
+        }else{
+            if(totalRtuClientCount > 100) {
+                if (totalOverClientCount % 100 == 0) {
+                    RmiClUnit.getInstance().reportHadReportOver(totalOverClientCount);
+                    System.out.println("宸叉湁" + totalOverClientCount + "涓猂TU瀹屾垚浜嗕换鍔�");
+                }
+            }else{
+                RmiClUnit.getInstance().reportHadReportOver(totalOverClientCount);
+                System.out.println("宸叉湁" + totalOverClientCount + "涓猂TU瀹屾垚浜嗕换鍔�");
+            }
         }
     }
 }
--
Gitblit v1.8.0