liurunyu
2024-07-26 2f5f826af002b4555bb71f554785ef6faf3b5a0f
pipIrr-platform/pipIrr-mw/pipIrr-mwTest-client/src/main/java/com/dy/testClient/tcpClient/TcpClUnit.java
@@ -5,10 +5,16 @@
import com.dy.common.mw.UnitStartedCallbackInterface;
import com.dy.common.threadPool.ThreadPool;
import com.dy.common.threadPool.TreadPoolFactory;
import com.dy.common.util.Callback;
import com.dy.testClient.ServerProperties;
import com.dy.testClient.rmiClient.RmiClUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.mina.core.session.IoSession;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
public class TcpClUnit  implements UnitInterface {
@@ -20,8 +26,10 @@
    public static TcpClUnitConfigVo confVo ;
    private static ThreadPool.Pool pool ;
    private static Map<String, MyThreadJob> jobMap = new HashMap<>() ;
    private static Integer totalRunedClientCount = 0;
    private static Integer totalRtuClientCount = 0;
    private static Integer totalSendDataCount = 0;
    private static Integer totalOverClientCount = 0;
    private static Long startTime = 0L ;
@@ -63,25 +71,60 @@
            public void run() {
                try {
                    while(true){
                        if(!ServerProperties.startWork){
                        if(!ServerProperties.startTcpConnectWork){
                            Thread.sleep(100L);
                        }else{
                            startTime = System.currentTimeMillis() ;
                            for(Long addr = ServerProperties.rtuAddrStart; addr <= ServerProperties.rtuAddrEnd; addr++){
                                totalRunedClientCount++ ;
                                startClient(addr) ;
                            }
                            while(true){
                                if(totalOverClientCount.longValue() >= totalRunedClientCount.longValue()){
                                    Long seconds = (System.currentTimeMillis() - startTime)/1000 ;
                                    RmiClUnit.getInstance().reportHadReportOver(seconds) ;
                                    System.out.println("共用时" + seconds + "秒");
                                    break ;
                                }else{
                                    Thread.sleep(100L);
                            try{
                                startTime = System.currentTimeMillis() ;
                                for(Long addr = ServerProperties.rtuAddrStart; addr <= ServerProperties.rtuAddrEnd; addr++){
                                    totalRtuClientCount++ ;
                                    createImitate(addr) ;
                                }
                                log.info("共模拟了" + totalRtuClientCount + "台RTU");
                                Collection<MyThreadJob> collection = jobMap.values() ;
                                int connectedCount = 0 ;
                                for(MyThreadJob job : collection){
                                    connectServer(job) ;
                                    connectedCount++ ;
                                    log.info("当前建立与通信中间件连接的RTU数量为:" + connectedCount);
                                }
                                log.info("所有RTU已与通信中间件建立连接");
                                while (true){
                                    int noConnectedCount = checkConnected() ;
                                    if(noConnectedCount > 0){
                                        log.info("等待" + noConnectedCount + "台RTU连接网络");
                                        Thread.sleep(100L);
                                    }else{
                                        break ;
                                    }
                                }
                                while (true){
                                    if(!ServerProperties.startRtuReportWork){
                                        Thread.sleep(100L);
                                    }else{
                                        startJob() ;
                                        break ;
                                    }
                                }
                                while(true){
                                    if(totalOverClientCount.longValue() >= totalRtuClientCount.longValue()){
                                        Long seconds = (System.currentTimeMillis() - startTime)/1000 ;
                                        RmiClUnit.getInstance().allOver(seconds) ;
                                        log.info("共用时" + seconds + "秒");
                                        break ;
                                    }else{
                                        Thread.sleep(100L);
                                    }
                                }
                            }catch (Exception e){
                                e.printStackTrace();
                            }finally {
                                break ;
                            }
                            break;
                        }
                    }
                } catch (Exception e) {
@@ -91,20 +134,118 @@
        }).start();
    }
    private void startClient(Long rtuAddr){
        try {
            pool.putJob(new MyThreadJob("" + rtuAddr));
        } catch (Exception e) {
            log.error("TcpClUnit.startClient() ", e);
        }
    /**
     * 创建RTU模拟MyThreadJob
     * @param rtuAddr rtu地址
     */
    private void createImitate(Long rtuAddr){
        jobMap.put("" + rtuAddr, new MyThreadJob("" + rtuAddr, ServerProperties.tcpServerIp, ServerProperties.tcpServerPort)) ;
    }
    private void connectServer(MyThreadJob job){
        if(job.session == null){
            try{
                new TcpConnect().createSession(job.rtuAddr,
                        job,
                        job.serverIp,
                        job.serverPort,
                        job.connectTimeout,
                        new TcpHandler(),
                        new Callback() {
                            @Override
                            public void call(Object obj) {
                                if(obj == null){
                                    log.error("创建网络会话返回为null");
                                }else{
                                    job.session = (IoSession)obj ;
                                }
                            }
                            @Override
                            public void call(Object... objs) {
                            }
                            @Override
                            public void exception(Exception e) {
                            }
                        }) ;
            }catch (Exception e){
                job.exceptionOnConnect = true ;
                e.printStackTrace();
            }
        }
    }
    private int checkConnected(){
        int noConnectedCount = 0 ;
        Collection<MyThreadJob> collection = jobMap.values() ;
        for(MyThreadJob job : collection){
            if(job.session == null && !job.exceptionOnConnect){
                noConnectedCount++ ;
            }
        }
        return noConnectedCount;
    }
    private void startJob(){
        new Thread(() -> {
            try {
                int notOverCount;
                while(true){
                    notOverCount = 0 ;
                    Collection<MyThreadJob> collection = jobMap.values() ;
                    for(MyThreadJob job : collection){
                        if(!job.isOver){
                            notOverCount++ ;
                            pool.putJob(job);
                        }
                    }
                    if(notOverCount > 0){
                        log.info("当前还有" + notOverCount + "台RTU未完成任务");
                        Thread.sleep(ServerProperties.sendInterval * 1000);
                    }else{
                        break ;
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
    }
    public static synchronized void clientSendData(){
        totalSendDataCount++;
        if(totalOverClientCount.longValue() >= totalRtuClientCount.longValue()){
            RmiClUnit.getInstance().reportHadReportCount(totalSendDataCount);
            System.out.println("已经发送" + totalSendDataCount + "条数据(心跳和上报)");
        }else{
            if(totalRtuClientCount > 100){
                if(totalSendDataCount % 100 == 0){
                    RmiClUnit.getInstance().reportHadReportCount(totalSendDataCount);
                    System.out.println("已经发送" + totalSendDataCount + "条数据(心跳和上报)");
                }
            }else{
                RmiClUnit.getInstance().reportHadReportCount(totalSendDataCount);
                System.out.println("已经发送" + totalSendDataCount + "条数据(心跳和上报)");
            }
        }
    }
    public static synchronized void clientOver(){
        totalOverClientCount++;
        if(totalOverClientCount % 100 == 0){
            RmiClUnit.getInstance().reportHadReportCount(totalOverClientCount);
            System.out.println("已经发送" + totalOverClientCount * + "条数据");
        if(totalOverClientCount.longValue() >= totalRtuClientCount.longValue()){
            RmiClUnit.getInstance().reportHadReportOver(totalOverClientCount);
            System.out.println("已有" + totalOverClientCount + "个RTU完成了任务");
        }else{
            if(totalRtuClientCount > 100) {
                if (totalOverClientCount % 100 == 0) {
                    RmiClUnit.getInstance().reportHadReportOver(totalOverClientCount);
                    System.out.println("已有" + totalOverClientCount + "个RTU完成了任务");
                }
            }else{
                RmiClUnit.getInstance().reportHadReportOver(totalOverClientCount);
                System.out.println("已有" + totalOverClientCount + "个RTU完成了任务");
            }
        }
    }
}