From f8b2e59a82702a790c383a8ecd90c708c76e2488 Mon Sep 17 00:00:00 2001 From: liurunyu <lry9898@163.com> Date: 星期四, 05 六月 2025 17:51:59 +0800 Subject: [PATCH] 增量开发MQTT协议、功能模块,上下行命令(消息)等 --- pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java | 156 +++++++++++++++++++++++++++++++++++++++++++++++----- 1 files changed, 141 insertions(+), 15 deletions(-) diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java index f986e33..b7917f5 100644 --- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java @@ -3,11 +3,16 @@ import java.util.ArrayList; import java.util.List; +import com.dy.common.util.ConfigProperties; +import com.dy.common.util.IPUtils; import com.dy.rtuMw.server.*; +import com.dy.rtuMw.server.mqtt.MqttUnit; +import com.dy.rtuMw.server.mqtt.MqttUnitConfigVo; +import com.dy.rtuMw.server.msCenter.MsCenterConfigVo; +import com.dy.rtuMw.server.msCenter.MsCenterUnit; import com.dy.rtuMw.server.rtuData.RtuDataUnit; import com.dy.rtuMw.server.rtuData.RtuDataUnitConfigVo; -import com.dy.rtuMw.server.tasks.FromRtuComResultConstantTask; -import com.dy.rtuMw.server.tasks.FromRtuDataConstantTask; +import com.dy.rtuMw.server.tasks.*; import com.dy.common.mw.UnitInterface; import com.dy.common.mw.channel.tcp.TcpConfigVo; import com.dy.common.mw.channel.tcp.TcpUnit; @@ -17,14 +22,14 @@ import com.dy.common.mw.protocol.ProtocolUnit; import com.dy.common.mw.support.SupportUnit; import com.dy.common.mw.support.SupportUnitConfigVo; -import com.dy.rtuMw.server.tasks.SendMsConstantTask; -import com.dy.rtuMw.server.tasks.ToRtuConstantTask; import com.dy.rtuMw.resource.ResourceUnit; import com.dy.rtuMw.resource.ResourceUnitConfigVo; import com.dy.common.springUtil.SpringContextUtil; import com.dy.common.util.ConfigXml4Springboot; import com.dy.common.util.IDLongGenerator; +import com.dy.rtuMw.server.upgrade.UpgradeUnit; +import com.dy.rtuMw.server.upgrade.UpgradeUnitConfigVo; import org.jdom2.Document; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -38,6 +43,7 @@ private Document doc = null ; private boolean showStartInfo = false ; + private String orgTag ; private String RmiSvUrl ; private String TcpSvUrl ; @Value("${server.port}") @@ -81,7 +87,7 @@ boolean running = false ; long start = System.currentTimeMillis() ; try { - //ConfigProperties.init(this.getClass().getResourceAsStream("/config/config.properties"), false); + ConfigProperties.init(this.getClass().getResourceAsStream("/config.properties"), false); this.conf = new ConfigXml4Springboot() ; this.doc = this.conf.createDom(this.resourceLoader, "config.xml") ; @@ -111,7 +117,7 @@ System.out.println("@@@@@@@@@@@@@@@@@@@@@# #@@@@@@@@@@@@@@@@O") ; System.out.println("@@@@@@@@@@@@@@@@@@@@@@@ &@@@@@@@@@@@@@@") ; System.out.println("@@@@@@$ $@@@@@@@@@& O@@@@@@@@@@@#") ; - System.out.println("@@@@@@$ @@@@@@@@@ @@@@@@@@@& " + svName + "RtuMw 1.0.00" ) ; + System.out.println("@@@@@@$ @@@@@@@@@ @@@@@@@@@& " + this.orgTag + svName + "RtuMw 1.0.00" ) ; if(this.HttpSvPath != null && this.HttpSvPort != null){ System.out.println("@@@@@@$ O@@@@@@@@@ &@@@@@@@@ HttpSv [ip]:" + this.HttpSvPort + this.HttpSvPath) ; }else{ @@ -128,7 +134,7 @@ }else{ System.out.println("@@@@@@$ #@@@@@@@@@$ &@@@@@@@@" ) ; } - System.out.println("@@@@@@@@@@@@@@@@@@@@@@# &@@@@@@@@ Runing in standalone mode" ) ; + System.out.println("@@@@@@@@@@@@@@@@@@@@@@# &@@@@@@@@ Running in standalone mode" ) ; System.out.println("@@@@@@@@@@@@@@@@@@@@@& &@@@@@@@@ Startup in " + (System.currentTimeMillis() - start) + " MS" ) ; System.out.println("@@@@@@@@@@@@@@@@@@@# &@@@@@@@@ " + company) ; System.out.println("@@@@@@@@@@@@@@@@#O &@@@@@@@@") ; @@ -145,9 +151,10 @@ /////////////// //鍩烘湰閰嶇疆 ServerProperties.orgTag = this.conf.getSetAttrTxt(this.doc, "config.base", "orgTag", null, false, null) ; - if(ServerProperties.orgTag==null || ServerProperties.orgTag.trim().equals("")){ + if(ServerProperties.orgTag == null || ServerProperties.orgTag.trim().equals("")){ throw new Exception("鏈烘瀯tag涓嶈兘涓虹┖") ; } + this.orgTag = ServerProperties.orgTag ; ServerProperties.isLowPower = conf.getSetAttrBoolean(doc, "config.base", "isLowPower", null, null) ; if(ServerProperties.isLowPower == null){ ServerProperties.isLowPower = false ; @@ -179,10 +186,12 @@ ServerProperties.downComandMaxResendTimes = conf.getSetAttrPlusInt(doc, "config.base", "downComandMaxResendTimes", null, 1, 5, null).byteValue() ; //閽堝涓�涓猂TU锛屼笅鍙戝懡浠ょ殑鏃堕棿闂撮殧 ServerProperties.commandSendInterval = conf.getSetAttrPlusInt(doc, "config.base", "commandSendInterval", null, 1, 40, null) * 1000L ; + //閽堝涓�涓猂TU锛屼笅鍙戝揩閫熷懡浠ょ殑鏃堕棿闂撮殧 + ServerProperties.fastCommandSendInterval = conf.getSetAttrPlusInt(doc, "config.base", "fastCommandSendInterval", null, 1, 40000, null) * 1L ; //鍛戒护宸茬粡鍙戦�佽揪鏈�澶ф鏁帮紝浠嶆湭鏀跺埌鍛戒护缁撴灉锛岄渶瑕佸湪缂撳瓨缁х画绛夊緟锛屽叾绛夊緟鏈�澶ф椂闀� - ServerProperties.cachWaitResultTimeout = conf.getSetAttrPlusInt(doc, "config.base", "cachWaitResultTimeout", null, 10, 360, null) * 1000L ; + ServerProperties.cacheWaitResultTimeout = conf.getSetAttrPlusInt(doc, "config.base", "cacheWaitResultTimeout", null, 10, 360, null) * 1000L ; //涓嶅湪绾跨紦瀛樼殑鍛戒护鏈�澶х紦瀛樻椂闀� - ServerProperties.offLineCachTimeout = conf.getSetAttrPlusInt(doc, "config.base", "offLineCachTimeout", null, 15, 172800, null) * 1000L ; + ServerProperties.offLineCacheTimeout = conf.getSetAttrPlusInt(doc, "config.base", "offLineCacheTimeout", null, 15, 172800, null) * 1000L ; //TCP涓婅鏁版嵁鏃跺埢缂撳瓨鏃堕暱锛屽綋杈惧埌鏃堕暱鏃讹紝TCP涓婅鏁版嵁鏃跺埢琚竻绌猴紝閲囩敤TCP涓婅鏁版嵁鏃跺埢鐩殑鏄紝闃绘涓婃暟鎹悓鏃朵笅鍙戞暟鎹紝鍥犱负RTU澶勭悊涓嶈繃鏉� ServerProperties.lastUpDataTimeLive = conf.getSetAttrPlusInt(doc, "config.base", "lastUpDataTimeLive", null, 0, 5000, null) * 1L ; //鏁版嵁搴撴暟鎹甶d鐢熸垚鍣ㄧ殑id鍚庣紑锛�0鏄粯璁ょ殑鍚庣紑锛屼竴鑸瑆eb绯荤粺搴旂敤锛屾暟鎹腑闂翠欢id鍚庣紑澶т簬绛変簬1 @@ -203,6 +212,9 @@ //宸ヤ綔鎶ュお棰戠箒锛孨娆′笂鎶ュ鐞�1娆★紝鍙栧�艰寖鍥存槸1-100 ServerProperties.workReportDealOneByTimes = conf.getSetAttrPlusInt(doc, "config.base", "workReportDealOneByTimes", null, 1, 100, null) ; + + //瑙﹀彂鍙戦�侀拤閽夋姤璀︽秷鎭殑鍙栨按鍙f棩婕忔崯閲忕殑鏈�灏忓�硷紙鍖呮嫭浣嗛櫎0.0澶栵級 + ServerProperties.intakeAlarmLossMinValue = conf.getSetAttrPlusDouble(doc, "config.base", "intakeAlarmLossMinValue", null, 0.0, 1000000.0, null) ; //鏈夋姤璀﹀彂鐢熸椂锛屽悜閽夐拤鍙戦�佹秷鎭殑闂撮殧鏃堕暱锛堝垎閽燂級 ServerProperties.sendDingDingAlarmMsInterval = conf.getSetAttrPlusInt(doc, "config.base", "sendDingDingAlarmMsInterval", null, 1, 600, null) ; @@ -284,7 +296,7 @@ //RTU鏃ュ織鏂囦欢瀛樺偍鐩綍(鐩稿鐩綍) resVo.rtuLogDir = conf.getSetAttrTxt(doc, "config.resource", "rtuLogDir", null, false, null) ; //RTU鏃ュ織鏂囦欢鏈�澶у瓧鑺傛暟(KB) - resVo.rtuLogFileMaxSize = conf.getSetAttrPlusInt(doc, "config.resource", "rtuLogFileMaxSize", null, 100000, 2000000, null) ; + resVo.rtuLogFileMaxSize = conf.getSetAttrPlusInt(doc, "config.resource", "rtuLogFileMaxSize", null, 10, 2000000, null) ; //RTU鏃ュ織鏂囦欢鏈�澶ф枃浠舵暟 resVo.rtuLogFileMaxCount = conf.getSetAttrPlusInt(doc, "config.resource", "rtuLogFileMaxCount", null, 1, 10, null) ; @@ -327,6 +339,52 @@ } */ + + ///////////////// + //娑堟伅涓績妯″潡 + MsCenterConfigVo mscVo = new MsCenterConfigVo(); + mscVo.enable = conf.getSetAttrBoolean(doc, "config.msCenter", "enable", null, null) ; + mscVo.notifyMsInterval = conf.getSetAttrPlusInt(doc, "config.msCenter", "notifyInterval", null, 1, 600, null) * 1000L ; + mscVo.showStartInfo = showStartInfo ; + AdapterImp_MsCenterUnit mscAdapt = new AdapterImp_MsCenterUnit(); + mscAdapt.setConfig(mscVo); + MsCenterUnit mscUnit = MsCenterUnit.getInstance(); + mscUnit.setAdapter(mscAdapt); + mscUnit.start(obj -> { + }); + units.add(mscUnit) ; + + + ///////////////// + //RTU杩滅▼鍗囩骇妯″潡 + UpgradeUnitConfigVo ugVo = new UpgradeUnitConfigVo(); + ugVo.enable = conf.getSetAttrBoolean(doc, "config.upgrade", "enable", null, null) ; + if(ugVo.enable){ + ugVo.openNoUpgrade = conf.getSetAttrBoolean(doc, "config.upgrade", "openNoUpgrade", null, null) ; + ugVo.lastOpenMaxGoOn = conf.getSetAttrPlusInt(doc, "config.upgrade", "lastOpenMaxGoOn", null, 5, 360000, null); + ugVo.lastOpenMaxGoOn = ugVo.lastOpenMaxGoOn * 1000 ;//鍙樻垚姣 + ugVo.noOneRtuUpgradeMaxDuration = conf.getSetAttrPlusInt(doc, "config.upgrade", "noOneRtuUpgradeMaxDuration", null, 5, 360000, null); + ugVo.noOneRtuUpgradeMaxDuration = ugVo.noOneRtuUpgradeMaxDuration * 1000 ;//鍙樻垚姣 + ugVo.runningAndIdleDuration = conf.getSetAttrPlusInt(doc, "config.upgrade", "runningAndIdleDuration", null, 5, 360000, null); + ugVo.runningAndIdleDuration = ugVo.runningAndIdleDuration * 1000 ;//鍙樻垚姣 + ugVo.failTryTimes = conf.getSetAttrPlusInt(doc, "config.upgrade", "failTryTimes", null, 0, 100, null); + ugVo.ugMaxRtuAtOnce = conf.getSetAttrPlusInt(doc, "config.upgrade", "ugMaxRtuAtOnce", null, 0, 1000000, null); + ugVo.rtuOffLineWaitDuration = conf.getSetAttrPlusInt(doc, "config.upgrade", "rtuOffLineWaitDuration", null, 1, 3600000, null); + ugVo.rtuOffLineWaitDuration = ugVo.rtuOffLineWaitDuration * 1000;//鍙樻垚姣 + ugVo.notifyStateInterval = conf.getSetAttrPlusInt(doc, "config.upgrade", "notifyStateInterval", null, 1, 300, null); + ugVo.notifyStateInterval = ugVo.notifyStateInterval * 1000;//鍙樻垚姣 + ugVo.notifyTimesAfterOver = conf.getSetAttrPlusInt(doc, "config.upgrade", "notifyTimesAfterOver", null, 0, null, null); + ugVo.showStartInfo = showStartInfo ; + AdapterImp_UpgradeUnit ugAdap = new AdapterImp_UpgradeUnit(); + ugAdap.setConfig(ugVo); + UpgradeUnit ugUnit = UpgradeUnit.getInstance(); + ugUnit.setAdapter(ugAdap); + ugUnit.start(obj -> { + }); + units.add(ugUnit) ; + } + + ///////////////// //RTU涓婅鏁版嵁澶勭悊妯″潡锛堜换鍔℃爲锛� RtuDataUnitConfigVo rducVo = new RtuDataUnitConfigVo(); @@ -344,8 +402,7 @@ // /////////////// // 鏍稿績 CoreUnitConfigVo coreConfVo = new CoreUnitConfigVo(); - coreConfVo.sleepBigBusy = conf.getSetAttrPlusInt(doc, "config.core", "sleepBigBusy", null, 1, 200, null).longValue() ; - coreConfVo.sleepSmallBusy = conf.getSetAttrPlusInt(doc, "config.core", "sleepSmallBusy", null, 2, 1000, null).longValue(); + coreConfVo.coreInterval = conf.getSetAttrPlusInt(doc, "config.core", "coreInterval", null, 1, 200, null).longValue() ; coreConfVo.queueWarnSize = ServerProperties.cacheUpDownDataWarnCount ; coreConfVo.queueMaxSize = ServerProperties.cacheUpDownDataMaxCount ; coreConfVo.showStartInfo = showStartInfo ; @@ -353,9 +410,15 @@ coreAdap.setConfig(coreConfVo); CoreUnit coreUnit = CoreUnit.getInstance(); coreUnit.setAdapter(coreAdap); - CoreUnit.addConstantTask(new ToRtuConstantTask()); + CoreUnit.addConstantTask(new RtuDownConstantTask()); CoreUnit.addConstantTask(new FromRtuDataConstantTask()); CoreUnit.addConstantTask(new FromRtuComResultConstantTask()); + Boolean enableMq = conf.getSetAttrBoolean(doc, "config.mqtt", "enable", null, null) ; + if(enableMq != null && enableMq.booleanValue()){ + CoreUnit.addConstantTask(new MqttSubMessageConstantTask()); + CoreUnit.addConstantTask(new MqttPubMessageConstantTask()); + CoreUnit.addConstantTask(new MqttComResultConstantTask()); + } CoreUnit.addConstantTask(new SendMsConstantTask()); coreUnit.start(obj -> { }); @@ -379,7 +442,70 @@ }); TcpSvUrl = "[ip]:" + tcpVo.port ; units.add(tcpUnit) ; - } + } + + ///////////////// + //MQTT妯″潡 + MqttUnitConfigVo mqVo = new MqttUnitConfigVo(); + mqVo.enable = conf.getSetAttrBoolean(doc, "config.mqtt", "enable", null, null) ; + ServerProperties.mqttUnitEnable = mqVo.enable ; + if(mqVo.enable){ + mqVo.svIp = conf.getSetAttrTxt(doc, "config.mqtt", "svIp", null, true, null) ; + if(!IPUtils.ipValid(mqVo.svIp)){ + throw new Exception("config.mqtt.svIp閰嶇疆鐨処P涓嶅悎娉�") ; + } + mqVo.svPort = conf.getSetAttrPlusInt(doc, "config.mqtt", "svPort", null, 5, 360000, null); + if(mqVo.svPort < 0 || mqVo.svPort > 65535){ + throw new Exception("config.mqtt.svPort閰嶇疆鐨勭鍙d笉鍚堟硶") ; + } + mqVo.svUserName = conf.getSetAttrTxt(doc, "config.mqtt", "svUserName", null, true, null) ; + if(mqVo.svUserName == null || mqVo.svUserName.trim().equals("")){ + throw new Exception("config.mqtt.svUserName閰嶇疆鐨勭敤鎴峰悕涓嶅悎娉�") ; + }else{ + mqVo.svUserName = mqVo.svUserName.trim() ; + } + mqVo.svUserPassword = conf.getSetAttrTxt(doc, "config.mqtt", "svUserPassword", null, true, null) ; + if(mqVo.svUserPassword == null || mqVo.svUserPassword.trim().equals("")){ + throw new Exception("config.mqtt.svUserName閰嶇疆鐨勭敤鎴峰瘑鐮佷笉鍚堟硶") ; + }else{ + mqVo.svUserPassword = mqVo.svUserPassword.trim() ; + } + mqVo.poolMaxSize = conf.getSetAttrPlusInt(doc, "config.mqtt", "poolMaxSize", null, 5, 360000, null); + if(mqVo.poolMaxSize <= 1 || mqVo.poolMaxSize > 1000){ + throw new Exception("config.mqtt.poolMaxSize閰嶇疆鐨勮繛鎺ユ睜杩炴帴鏈�澶ф暟閲忎笉鍚堟硶") ; + } + String topicAndQos = conf.getSetAttrTxt(doc, "config.mqtt", "topicAndQos", null, true, null) ; + if(topicAndQos == null || topicAndQos.trim().equals("")){ + throw new Exception("config.mqtt.topicAndQos閰嶇疆鐨勪富棰樺強Qos涓嶅悎娉�") ; + }else{ + topicAndQos = topicAndQos.trim() ; + topicAndQos = topicAndQos.replaceAll("锛�", ","); + topicAndQos = topicAndQos.replaceAll("锛�", ";"); + String[] topicAndQosArr = topicAndQos.split(";") ; + mqVo.subTopics = new String[topicAndQosArr.length] ; + mqVo.topicsQos = new int[topicAndQosArr.length] ; + int index = 0 ; + for(String topicAndQosStr : topicAndQosArr){ + String[] tq = topicAndQosStr.split(",") ; + mqVo.subTopics[index] = tq[0].trim() ; + mqVo.topicsQos[index] = Integer.parseInt(tq[1].trim()) ; + index++ ; + } + } + mqVo.publishQos = conf.getSetAttrPlusInt(doc, "config.mqtt", "publishQos", null, 0, 3, null); + if(mqVo.publishQos < 0 || mqVo.publishQos > 3){ + throw new Exception("config.mqtt.publishQos閰嶇疆涓嶅悎娉�") ; + } + mqVo.showStartInfo = showStartInfo ; + AdapterImp_MqttUnit mqAdapt = new AdapterImp_MqttUnit(); + mqAdapt.setConfig(mqVo); + MqttUnit mqUnit = MqttUnit.getInstance(); + mqUnit.setAdapter(mqAdapt); + mqUnit.start(obj -> { + }); + units.add(mqUnit) ; + } + } catch (Exception e) { e.printStackTrace(); } -- Gitblit v1.8.0