package com.dy.testClient.tcpClient; 
 | 
  
 | 
import com.dy.common.mw.UnitAdapterInterface; 
 | 
import com.dy.common.mw.UnitInterface; 
 | 
import com.dy.common.mw.UnitCallbackInterface; 
 | 
import com.dy.common.threadPool.ThreadPool; 
 | 
import com.dy.common.threadPool.TreadPoolFactory; 
 | 
import com.dy.common.util.Callback; 
 | 
import com.dy.testClient.ServerProperties; 
 | 
import com.dy.testClient.rmiClient.RmiClUnit; 
 | 
import org.apache.logging.log4j.LogManager; 
 | 
import org.apache.logging.log4j.Logger; 
 | 
import org.apache.mina.core.session.IoSession; 
 | 
  
 | 
import java.util.Collection; 
 | 
import java.util.HashMap; 
 | 
import java.util.Map; 
 | 
  
 | 
public class TcpClUnit  implements UnitInterface { 
 | 
  
 | 
    private static final Logger log = LogManager.getLogger(TcpClUnit.class) ; 
 | 
  
 | 
    private static TcpClUnit instance = new TcpClUnit() ; 
 | 
  
 | 
    public static TcpClUnitAdapter adapter ; 
 | 
    public static TcpClUnitConfigVo confVo ; 
 | 
  
 | 
    private static ThreadPool.Pool pool ; 
 | 
    private static Map<String, MyThreadJob> jobMap = new HashMap<>() ; 
 | 
  
 | 
    private static Integer totalRtuClientCount = 0; 
 | 
    private static Integer totalSendDataCount = 0; 
 | 
    private static Integer totalOverClientCount = 0; 
 | 
  
 | 
    private static Long startTime = 0L ; 
 | 
  
 | 
    private TcpClUnit(){} ; 
 | 
  
 | 
    public static TcpClUnit getInstance(){ 
 | 
        return instance ; 
 | 
    } 
 | 
  
 | 
    @Override 
 | 
    public void setAdapter(UnitAdapterInterface adapter) throws Exception { 
 | 
        if(adapter == null){ 
 | 
            throw new Exception("Tcp Client模块适配器对象不能为空!") ; 
 | 
        } 
 | 
        TcpClUnit.adapter = (TcpClUnitAdapter)adapter ; 
 | 
        TcpClUnit.confVo = TcpClUnit.adapter.getConfig() ; 
 | 
        if(TcpClUnit.confVo == null){ 
 | 
            throw new Exception("Tcp Client模块配置对象不能为空!") ; 
 | 
        } 
 | 
    } 
 | 
  
 | 
    @Override 
 | 
    public void start(UnitCallbackInterface callback) throws Exception { 
 | 
        pool = TreadPoolFactory.getThreadPoolLong() ; 
 | 
        System.out.println("Tcp Client模块成功启动"); 
 | 
        this.doStart(); 
 | 
        callback.call(null) ; 
 | 
    } 
 | 
  
 | 
    @Override 
 | 
    public void stop(UnitCallbackInterface callback) throws Exception { 
 | 
        callback.call(null); 
 | 
    } 
 | 
  
 | 
    private void doStart(){ 
 | 
        new Thread(new Runnable(){ 
 | 
            @Override 
 | 
            public void run() { 
 | 
                try { 
 | 
                    while(true){ 
 | 
                        if(!ServerProperties.startTcpConnectWork){ 
 | 
                            Thread.sleep(100L); 
 | 
                        }else{ 
 | 
                            try{ 
 | 
                                startTime = System.currentTimeMillis() ; 
 | 
                                for(Long addr = ServerProperties.rtuAddrStart; addr <= ServerProperties.rtuAddrEnd; addr++){ 
 | 
                                    totalRtuClientCount++ ; 
 | 
                                    createImitate(addr) ; 
 | 
                                } 
 | 
                                log.info("共模拟了" + totalRtuClientCount + "台RTU"); 
 | 
  
 | 
                                Collection<MyThreadJob> collection = jobMap.values() ; 
 | 
                                int connectedCount = 0 ; 
 | 
                                for(MyThreadJob job : collection){ 
 | 
                                    connectServer(job) ; 
 | 
                                    connectedCount++ ; 
 | 
                                    log.info("当前建立与通信中间件连接的RTU数量为:" + connectedCount); 
 | 
                                } 
 | 
                                log.info("所有RTU已与通信中间件建立连接"); 
 | 
  
 | 
                                while (true){ 
 | 
                                    int noConnectedCount = checkConnected() ; 
 | 
                                    if(noConnectedCount > 0){ 
 | 
                                        log.info("等待" + noConnectedCount + "台RTU连接网络"); 
 | 
                                        Thread.sleep(100L); 
 | 
                                    }else{ 
 | 
                                        break ; 
 | 
                                    } 
 | 
                                } 
 | 
  
 | 
                                while (true){ 
 | 
                                    if(!ServerProperties.startRtuReportWork){ 
 | 
                                        Thread.sleep(100L); 
 | 
                                    }else{ 
 | 
                                        startJob() ; 
 | 
                                        break ; 
 | 
                                    } 
 | 
                                } 
 | 
  
 | 
                                while(true){ 
 | 
                                    if(totalOverClientCount.longValue() >= totalRtuClientCount.longValue()){ 
 | 
                                        Long seconds = (System.currentTimeMillis() - startTime)/1000 ; 
 | 
                                        RmiClUnit.getInstance().allOver(seconds) ; 
 | 
                                        log.info("共用时" + seconds + "秒"); 
 | 
                                        break ; 
 | 
                                    }else{ 
 | 
                                        Thread.sleep(100L); 
 | 
                                    } 
 | 
                                } 
 | 
                            }catch (Exception e){ 
 | 
                                e.printStackTrace(); 
 | 
                            }finally { 
 | 
                                break ; 
 | 
                            } 
 | 
                        } 
 | 
                    } 
 | 
                } catch (Exception e) { 
 | 
                    e.printStackTrace(); 
 | 
                } 
 | 
            } 
 | 
        }).start(); 
 | 
    } 
 | 
  
 | 
    /** 
 | 
     * 创建RTU模拟MyThreadJob 
 | 
     * @param rtuAddr rtu地址 
 | 
     */ 
 | 
    private void createImitate(Long rtuAddr){ 
 | 
        jobMap.put("" + rtuAddr, new MyThreadJob("" + rtuAddr, ServerProperties.tcpServerIp, ServerProperties.tcpServerPort)) ; 
 | 
    } 
 | 
  
 | 
  
 | 
    private void connectServer(MyThreadJob job){ 
 | 
        if(job.session == null){ 
 | 
            try{ 
 | 
                new TcpConnect().createSession(job.rtuAddr, 
 | 
                        job, 
 | 
                        job.serverIp, 
 | 
                        job.serverPort, 
 | 
                        job.connectTimeout, 
 | 
                        new TcpHandler(), 
 | 
                        new Callback() { 
 | 
                            @Override 
 | 
                            public void call(Object obj) { 
 | 
                                if(obj == null){ 
 | 
                                    log.error("创建网络会话返回为null"); 
 | 
                                }else{ 
 | 
                                    job.session = (IoSession)obj ; 
 | 
                                } 
 | 
                            } 
 | 
                            @Override 
 | 
                            public void call(Object... objs) { 
 | 
                            } 
 | 
                            @Override 
 | 
                            public void exception(Exception e) { 
 | 
                            } 
 | 
                        }) ; 
 | 
            }catch (Exception e){ 
 | 
                job.exceptionOnConnect = true ; 
 | 
                e.printStackTrace(); 
 | 
            } 
 | 
        } 
 | 
    } 
 | 
  
 | 
    private int checkConnected(){ 
 | 
        int noConnectedCount = 0 ; 
 | 
        Collection<MyThreadJob> collection = jobMap.values() ; 
 | 
        for(MyThreadJob job : collection){ 
 | 
            if(job.session == null && !job.exceptionOnConnect){ 
 | 
                noConnectedCount++ ; 
 | 
            } 
 | 
        } 
 | 
        return noConnectedCount; 
 | 
    } 
 | 
  
 | 
    private void startJob(){ 
 | 
        new Thread(() -> { 
 | 
            try { 
 | 
                int notOverCount; 
 | 
                while(true){ 
 | 
                    notOverCount = 0 ; 
 | 
                    Collection<MyThreadJob> collection = jobMap.values() ; 
 | 
                    for(MyThreadJob job : collection){ 
 | 
                        if(!job.isOver){ 
 | 
                            notOverCount++ ; 
 | 
                            pool.putJob(job); 
 | 
                        } 
 | 
                    } 
 | 
                    if(notOverCount > 0){ 
 | 
                        log.info("当前还有" + notOverCount + "台RTU未完成任务"); 
 | 
                        Thread.sleep(ServerProperties.sendInterval * 1000); 
 | 
                    }else{ 
 | 
                        break ; 
 | 
                    } 
 | 
                } 
 | 
            } catch (Exception e) { 
 | 
                e.printStackTrace(); 
 | 
            } 
 | 
        }).start(); 
 | 
    } 
 | 
  
 | 
  
 | 
    public static synchronized void clientSendData(){ 
 | 
        totalSendDataCount++; 
 | 
        if(totalOverClientCount.longValue() >= totalRtuClientCount.longValue()){ 
 | 
            RmiClUnit.getInstance().reportHadReportCount(totalSendDataCount); 
 | 
            System.out.println("已经发送" + totalSendDataCount + "条数据(心跳和上报)"); 
 | 
        }else{ 
 | 
            if(totalRtuClientCount > 100){ 
 | 
                if(totalSendDataCount % 100 == 0){ 
 | 
                    RmiClUnit.getInstance().reportHadReportCount(totalSendDataCount); 
 | 
                    System.out.println("已经发送" + totalSendDataCount + "条数据(心跳和上报)"); 
 | 
                } 
 | 
            }else{ 
 | 
                RmiClUnit.getInstance().reportHadReportCount(totalSendDataCount); 
 | 
                System.out.println("已经发送" + totalSendDataCount + "条数据(心跳和上报)"); 
 | 
            } 
 | 
        } 
 | 
    } 
 | 
  
 | 
    public static synchronized void clientOver(){ 
 | 
        totalOverClientCount++; 
 | 
        if(totalOverClientCount.longValue() >= totalRtuClientCount.longValue()){ 
 | 
            RmiClUnit.getInstance().reportHadReportOver(totalOverClientCount); 
 | 
            System.out.println("已有" + totalOverClientCount + "个RTU完成了任务"); 
 | 
        }else{ 
 | 
            if(totalRtuClientCount > 100) { 
 | 
                if (totalOverClientCount % 100 == 0) { 
 | 
                    RmiClUnit.getInstance().reportHadReportOver(totalOverClientCount); 
 | 
                    System.out.println("已有" + totalOverClientCount + "个RTU完成了任务"); 
 | 
                } 
 | 
            }else{ 
 | 
                RmiClUnit.getInstance().reportHadReportOver(totalOverClientCount); 
 | 
                System.out.println("已有" + totalOverClientCount + "个RTU完成了任务"); 
 | 
            } 
 | 
        } 
 | 
    } 
 | 
} 
 |