From 21c080aa9da3acd53e014e8f917b50a48cb791cb Mon Sep 17 00:00:00 2001 From: liurunyu <lry9898@163.com> Date: 星期三, 11 六月 2025 13:44:04 +0800 Subject: [PATCH] 进行ApiFox发送内部命令测试,MQTTX发布气象数据测试,修改测试中发现的bug,修改不完善的地方。 --- pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java | 35 +++++--- pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/ReturnCommand.java | 3 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttTopic.java | 12 +++ pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/WeatherVo.java | 24 +++--- pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatusDealer.java | 2 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPooledObjectFactory.java | 15 +++ pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitConfigVo.java | 2 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java | 1 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.xml | 2 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/localProtocol/CodeLocal.java | 4 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPool.java | 4 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttSubMsg.java | 4 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/ServerShutDownHook.java | 2 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java | 21 +++++ pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/Test.java | 2 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java | 30 ++++-- pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java | 16 ++-- pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/CommandInnerDeaLer.java | 4 + 18 files changed, 122 insertions(+), 61 deletions(-) diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPool.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPool.java index 1d6d60f..23ec867 100644 --- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPool.java +++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPool.java @@ -13,8 +13,8 @@ private final GenericObjectPool<MqttClient> pool; - public MqttClientPool(String broker, String username, String password, int maxConnections) { - MqttClientPooledObjectFactory factory = new MqttClientPooledObjectFactory(broker, username, password); + public MqttClientPool(String broker, String username, String password, int maxConnections, boolean useMemoryPersistence) { + MqttClientPooledObjectFactory factory = new MqttClientPooledObjectFactory(broker, username, password, useMemoryPersistence); GenericObjectPoolConfig<MqttClient> config = new GenericObjectPoolConfig<>(); config.setMaxTotal(maxConnections); config.setMaxIdle(maxConnections); diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPooledObjectFactory.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPooledObjectFactory.java index bd2eb0f..86fc682 100644 --- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPooledObjectFactory.java +++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPooledObjectFactory.java @@ -5,6 +5,7 @@ import org.apache.commons.pool2.impl.DefaultPooledObject; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; /** * @Author: liurunyu @@ -16,18 +17,26 @@ private final String broker; private final String username; private final String password; + private final Boolean useMemoryPersistence; - public MqttClientPooledObjectFactory(String broker, String username, String password) { + public MqttClientPooledObjectFactory(String broker, String username, String password, boolean useMemoryPersistence) { this.broker = broker; this.username = username; this.password = password; + this.useMemoryPersistence = useMemoryPersistence; } @Override public MqttClient create() throws Exception { String clientId = MqttClient.generateClientId(); - MqttClient client = new MqttClient(broker, clientId); - + MqttClient client = null ; + // 浣跨敤鍐呭瓨鎸佷箙鍖栬�岄潪榛樿鐨勬枃浠舵寔涔呭寲 + if (useMemoryPersistence) { + MemoryPersistence persistence = new MemoryPersistence(); + client = new MqttClient(broker, clientId, persistence); + }else{ + client = new MqttClient(broker, clientId); + } MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(username); options.setPassword(password.toCharArray()); diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/Test.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/Test.java index 2611e24..560067f 100644 --- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/Test.java +++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/Test.java @@ -34,7 +34,7 @@ public static void main(String[] args) { try{ // 鍒濆鍖栬繛鎺ユ睜 - pool = new MqttClientPool("tcp://" + mqSvIp + ":" + mqSvPort, mqSvUserName, mqSvUserPassword, maxConnections); + pool = new MqttClientPool("tcp://" + mqSvIp + ":" + mqSvPort, mqSvUserName, mqSvUserPassword, maxConnections, true); MqttClient clientSub = pool.popClient() ; testSubscribe(clientSub, topic1); testSubscribe(clientSub, topic2); diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttSubMsg.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttSubMsg.java index bc144f2..8226de7 100644 --- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttSubMsg.java +++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttSubMsg.java @@ -15,8 +15,8 @@ public String deviceId ;//璁惧ID public String protocol;//鍗忚 - public String topic ;//娑堟伅涓婚 - public String msg ;//娑堟伅 + public MqttTopic topic ;//娑堟伅涓婚 + public String metaData;//MQTT鎺ㄩ�佹潵鐨勫厓鏁版嵁 public abstract boolean valid(); diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttTopic.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttTopic.java index 8403a73..67b1245 100644 --- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttTopic.java +++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttTopic.java @@ -18,4 +18,16 @@ public String devId ;//璁惧锛團Box锛塈D public String topic ;//娑堟伅涓婚 + public boolean isEmpty(){ + return orgTag == null || protocol == null || devId == null || topic == null + || orgTag.trim().length() == 0 || protocol.trim().length() == 0 || devId.trim().length() == 0 || topic.trim().length() == 0 ; + } + + public String shortName(){ + return topic ; + } + + public String longName(){ + return orgTag + "/" + protocol + "/" + devId + "/" + topic ; + } } diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java index 001b8c3..9615da3 100644 --- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java +++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java @@ -24,8 +24,8 @@ public MqttSubMsgSdV1(MqttTopic subTopic, String msg) { this.deviceId = subTopic.devId ; this.protocol = subTopic.protocol ; - this.topic = subTopic.topic ; - this.msg = msg ; + this.topic = subTopic ; + this.metaData = msg ; } public String toString(){ StringBuilder sb = new StringBuilder(); @@ -35,17 +35,17 @@ .append("\n") ; } sb.append("涓婚:") - .append(topic) - .append("\n") ; - sb.append("娑堟伅:") - .append(msg) + .append(topic.longName()) .append("\n") ; if(vo4Up != null){ sb.append("鏁版嵁:") .append(vo4Up.toString()) .append("\n") ; + }else{ + sb.append("鍏冩暟鎹�:") + .append(metaData) + .append("\n") ; } - return sb.toString() ; } @@ -64,7 +64,7 @@ if (topic == null || topic.isEmpty()) { return false; } - if (msg == null || msg.isEmpty()) { + if (metaData == null || metaData.isEmpty()) { return false; } return true; diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java index df625b1..527e6bc 100644 --- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java +++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java @@ -12,6 +12,7 @@ import com.dy.common.mw.protocol4Mqtt.pSdV1.downVos.InjectStartVo; import com.dy.common.mw.protocol4Mqtt.pSdV1.downVos.StirStartVo; import com.dy.common.mw.protocol4Mqtt.pSdV1.upVos.*; +import com.dy.common.mw.protocol4Mqtt.status.DevRunSt; import org.eclipse.paho.client.mqttv3.MqttMessage; /** @@ -21,38 +22,45 @@ */ public class ProtocolParserSdV1 { public MqttSubMsgSdV1 parseSubMsg(MqttTopic subTopic, MqttMessage mqttMsg, MqttCallback callback) throws Exception { - String msg = new String(mqttMsg.getPayload(), "UTF-8"); - if(JSON.isValid(msg)){ - throw new Exception("鎺ユ敹鍒癕QTT娑堟伅锛屽崗璁�" + subTopic.protocol + "锛岃澶嘔D" + subTopic.devId + "锛屼富棰�" + subTopic.topic + "娑堟伅鏍煎紡闈瀓son鏁版嵁(" + msg + ")") ; + String strTxt = new String(mqttMsg.getPayload(), "UTF-8"); + if(!JSON.isValid(strTxt)){ + throw new Exception("鎺ユ敹鍒癕QTT娑堟伅锛屽崗璁�" + subTopic.protocol + "锛岃澶嘔D" + subTopic.devId + "锛屼富棰�" + subTopic.longName() + "娑堟伅鏍煎紡闈瀓son鏁版嵁(" + strTxt + ")") ; } - MqttSubMsgSdV1 ms = new MqttSubMsgSdV1(subTopic, msg); + MqttSubMsgSdV1 msg = new MqttSubMsgSdV1(subTopic, strTxt); Vo4Up vo ; + DevRunSt stVo ; switch (subTopic.topic) { case ProtocolConstantSdV1.SubTopicWeather -> { - vo = JSON.parseObject(msg, WeatherVo.class); + vo = JSON.parseObject(strTxt, WeatherVo.class); break; } case ProtocolConstantSdV1.SubTopicSoil -> { - vo = JSON.parseObject(msg, SoilVo.class); + vo = JSON.parseObject(strTxt, SoilVo.class); break; } case ProtocolConstantSdV1.SubTopicManure -> { - vo = JSON.parseObject(msg, ManureVo.class); + vo = JSON.parseObject(strTxt, ManureVo.class); break; } case ProtocolConstantSdV1.SubTopicState -> { //姝ゅ鏈畬鎴愶紝搴旇浜х敓涓�浜涢�氫俊鐨刬nfo锛屼緵涓嬮潰callback.notify(objs)閫氱煡鍑哄幓 - vo = JSON.parseObject(msg, StateVo.class); + vo = JSON.parseObject(strTxt, StateVo.class); + stVo = new DevRunSt() ; + stVo.id = msg.deviceId ; + //stVo.stirRunning = true ; //鎼呮媽杩愯 true鏄� false鍚� + //stVo.injectRunning = true ; //娉ㄨ偉杩愯 true鏄� false鍚� + //stVo.irrRunning = true ; //鐏屾簤杩愯 true鏄� false鍚� + //stVo.alarm = true ; //鎶ヨ true鏄� false鍚� break; } default -> { throw new Exception("鎺ユ敹鍒癕QTT娑堟伅锛屽崗璁�" + subTopic.protocol + "锛岃澶嘔D" + subTopic.devId + "锛屼富棰�" + subTopic.topic + "娑堟伅瑙f瀽閫昏緫鏈疄鐜�"); } } - ms.vo4Up = vo ; - callback.callback(ms); + msg.vo4Up = vo ; + callback.callback(msg); callback.notify(null);//姝ゅ鏈畬鎴� - return ms; + return msg; } public MqttPubMsgSdV1 createPubMsg(String orgTag, Command com) throws Exception { diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/WeatherVo.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/WeatherVo.java index 6ebc8be..62ba401 100644 --- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/WeatherVo.java +++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/WeatherVo.java @@ -43,7 +43,7 @@ public String devDtStr ;//璁惧鏃堕棿 public String getDevDtStr() { - if(devDt == null){ + if(devDt != null){ return DateTime.yyyy_MM_dd_HH_mm_ss(DateTime.getDate(devDt)) ; }else{ return "" ; @@ -53,17 +53,17 @@ @Override public String toString(){ StringBuilder sb = new StringBuilder(); - sb.append("姘旇薄鏁版嵁锛�") ; - sb.append(" 娑堟伅ID锛�"+messageId) ; - sb.append(" 浜屾哀鍖栫⒊锛�"+carbonDioxide) ; - sb.append(" 鍏夌収寮哄害锛�"+lightIntensity) ; - sb.append(" 澶ф皵鍘嬪姏锛�"+atmosphericPressure) ; - sb.append(" 绌烘皵娓╁害锛�"+airTemperature) ; - sb.append(" 绌烘皵婀垮害锛�"+airHumidity) ; - sb.append(" PM2.5锛�"+pm25) ; - sb.append(" PM10锛�"+pm10) ; - sb.append(" 璁惧鏃堕棿锛�"+devDt) ; - sb.append(" 璁惧鏃堕棿锛�"+ this.getDevDtStr()) ; + sb.append("姘旇薄鏁版嵁=>") ; + sb.append(" 娑堟伅ID锛�"+messageId + ", ") ; + sb.append(" 浜屾哀鍖栫⒊锛�"+carbonDioxide + ", ") ; + sb.append(" 鍏夌収寮哄害锛�"+lightIntensity + ", ") ; + sb.append(" 澶ф皵鍘嬪姏锛�"+atmosphericPressure + ", ") ; + sb.append(" 绌烘皵娓╁害锛�"+airTemperature + ", ") ; + sb.append(" 绌烘皵婀垮害锛�"+airHumidity + ", ") ; + sb.append(" PM2.5锛�"+pm25 + ", ") ; + sb.append(" PM10锛�"+pm10 + ", ") ; + sb.append(" 璁惧鏃堕棿锛�"+devDt + ", ") ; + sb.append(" 璁惧鏃堕棿锛�"+ this.getDevDtStr() + ", ") ; sb.append("\n") ; return sb.toString() ; } diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java index 0cf1c24..e823bbd 100644 --- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java @@ -474,6 +474,7 @@ if(mqVo.poolMaxSize <= 1 || mqVo.poolMaxSize > 1000){ throw new Exception("config.mqtt.poolMaxSize閰嶇疆鐨勮繛鎺ユ睜杩炴帴鏈�澶ф暟閲忎笉鍚堟硶") ; } + mqVo.useMemoryPersistence = conf.getSetAttrBoolean(doc, "config.mqtt", "useMemoryPersistence", null, null) ; String proAndDevIds = conf.getSetAttrTxt(doc, "config.mqtt", "protocolAndDeviceIds", null, false, null) ; if(proAndDevIds == null || proAndDevIds.trim().equals("")){ throw new Exception("config.mqtt.protocolAndDeviceIds閰嶇疆涓嶅悎娉�") ; diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/ServerShutDownHook.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/ServerShutDownHook.java index 5171b77..95a6248 100644 --- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/ServerShutDownHook.java +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/ServerShutDownHook.java @@ -32,7 +32,7 @@ com2.code = CodeLocal.stopMqttSv ; com2.type = CommandType.innerCommand ; new CommandInnerDeaLer().deal(com2) ; - + log.info("鍏抽棴绋嬪簭鍓嶏紝鍏抽棴浜哅QTT鏈嶅姟"); }catch (Exception e){ log.error("绋嬪簭锛堟帶鍒跺彴锛夊叧闂挬瀛愬彂鐢熷紓甯�", e); } diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/CommandInnerDeaLer.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/CommandInnerDeaLer.java index 6b4d081..0e2375b 100644 --- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/CommandInnerDeaLer.java +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/CommandInnerDeaLer.java @@ -113,6 +113,10 @@ rCom = this.stopMqttSv(com); break; } + case CodeLocal.recoverMqttSv -> { + rCom = this.recoverMqttSv(com); + break; + } default -> { rCom = ReturnCommand.errored("鍑洪敊锛屾敹鍒板唴閮ㄥ懡浠ょ殑鍔熻兘鐮佷笉鑳借瘑鍒紒", com.getId(), com.getCode()); break; diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/ReturnCommand.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/ReturnCommand.java index 3a80412..676852f 100644 --- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/ReturnCommand.java +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/ReturnCommand.java @@ -19,7 +19,6 @@ * @param message */ public static Command successed(String message, String commandId, String code, Object attachment) { - log.info(message); Command command = new Command().createReturnSuccessCommand(message, commandId, code); command.setAttachment(attachment); return command; @@ -29,7 +28,6 @@ * @param message */ public static Command successed(String message, String commandId, String code) { - log.info(message); return new Command().createReturnSuccessCommand(message, commandId, code); } /** @@ -37,7 +35,6 @@ * @param message */ public static Command errored(String message, String commandId, String code) { - log.error(message); return new Command().createReturnErrorCommand(message, commandId, code); } } diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/localProtocol/CodeLocal.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/localProtocol/CodeLocal.java index e1a0417..56096ba 100644 --- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/localProtocol/CodeLocal.java +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/localProtocol/CodeLocal.java @@ -38,7 +38,7 @@ public static final String onPartLineMqtt = "LMCD0002" ;//鏌ヨ閮ㄥ垎MQTT璁惧鍦ㄧ嚎鎯呭喌 - public static final String onLineStatisticsMqtt = "LMCD0003" ;//鏌ヨ鎵�鏈塎QTT璁惧鐘舵�佺粺璁℃儏鍐� + public static final String onLineStatisticsMqtt = "LMCD0003" ;//鏌ヨ鎵�鏈塎QTT璁惧鍦ㄧ嚎鐘舵�佺粺璁℃儏鍐� public static final String allRtuStatesMqtt = "LMCD0010" ;//鏌ヨ鎵�鏈塎QTT璁惧鐘舵�� @@ -48,5 +48,7 @@ public static final String stopMqttSv = "LMCD0110" ;//鍋滄Mqtt鏈嶅姟 + public static final String recoverMqttSv = "LMCD0112" ;//閲嶅惎MQTT鏈嶅姟锛屾帴鍏ユ柊鐨凪QTT杩炴帴 + } diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatusDealer.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatusDealer.java index c164c0a..099cc9e 100644 --- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatusDealer.java +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatusDealer.java @@ -55,7 +55,7 @@ Map.Entry<String, DevStatus> entry = null ; while(it.hasNext()){ entry = it.next() ; - if(((DevStatus)entry).onLine != null && ((DevStatus)entry).onLine.booleanValue()){ + if((entry.getValue()).onLine != null && (entry.getValue()).onLine.booleanValue()){ vo.onLineNum++ ; }else{ vo.offLineNum++ ; diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java index ac3e211..661696d 100644 --- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java @@ -10,6 +10,9 @@ import org.apache.logging.log4j.Logger; import org.eclipse.paho.client.mqttv3.MqttClient; +import java.util.ArrayList; +import java.util.List; + /** * @Author: liurunyu * @Date: 2025/6/4 14:54 @@ -24,6 +27,8 @@ private MqttUnitConfigVo configVo ; private MqttClientPool pool; + + private List<MqttClient> subClients ; private MqttManager(){ } @@ -43,8 +48,9 @@ * @throws Exception */ public void start()throws Exception{ + subClients = new ArrayList<>(); String URL = "tcp://" + this.configVo.svIp + ":" + this.configVo.svPort; - this.pool = new MqttClientPool(URL, this.configVo.svUserName, this.configVo.svUserPassword, this.configVo.poolMaxSize); + this.pool = new MqttClientPool(URL, this.configVo.svUserName, this.configVo.svUserPassword, this.configVo.poolMaxSize, this.configVo.useMemoryPersistence); if(this.pool.isClose()){ throw new Exception("Mqtt杩炴帴姹犲垵濮嬪寲澶辫触"); } @@ -57,6 +63,7 @@ if(clientSub == null || !clientSub.isConnected()){ throw new Exception("Mqtt杩炴帴姹犺幏寰楄闃呰繛鎺ヤ笉鍙敤"); } + subClients.add(clientSub) ; // 璁㈤槄涓婚 for(int i = 0; i < this.configVo.subTopics.length; i++){ for(int j = 0 ; j < this.configVo.protocolAndDeviceIds.length; j++){ @@ -90,6 +97,18 @@ } public void stop()throws Exception{ + if(subClients != null && subClients.size() > 0){ + for (MqttClient client : subClients) { + if(client != null && client.isConnected()){ + try{ + client.disconnect(); + client.close(); + }catch (Exception e){ + e.printStackTrace(); + } + } + } + } if(this.pool != null){ // 鍏抽棴杩炴帴姹� this.pool.close(); diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java index 7d8c6ea..0414628 100644 --- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java @@ -22,22 +22,27 @@ @Override public void messageArrived(String topic, MqttMessage msg) throws Exception { - MqttTopic subTopic = MqttMsgParser.parseSubTopic(topic) ; - MqttSubMsg subMsg = MqttMsgParser.parseSubMsg(subTopic, msg, new MqttCallback(){ - @Override - public void callback(MqttSubMsg subMsg) { - DevStatusDealer.onLine(subMsg.deviceId, subMsg.protocol); - DevStatusDealer.afterReceiveSubMessage(subMsg.deviceId); - RtuLogDealer.log4Mqtt(subMsg.deviceId, "璁㈤槄娑堟伅 涓婚锛�" + subMsg.topic + " 娑堟伅锛�" + subMsg.msg); - } - @Override - public void notify(String devId, MqttNotifyInfo... infos) { - if(notify != null){ - notify.notify(devId, infos) ; + try { + MqttTopic subTopic = MqttMsgParser.parseSubTopic(topic); + MqttSubMsg subMsg = MqttMsgParser.parseSubMsg(subTopic, msg, new MqttCallback() { + @Override + public void callback(MqttSubMsg subMsg) { + DevStatusDealer.onLine(subMsg.deviceId, subMsg.protocol); + DevStatusDealer.afterReceiveSubMessage(subMsg.deviceId); + RtuLogDealer.log4Mqtt(subMsg.deviceId, "璁㈤槄娑堟伅 涓婚锛�" + subMsg.topic.longName() + " 鍏冩暟鎹細" + subMsg.metaData); } - } - }) ; - this.nextDeal(subMsg); + + @Override + public void notify(String devId, MqttNotifyInfo... infos) { + if (notify != null) { + notify.notify(devId, infos); + } + } + }); + this.nextDeal(subMsg); + }catch(Exception e){ + log.error("澶勭悊MQTT璁㈤槄娑堟伅鍙戠敓寮傚父", e); + } } private void nextDeal(MqttSubMsg subMsg)throws Exception { subMsg.action(new Callback() { diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitConfigVo.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitConfigVo.java index f96f189..e2cad34 100644 --- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitConfigVo.java +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitConfigVo.java @@ -13,6 +13,7 @@ public String svUserName ;// public String svUserPassword ;// public Integer poolMaxSize ;// + public Boolean useMemoryPersistence ; public String[] protocolAndDeviceIds ;//璁惧鍗忚涓嶪D锛團Box锛塱d public String[] deviceIds ;//璁惧锛團Box锛塱d public String[] subTopics ;//璁㈤槄鐨勪富棰� @@ -27,6 +28,7 @@ this.svUserName = "dyyjy" ; this.svUserPassword = "Dyyjy2025,;.abc!@#" ; this.poolMaxSize = 10 ; + useMemoryPersistence = true ; this.pubTopicQos = 1 ; this.noSubThenOff = 10 * 60 * 10000L ; } diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.xml b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.xml index 3d3e466..50e5d2c 100644 --- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.xml +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.xml @@ -171,6 +171,7 @@ svUserName MQTT鏈嶅姟鍣ㄧ敤鎴峰悕 svUserPassword MQTT鏈嶅姟鍣ㄧ敤鎴峰瘑鐮� poolMaxSize 杩炴帴姹犳渶澶ц繛鎺ユ暟 + useMemoryPersistence 浣跨敤鍐呭瓨鎸佷箙鍖栬�岄潪榛樿鐨勬枃浠舵寔涔呭寲(true鏄� false鍚�) protocolAndDeviceIds 鍦ㄥ瓙绯荤粺锛坥rgTag锛変腑鎺ュ叆鐨勮澶�(FBox)鎵�鐢ㄥ崗璁強璁惧id闆嗗悎,澶氫釜鐢ㄩ�楀彿闅斿紑锛屽崗璁笌ID鐢ㄦ鏂滄潬闅斿紑锛屼緥濡傦細sd1/338220031439,sd1/338220031440 subTopicAndQos: 璁㈤槄涓婚涓嶲os锛屼富棰樺悕涓庡叾Qos鐢ㄩ�楀彿闅斿紑锛屽涓富棰樺強Qos鐢ㄥ垎鍙烽殧寮�锛屼緥濡傦細ym/topic1,1;ym/topic2,1;ym/topic3,1锛屽鏋滄湁澶氫釜OrgTag锛屼富棰樺墠缂�鐢ㄥ叾OrgTag pubTopicQos: 鍙戝竷涓婚鐨凲os锛屽彇鍊艰寖鍥达細 @@ -185,6 +186,7 @@ svUserName="dyyjy" svUserPassword="Dyyjy2025,;.abc!@#" poolMaxSize="10" + useMemoryPersistence="true" protocolAndDeviceIds="${mqtt.protocolAndDeviceIds}" subTopicAndQos="${mqtt.subTopicAndQos}" pubTopicQos="1" -- Gitblit v1.8.0