package com.dy.testClient.tcpClient;

import com.dy.common.mw.UnitAdapterInterface;
import com.dy.common.mw.UnitInterface;
import com.dy.common.mw.UnitCallbackInterface;
import com.dy.common.threadPool.ThreadPool;
import com.dy.common.threadPool.TreadPoolFactory;
import com.dy.common.util.Callback;
import com.dy.testClient.ServerProperties;
import com.dy.testClient.rmiClient.RmiClUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.mina.core.session.IoSession;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

/**
 * TCP client unit that simulates a range of RTU devices.
 *
 * <p>Once started, a background thread waits for
 * {@code ServerProperties.startTcpConnectWork}, creates one {@link MyThreadJob} per RTU
 * address in {@code [rtuAddrStart, rtuAddrEnd]}, opens a session for each job, waits for
 * {@code ServerProperties.startRtuReportWork}, and then repeatedly submits every unfinished
 * job to the thread pool until all jobs report completion through {@link #clientOver()}.
 *
 * <p>Thread-safety: the counters are {@code volatile}. {@code totalSendDataCount} and
 * {@code totalOverClientCount} are only incremented inside the {@code static synchronized}
 * reporting methods, and {@code totalRtuClientCount} is only incremented by the single
 * worker thread, so increments do not race while reads from other threads stay visible.
 */
public class TcpClUnit implements UnitInterface {

    private static final Logger log = LogManager.getLogger(TcpClUnit.class);

    /** Delay between two polls of a flag or counter, in milliseconds. */
    private static final long POLL_INTERVAL_MS = 100L;

    /** Below or at this many simulated RTUs every event is reported; above it, every Nth. */
    private static final int REPORT_EVERY = 100;

    private static final TcpClUnit instance = new TcpClUnit();

    public static TcpClUnitAdapter adapter;
    public static TcpClUnitConfigVo confVo;

    private static ThreadPool.Pool pool;

    /** Simulated RTU jobs keyed by the RTU address rendered as a string. */
    private static final Map<String, MyThreadJob> jobMap = new HashMap<>();

    private static volatile int totalRtuClientCount = 0;
    private static volatile int totalSendDataCount = 0;
    private static volatile int totalOverClientCount = 0;
    private static volatile long startTime = 0L;

    private TcpClUnit() {
    }

    /**
     * @return the single shared instance of this unit
     */
    public static TcpClUnit getInstance() {
        return instance;
    }

    /**
     * Stores the adapter and the configuration it provides.
     *
     * @param adapter the unit adapter; must be a {@link TcpClUnitAdapter}
     * @throws Exception if {@code adapter} is {@code null} or it returns a {@code null} config
     */
    @Override
    public void setAdapter(UnitAdapterInterface adapter) throws Exception {
        if (adapter == null) {
            throw new Exception("Tcp Client模块适配器对象不能为空!");
        }
        TcpClUnit.adapter = (TcpClUnitAdapter) adapter;
        TcpClUnit.confVo = TcpClUnit.adapter.getConfig();
        if (TcpClUnit.confVo == null) {
            throw new Exception("Tcp Client模块配置对象不能为空!");
        }
    }

    /**
     * Obtains the thread pool, launches the background simulation thread and then invokes
     * the callback with {@code null}.
     *
     * @param callback invoked once the worker thread has been started
     * @throws Exception propagated from the thread-pool factory or the callback
     */
    @Override
    public void start(UnitCallbackInterface callback) throws Exception {
        pool = TreadPoolFactory.getThreadPoolLong();
        System.out.println("Tcp Client模块成功启动");
        this.doStart();
        callback.call(null);
    }

    /**
     * Invokes the callback with {@code null}; no resources are released here.
     *
     * @param callback invoked immediately
     * @throws Exception propagated from the callback
     */
    @Override
    public void stop(UnitCallbackInterface callback) throws Exception {
        callback.call(null);
    }

    /**
     * Starts the worker thread: it polls until {@code ServerProperties.startTcpConnectWork}
     * is set and then runs the simulation exactly once. Failures are logged, never rethrown.
     */
    private void doStart() {
        Thread worker = new Thread(() -> {
            try {
                while (!ServerProperties.startTcpConnectWork) {
                    Thread.sleep(POLL_INTERVAL_MS);
                }
                runSimulation();
            } catch (InterruptedException e) {
                // Preserve the interrupt status for whoever owns this thread.
                Thread.currentThread().interrupt();
                log.error("RTU simulation thread was interrupted", e);
            } catch (Exception e) {
                log.error("RTU simulation failed", e);
            }
        }, "tcp-client-simulation");
        worker.start();
    }

    /**
     * Runs one complete simulation: create jobs, connect, wait for the report flag, start
     * the job submitter, and wait until every RTU has reported completion.
     *
     * @throws Exception on interruption or when a collaborator fails
     */
    private void runSimulation() throws Exception {
        startTime = System.currentTimeMillis();

        for (long addr = ServerProperties.rtuAddrStart; addr <= ServerProperties.rtuAddrEnd; addr++) {
            totalRtuClientCount++; // single writer: only this worker thread increments it
            createImitate(addr);
        }
        log.info("共模拟了" + totalRtuClientCount + "台RTU");

        int connectedCount = 0;
        for (MyThreadJob job : jobMap.values()) {
            connectServer(job);
            connectedCount++;
            log.info("当前建立与通信中间件连接的RTU数量为:" + connectedCount);
        }
        log.info("所有RTU已与通信中间件建立连接");

        // Sessions are assigned from a callback, so poll until none is still pending.
        int noConnectedCount;
        while ((noConnectedCount = checkConnected()) > 0) {
            log.info("等待" + noConnectedCount + "台RTU连接网络");
            Thread.sleep(POLL_INTERVAL_MS);
        }

        while (!ServerProperties.startRtuReportWork) {
            Thread.sleep(POLL_INTERVAL_MS);
        }
        startJob();

        while (totalOverClientCount < totalRtuClientCount) {
            Thread.sleep(POLL_INTERVAL_MS);
        }
        long seconds = (System.currentTimeMillis() - startTime) / 1000;
        RmiClUnit.getInstance().allOver(seconds);
        log.info("共用时" + seconds + "秒");
    }

    /**
     * Creates the simulated-RTU {@link MyThreadJob} for one address and registers it.
     *
     * @param rtuAddr RTU address
     */
    private void createImitate(Long rtuAddr) {
        String key = "" + rtuAddr;
        jobMap.put(key, new MyThreadJob(key, ServerProperties.tcpServerIp, ServerProperties.tcpServerPort));
    }

    /**
     * Opens a network session for the job unless it already has one. The session is stored
     * on the job by the callback; a failure marks the job with {@code exceptionOnConnect}.
     *
     * @param job the simulated RTU to connect
     */
    private void connectServer(MyThreadJob job) {
        if (job.session != null) {
            return;
        }
        try {
            new TcpConnect().createSession(job.rtuAddr, job, job.serverIp, job.serverPort, job.connectTimeout,
                    new TcpHandler(), new Callback() {
                        @Override
                        public void call(Object obj) {
                            if (obj == null) {
                                log.error("创建网络会话返回为null");
                            } else {
                                job.session = (IoSession) obj;
                            }
                        }

                        @Override
                        public void call(Object... objs) {
                            // Not used for session creation.
                        }

                        @Override
                        public void exception(Exception e) {
                            // NOTE(review): assumes TcpConnect reports asynchronous connect
                            // failures here - confirm. Marking the job keeps checkConnected()
                            // from waiting forever on a session that will never arrive.
                            job.exceptionOnConnect = true;
                            log.error("Failed to create network session for RTU " + job.rtuAddr, e);
                        }
                    });
        } catch (Exception e) {
            job.exceptionOnConnect = true;
            log.error("Failed to create network session for RTU " + job.rtuAddr, e);
        }
    }

    /**
     * @return the number of jobs that have neither a session nor a recorded connect failure
     */
    private int checkConnected() {
        int noConnectedCount = 0;
        Collection<MyThreadJob> jobs = jobMap.values();
        for (MyThreadJob job : jobs) {
            if (job.session == null && !job.exceptionOnConnect) {
                noConnectedCount++;
            }
        }
        return noConnectedCount;
    }

    /**
     * Starts a thread that submits every unfinished job to the pool, sleeps for
     * {@code ServerProperties.sendInterval} seconds, and repeats until all jobs are over.
     */
    private void startJob() {
        Thread submitter = new Thread(() -> {
            try {
                while (true) {
                    int notOverCount = 0;
                    for (MyThreadJob job : jobMap.values()) {
                        if (!job.isOver) {
                            notOverCount++;
                            pool.putJob(job);
                        }
                    }
                    if (notOverCount == 0) {
                        break;
                    }
                    log.info("当前还有" + notOverCount + "台RTU未完成任务");
                    Thread.sleep(ServerProperties.sendInterval * 1000);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.error("RTU job submitter thread was interrupted", e);
            } catch (Exception e) {
                log.error("RTU job submitter failed", e);
            }
        }, "tcp-client-job-submitter");
        submitter.start();
    }

    /**
     * Decides whether a progress event should be reported: always once every RTU is over or
     * when at most {@value #REPORT_EVERY} RTUs are simulated, otherwise only on every
     * {@value #REPORT_EVERY}th event.
     *
     * @param count the counter value that was just incremented
     * @return {@code true} if this event should be reported
     */
    private static boolean shouldReport(int count) {
        return totalOverClientCount >= totalRtuClientCount
                || totalRtuClientCount <= REPORT_EVERY
                || count % REPORT_EVERY == 0;
    }

    /**
     * Records that one message (heartbeat or report) was sent and, subject to throttling,
     * forwards the running total to the RMI unit and prints it.
     */
    public static synchronized void clientSendData() {
        totalSendDataCount++;
        if (shouldReport(totalSendDataCount)) {
            RmiClUnit.getInstance().reportHadReportCount(totalSendDataCount);
            System.out.println("已经发送" + totalSendDataCount + "条数据(心跳和上报)");
        }
    }

    /**
     * Records that one simulated RTU finished its task and, subject to throttling, forwards
     * the running total to the RMI unit and prints it.
     */
    public static synchronized void clientOver() {
        totalOverClientCount++;
        if (shouldReport(totalOverClientCount)) {
            RmiClUnit.getInstance().reportHadReportOver(totalOverClientCount);
            System.out.println("已有" + totalOverClientCount + "个RTU完成了任务");
        }
    }
}