liurunyu
2 天以前 f8b2e59a82702a790c383a8ecd90c708c76e2488
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java
@@ -3,11 +3,16 @@
import java.util.ArrayList;
import java.util.List;
import com.dy.common.util.ConfigProperties;
import com.dy.common.util.IPUtils;
import com.dy.rtuMw.server.*;
import com.dy.rtuMw.server.mqtt.MqttUnit;
import com.dy.rtuMw.server.mqtt.MqttUnitConfigVo;
import com.dy.rtuMw.server.msCenter.MsCenterConfigVo;
import com.dy.rtuMw.server.msCenter.MsCenterUnit;
import com.dy.rtuMw.server.rtuData.RtuDataUnit;
import com.dy.rtuMw.server.rtuData.RtuDataUnitConfigVo;
import com.dy.rtuMw.server.tasks.FromRtuComResultConstantTask;
import com.dy.rtuMw.server.tasks.FromRtuDataConstantTask;
import com.dy.rtuMw.server.tasks.*;
import com.dy.common.mw.UnitInterface;
import com.dy.common.mw.channel.tcp.TcpConfigVo;
import com.dy.common.mw.channel.tcp.TcpUnit;
@@ -17,14 +22,14 @@
import com.dy.common.mw.protocol.ProtocolUnit;
import com.dy.common.mw.support.SupportUnit;
import com.dy.common.mw.support.SupportUnitConfigVo;
import com.dy.rtuMw.server.tasks.SendMsConstantTask;
import com.dy.rtuMw.server.tasks.ToRtuConstantTask;
import com.dy.rtuMw.resource.ResourceUnit;
import com.dy.rtuMw.resource.ResourceUnitConfigVo;
import com.dy.common.springUtil.SpringContextUtil;
import com.dy.common.util.ConfigXml4Springboot;
import com.dy.common.util.IDLongGenerator;
import com.dy.rtuMw.server.upgrade.UpgradeUnit;
import com.dy.rtuMw.server.upgrade.UpgradeUnitConfigVo;
import org.jdom2.Document;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
@@ -38,6 +43,7 @@
   private Document doc = null ;
   private boolean showStartInfo = false ;
   
   private String orgTag ;
   private String RmiSvUrl ;
   private String TcpSvUrl ;
   @Value("${server.port}")
@@ -81,7 +87,7 @@
      boolean running = false ;
      long start = System.currentTimeMillis() ;
      try {
         //ConfigProperties.init(this.getClass().getResourceAsStream("/config/config.properties"), false);
         ConfigProperties.init(this.getClass().getResourceAsStream("/config.properties"), false);
         
         this.conf = new ConfigXml4Springboot() ;
         this.doc = this.conf.createDom(this.resourceLoader, "config.xml") ;
@@ -111,7 +117,7 @@
            System.out.println("@@@@@@@@@@@@@@@@@@@@@#   #@@@@@@@@@@@@@@@@O") ;    
            System.out.println("@@@@@@@@@@@@@@@@@@@@@@@   &@@@@@@@@@@@@@@") ;           
            System.out.println("@@@@@@$      $@@@@@@@@@&   O@@@@@@@@@@@#") ;        
            System.out.println("@@@@@@$        @@@@@@@@@     @@@@@@@@@&      " + svName + "RtuMw 1.0.00" ) ;
            System.out.println("@@@@@@$        @@@@@@@@@     @@@@@@@@@&      " + this.orgTag + svName + "RtuMw 1.0.00" ) ;
         if(this.HttpSvPath != null && this.HttpSvPort != null){
            System.out.println("@@@@@@$       O@@@@@@@@@     &@@@@@@@@       HttpSv [ip]:" + this.HttpSvPort + this.HttpSvPath) ;
         }else{
@@ -128,7 +134,7 @@
            }else{
                System.out.println("@@@@@@$      #@@@@@@@@@$     &@@@@@@@@" ) ;        
            }
            System.out.println("@@@@@@@@@@@@@@@@@@@@@@#      &@@@@@@@@       Runing in standalone mode" ) ;
            System.out.println("@@@@@@@@@@@@@@@@@@@@@@#      &@@@@@@@@       Running in standalone mode" ) ;
            System.out.println("@@@@@@@@@@@@@@@@@@@@@&       &@@@@@@@@       Startup in " + (System.currentTimeMillis() - start) + " MS" ) ;              
            System.out.println("@@@@@@@@@@@@@@@@@@@#         &@@@@@@@@       " + company) ;            
            System.out.println("@@@@@@@@@@@@@@@@#O           &@@@@@@@@") ;
@@ -145,9 +151,10 @@
         ///////////////
         //基本配置
         ServerProperties.orgTag = this.conf.getSetAttrTxt(this.doc, "config.base", "orgTag", null, false, null) ;
         if(ServerProperties.orgTag==null || ServerProperties.orgTag.trim().equals("")){
         if(ServerProperties.orgTag == null || ServerProperties.orgTag.trim().equals("")){
            throw new Exception("机构tag不能为空") ;
         }
         this.orgTag = ServerProperties.orgTag ;
         ServerProperties.isLowPower = conf.getSetAttrBoolean(doc, "config.base", "isLowPower", null, null) ; 
         if(ServerProperties.isLowPower == null){
            ServerProperties.isLowPower = false ;
@@ -179,10 +186,12 @@
         ServerProperties.downComandMaxResendTimes = conf.getSetAttrPlusInt(doc, "config.base", "downComandMaxResendTimes", null, 1, 5, null).byteValue() ;
         //针对一个RTU,下发命令的时间间隔
         ServerProperties.commandSendInterval = conf.getSetAttrPlusInt(doc, "config.base", "commandSendInterval", null, 1, 40, null) * 1000L ;
         //针对一个RTU,下发快速命令的时间间隔
         ServerProperties.fastCommandSendInterval = conf.getSetAttrPlusInt(doc, "config.base", "fastCommandSendInterval", null, 1, 40000, null) * 1L ;
         //命令已经发送达最大次数,仍未收到命令结果,需要在缓存继续等待,其等待最大时长
         ServerProperties.cachWaitResultTimeout = conf.getSetAttrPlusInt(doc, "config.base", "cachWaitResultTimeout", null, 10, 360, null) * 1000L ;
         ServerProperties.cacheWaitResultTimeout = conf.getSetAttrPlusInt(doc, "config.base", "cacheWaitResultTimeout", null, 10, 360, null) * 1000L ;
         //不在线缓存的命令最大缓存时长
         ServerProperties.offLineCachTimeout = conf.getSetAttrPlusInt(doc, "config.base", "offLineCachTimeout", null, 15, 172800, null) * 1000L ;
         ServerProperties.offLineCacheTimeout = conf.getSetAttrPlusInt(doc, "config.base", "offLineCacheTimeout", null, 15, 172800, null) * 1000L ;
         //TCP上行数据时刻缓存时长,当达到时长时,TCP上行数据时刻被清空,采用TCP上行数据时刻目的是,阻止上数据同时下发数据,因为RTU处理不过来
         ServerProperties.lastUpDataTimeLive = conf.getSetAttrPlusInt(doc, "config.base", "lastUpDataTimeLive", null, 0, 5000, null) * 1L ;
         //数据库数据id生成器的id后缀,0是默认的后缀,一般web系统应用,数据中间件id后缀大于等于1
@@ -203,6 +212,9 @@
         //工作报太频繁,N次上报处理1次,取值范围是1-100
         ServerProperties.workReportDealOneByTimes =  conf.getSetAttrPlusInt(doc, "config.base", "workReportDealOneByTimes", null, 1, 100, null) ;
         //触发发送钉钉报警消息的取水口日漏损量的最小值(包括但除0.0外)
         ServerProperties.intakeAlarmLossMinValue =  conf.getSetAttrPlusDouble(doc, "config.base", "intakeAlarmLossMinValue", null, 0.0, 1000000.0, null) ;
         //有报警发生时,向钉钉发送消息的间隔时长(分钟)
         ServerProperties.sendDingDingAlarmMsInterval =  conf.getSetAttrPlusInt(doc, "config.base", "sendDingDingAlarmMsInterval", null, 1, 600, null) ;
@@ -284,7 +296,7 @@
         //RTU日志文件存储目录(相对目录)
         resVo.rtuLogDir = conf.getSetAttrTxt(doc, "config.resource", "rtuLogDir", null, false, null) ;
         //RTU日志文件最大字节数(KB)
         resVo.rtuLogFileMaxSize = conf.getSetAttrPlusInt(doc, "config.resource", "rtuLogFileMaxSize", null, 100000, 2000000, null) ;
         resVo.rtuLogFileMaxSize = conf.getSetAttrPlusInt(doc, "config.resource", "rtuLogFileMaxSize", null, 10, 2000000, null) ;
         //RTU日志文件最大文件数
         resVo.rtuLogFileMaxCount = conf.getSetAttrPlusInt(doc, "config.resource", "rtuLogFileMaxCount", null, 1, 10, null) ; 
         
@@ -327,8 +339,54 @@
         }
         */
         /////////////////
         //RTU上行数据处理模块
         //消息中心模块
         MsCenterConfigVo mscVo = new MsCenterConfigVo();
         mscVo.enable = conf.getSetAttrBoolean(doc, "config.msCenter", "enable", null, null) ;
         mscVo.notifyMsInterval = conf.getSetAttrPlusInt(doc, "config.msCenter", "notifyInterval", null, 1, 600, null) * 1000L ;
         mscVo.showStartInfo = showStartInfo ;
         AdapterImp_MsCenterUnit mscAdapt = new AdapterImp_MsCenterUnit();
         mscAdapt.setConfig(mscVo);
         MsCenterUnit mscUnit = MsCenterUnit.getInstance();
         mscUnit.setAdapter(mscAdapt);
         mscUnit.start(obj -> {
         });
         units.add(mscUnit) ;
         /////////////////
         //RTU远程升级模块
         UpgradeUnitConfigVo ugVo = new UpgradeUnitConfigVo();
         ugVo.enable = conf.getSetAttrBoolean(doc, "config.upgrade", "enable", null, null) ;
         if(ugVo.enable){
            ugVo.openNoUpgrade = conf.getSetAttrBoolean(doc, "config.upgrade", "openNoUpgrade", null, null) ;
            ugVo.lastOpenMaxGoOn = conf.getSetAttrPlusInt(doc, "config.upgrade", "lastOpenMaxGoOn", null, 5, 360000, null);
            ugVo.lastOpenMaxGoOn = ugVo.lastOpenMaxGoOn * 1000 ;//变成毫秒
            ugVo.noOneRtuUpgradeMaxDuration = conf.getSetAttrPlusInt(doc, "config.upgrade", "noOneRtuUpgradeMaxDuration", null, 5, 360000, null);
            ugVo.noOneRtuUpgradeMaxDuration = ugVo.noOneRtuUpgradeMaxDuration * 1000 ;//变成毫秒
            ugVo.runningAndIdleDuration = conf.getSetAttrPlusInt(doc, "config.upgrade", "runningAndIdleDuration", null, 5, 360000, null);
            ugVo.runningAndIdleDuration = ugVo.runningAndIdleDuration * 1000 ;//变成毫秒
            ugVo.failTryTimes = conf.getSetAttrPlusInt(doc, "config.upgrade", "failTryTimes", null, 0, 100, null);
            ugVo.ugMaxRtuAtOnce = conf.getSetAttrPlusInt(doc, "config.upgrade", "ugMaxRtuAtOnce", null, 0, 1000000, null);
            ugVo.rtuOffLineWaitDuration = conf.getSetAttrPlusInt(doc, "config.upgrade", "rtuOffLineWaitDuration", null, 1, 3600000, null);
            ugVo.rtuOffLineWaitDuration = ugVo.rtuOffLineWaitDuration * 1000;//变成毫秒
            ugVo.notifyStateInterval = conf.getSetAttrPlusInt(doc, "config.upgrade", "notifyStateInterval", null, 1, 300, null);
            ugVo.notifyStateInterval = ugVo.notifyStateInterval * 1000;//变成毫秒
            ugVo.notifyTimesAfterOver = conf.getSetAttrPlusInt(doc, "config.upgrade", "notifyTimesAfterOver", null, 0, null, null);
            ugVo.showStartInfo = showStartInfo ;
            AdapterImp_UpgradeUnit ugAdap = new AdapterImp_UpgradeUnit();
            ugAdap.setConfig(ugVo);
            UpgradeUnit ugUnit = UpgradeUnit.getInstance();
            ugUnit.setAdapter(ugAdap);
            ugUnit.start(obj -> {
            });
            units.add(ugUnit) ;
         }
         /////////////////
         //RTU上行数据处理模块(任务树)
         RtuDataUnitConfigVo rducVo = new RtuDataUnitConfigVo();
         rducVo.resourceLoader = this.resourceLoader ;
         AdapterImp_RtuDataUnit rducAdap = new AdapterImp_RtuDataUnit();
@@ -344,8 +402,7 @@
         // ///////////////
         // 核心
         CoreUnitConfigVo coreConfVo = new CoreUnitConfigVo();
         coreConfVo.sleepBigBusy = conf.getSetAttrPlusInt(doc, "config.core", "sleepBigBusy", null, 1, 200, null).longValue() ;
         coreConfVo.sleepSmallBusy = conf.getSetAttrPlusInt(doc, "config.core", "sleepSmallBusy", null, 2, 1000, null).longValue();
         coreConfVo.coreInterval = conf.getSetAttrPlusInt(doc, "config.core", "coreInterval", null, 1, 200, null).longValue() ;
         coreConfVo.queueWarnSize = ServerProperties.cacheUpDownDataWarnCount ;
         coreConfVo.queueMaxSize = ServerProperties.cacheUpDownDataMaxCount ;
         coreConfVo.showStartInfo = showStartInfo ;
@@ -353,9 +410,15 @@
         coreAdap.setConfig(coreConfVo);
         CoreUnit coreUnit = CoreUnit.getInstance();
         coreUnit.setAdapter(coreAdap);
         CoreUnit.addConstantTask(new ToRtuConstantTask());
         CoreUnit.addConstantTask(new RtuDownConstantTask());
         CoreUnit.addConstantTask(new FromRtuDataConstantTask());
         CoreUnit.addConstantTask(new FromRtuComResultConstantTask());
         Boolean enableMq = conf.getSetAttrBoolean(doc, "config.mqtt", "enable", null, null) ;
         if(enableMq != null && enableMq.booleanValue()){
            CoreUnit.addConstantTask(new MqttSubMessageConstantTask());
            CoreUnit.addConstantTask(new MqttPubMessageConstantTask());
            CoreUnit.addConstantTask(new MqttComResultConstantTask());
         }
         CoreUnit.addConstantTask(new SendMsConstantTask());
         coreUnit.start(obj -> {
         });
@@ -379,7 +442,70 @@
            });
            TcpSvUrl = "[ip]:" + tcpVo.port ;
            units.add(tcpUnit) ;
         }
         }
         /////////////////
         //MQTT模块
         MqttUnitConfigVo mqVo = new MqttUnitConfigVo();
         mqVo.enable = conf.getSetAttrBoolean(doc, "config.mqtt", "enable", null, null) ;
         ServerProperties.mqttUnitEnable = mqVo.enable ;
         if(mqVo.enable){
            mqVo.svIp = conf.getSetAttrTxt(doc, "config.mqtt", "svIp", null, true, null) ;
            if(!IPUtils.ipValid(mqVo.svIp)){
               throw new Exception("config.mqtt.svIp配置的IP不合法") ;
            }
            mqVo.svPort = conf.getSetAttrPlusInt(doc, "config.mqtt", "svPort", null, 5, 360000, null);
            if(mqVo.svPort < 0 || mqVo.svPort > 65535){
               throw new Exception("config.mqtt.svPort配置的端口不合法") ;
            }
            mqVo.svUserName = conf.getSetAttrTxt(doc, "config.mqtt", "svUserName", null, true, null) ;
            if(mqVo.svUserName == null || mqVo.svUserName.trim().equals("")){
               throw new Exception("config.mqtt.svUserName配置的用户名不合法") ;
            }else{
               mqVo.svUserName = mqVo.svUserName.trim() ;
            }
            mqVo.svUserPassword = conf.getSetAttrTxt(doc, "config.mqtt", "svUserPassword", null, true, null) ;
            if(mqVo.svUserPassword == null || mqVo.svUserPassword.trim().equals("")){
               throw new Exception("config.mqtt.svUserName配置的用户密码不合法") ;
            }else{
               mqVo.svUserPassword = mqVo.svUserPassword.trim() ;
            }
            mqVo.poolMaxSize = conf.getSetAttrPlusInt(doc, "config.mqtt", "poolMaxSize", null, 5, 360000, null);
            if(mqVo.poolMaxSize <= 1 || mqVo.poolMaxSize > 1000){
               throw new Exception("config.mqtt.poolMaxSize配置的连接池连接最大数量不合法") ;
            }
            String topicAndQos = conf.getSetAttrTxt(doc, "config.mqtt", "topicAndQos", null, true, null) ;
            if(topicAndQos == null || topicAndQos.trim().equals("")){
               throw new Exception("config.mqtt.topicAndQos配置的主题及Qos不合法") ;
            }else{
               topicAndQos = topicAndQos.trim() ;
               topicAndQos = topicAndQos.replaceAll(",", ",");
               topicAndQos = topicAndQos.replaceAll(";", ";");
               String[] topicAndQosArr = topicAndQos.split(";") ;
               mqVo.subTopics = new String[topicAndQosArr.length] ;
               mqVo.topicsQos = new int[topicAndQosArr.length] ;
               int index = 0 ;
               for(String topicAndQosStr : topicAndQosArr){
                  String[] tq = topicAndQosStr.split(",") ;
                  mqVo.subTopics[index] = tq[0].trim() ;
                  mqVo.topicsQos[index] = Integer.parseInt(tq[1].trim()) ;
                  index++ ;
               }
            }
            mqVo.publishQos = conf.getSetAttrPlusInt(doc, "config.mqtt", "publishQos", null, 0, 3, null);
            if(mqVo.publishQos < 0 || mqVo.publishQos > 3){
               throw new Exception("config.mqtt.publishQos配置不合法") ;
            }
            mqVo.showStartInfo = showStartInfo ;
            AdapterImp_MqttUnit mqAdapt = new AdapterImp_MqttUnit();
            mqAdapt.setConfig(mqVo);
            MqttUnit mqUnit = MqttUnit.getInstance();
            mqUnit.setAdapter(mqAdapt);
            mqUnit.start(obj -> {
            });
            units.add(mqUnit) ;
         }
      } catch (Exception e) {
         e.printStackTrace();
      }