package com.dy.testClient.tcpClient;
|
|
import com.dy.common.mw.UnitAdapterInterface;
|
import com.dy.common.mw.UnitInterface;
|
import com.dy.common.mw.UnitStartedCallbackInterface;
|
import com.dy.common.threadPool.ThreadPool;
|
import com.dy.common.threadPool.TreadPoolFactory;
|
import com.dy.common.util.Callback;
|
import com.dy.testClient.ServerProperties;
|
import com.dy.testClient.rmiClient.RmiClUnit;
|
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.Logger;
|
import org.apache.mina.core.session.IoSession;
|
|
import java.util.Collection;
|
import java.util.HashMap;
|
import java.util.Map;
|
|
public class TcpClUnit implements UnitInterface {
|
|
private static final Logger log = LogManager.getLogger(TcpClUnit.class) ;
|
|
private static TcpClUnit instance = new TcpClUnit() ;
|
|
public static TcpClUnitAdapter adapter ;
|
public static TcpClUnitConfigVo confVo ;
|
|
private static ThreadPool.Pool pool ;
|
private static Map<String, MyThreadJob> jobMap = new HashMap<>() ;
|
|
private static Integer totalRtuClientCount = 0;
|
private static Integer totalSendDataCount = 0;
|
private static Integer totalOverClientCount = 0;
|
|
private static Long startTime = 0L ;
|
|
private TcpClUnit(){} ;
|
|
public static TcpClUnit getInstance(){
|
return instance ;
|
}
|
|
@Override
|
public void setAdapter(UnitAdapterInterface adapter) throws Exception {
|
if(adapter == null){
|
throw new Exception("Tcp Client模块适配器对象不能为空!") ;
|
}
|
TcpClUnit.adapter = (TcpClUnitAdapter)adapter ;
|
TcpClUnit.confVo = TcpClUnit.adapter.getConfig() ;
|
if(TcpClUnit.confVo == null){
|
throw new Exception("Tcp Client模块配置对象不能为空!") ;
|
}
|
}
|
|
@Override
|
public void start(UnitStartedCallbackInterface callback) throws Exception {
|
pool = TreadPoolFactory.getThreadPoolLong() ;
|
System.out.println("Tcp Client模块成功启动");
|
this.doStart();
|
callback.call(null) ;
|
}
|
|
@Override
|
public void stop(UnitStartedCallbackInterface callback) throws Exception {
|
callback.call(null);
|
}
|
|
private void doStart(){
|
new Thread(new Runnable(){
|
@Override
|
public void run() {
|
try {
|
while(true){
|
if(!ServerProperties.startWork){
|
Thread.sleep(100L);
|
}else{
|
try{
|
startTime = System.currentTimeMillis() ;
|
for(Long addr = ServerProperties.rtuAddrStart; addr <= ServerProperties.rtuAddrEnd; addr++){
|
totalRtuClientCount++ ;
|
createImitate(addr) ;
|
}
|
log.info("共模拟了" + totalRtuClientCount + "台RTU");
|
|
Collection<MyThreadJob> collection = jobMap.values() ;
|
int connectedCount = 0 ;
|
for(MyThreadJob job : collection){
|
connectServer(job) ;
|
connectedCount++ ;
|
log.info("当前建立与通信中间件连接的RTU数量为:" + connectedCount);
|
}
|
log.info("所有RTU已与通信中间件建立连接");
|
|
while (true){
|
int noConnectedCount = checkConnected() ;
|
if(noConnectedCount > 0){
|
log.info("等待" + noConnectedCount + "台RTU连接网络");
|
Thread.sleep(100L);
|
}else{
|
break ;
|
}
|
}
|
|
startJob() ;
|
|
while(true){
|
if(totalOverClientCount.longValue() >= totalRtuClientCount.longValue()){
|
Long seconds = (System.currentTimeMillis() - startTime)/1000 ;
|
RmiClUnit.getInstance().allOver(seconds) ;
|
log.info("共用时" + seconds + "秒");
|
break ;
|
}else{
|
Thread.sleep(100L);
|
}
|
}
|
}catch (Exception e){
|
e.printStackTrace();
|
}finally {
|
break ;
|
}
|
}
|
}
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
}
|
}).start();
|
}
|
|
/**
|
* 创建RTU模拟MyThreadJob
|
* @param rtuAddr rtu地址
|
*/
|
private void createImitate(Long rtuAddr){
|
jobMap.put("" + rtuAddr, new MyThreadJob("" + rtuAddr, ServerProperties.tcpServerIp, ServerProperties.tcpServerPort)) ;
|
}
|
|
|
private void connectServer(MyThreadJob job){
|
if(job.session == null){
|
try{
|
new TcpConnect().createSession(job.rtuAddr,
|
job,
|
job.serverIp,
|
job.serverPort,
|
job.connectTimeout,
|
new TcpHandler(),
|
new Callback() {
|
@Override
|
public void call(Object obj) {
|
if(obj == null){
|
log.error("创建网络会话返回为null");
|
}else{
|
job.session = (IoSession)obj ;
|
}
|
}
|
@Override
|
public void call(Object... objs) {
|
}
|
@Override
|
public void exception(Exception e) {
|
}
|
}) ;
|
}catch (Exception e){
|
job.exceptionOnConnect = true ;
|
e.printStackTrace();
|
}
|
}
|
}
|
|
private int checkConnected(){
|
int noConnectedCount = 0 ;
|
Collection<MyThreadJob> collection = jobMap.values() ;
|
for(MyThreadJob job : collection){
|
if(job.session == null && !job.exceptionOnConnect){
|
noConnectedCount++ ;
|
}
|
}
|
return noConnectedCount;
|
}
|
|
private void startJob(){
|
new Thread(() -> {
|
try {
|
int notOverCount;
|
while(true){
|
notOverCount = 0 ;
|
Collection<MyThreadJob> collection = jobMap.values() ;
|
for(MyThreadJob job : collection){
|
if(!job.isOver){
|
notOverCount++ ;
|
pool.putJob(job);
|
}
|
}
|
if(notOverCount > 0){
|
log.info("当前还有" + notOverCount + "台RTU未完成任务");
|
Thread.sleep(ServerProperties.sendInterval * 1000);
|
}else{
|
break ;
|
}
|
}
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
}).start();
|
}
|
|
|
public static synchronized void clientSendData(){
|
totalSendDataCount++;
|
if(totalOverClientCount.longValue() >= totalRtuClientCount.longValue()){
|
RmiClUnit.getInstance().reportHadReportCount(totalSendDataCount);
|
System.out.println("已经发送" + totalSendDataCount + "条数据");
|
}else{
|
if(totalRtuClientCount > 100){
|
if(totalSendDataCount % 100 == 0){
|
RmiClUnit.getInstance().reportHadReportCount(totalSendDataCount);
|
System.out.println("已经发送" + totalSendDataCount + "条数据");
|
}
|
}else{
|
RmiClUnit.getInstance().reportHadReportCount(totalSendDataCount);
|
System.out.println("已经发送" + totalSendDataCount + "条数据");
|
}
|
}
|
}
|
|
public static synchronized void clientOver(){
|
totalOverClientCount++;
|
if(totalOverClientCount.longValue() >= totalRtuClientCount.longValue()){
|
RmiClUnit.getInstance().reportHadReportOver(totalOverClientCount);
|
System.out.println("已有" + totalOverClientCount + "个RTU完成了任务");
|
}else{
|
if(totalRtuClientCount > 100) {
|
if (totalOverClientCount % 100 == 0) {
|
RmiClUnit.getInstance().reportHadReportOver(totalOverClientCount);
|
System.out.println("已有" + totalOverClientCount + "个RTU完成了任务");
|
}
|
}else{
|
RmiClUnit.getInstance().reportHadReportOver(totalOverClientCount);
|
System.out.println("已有" + totalOverClientCount + "个RTU完成了任务");
|
}
|
}
|
}
|
}
|