From 21c080aa9da3acd53e014e8f917b50a48cb791cb Mon Sep 17 00:00:00 2001
From: liurunyu <lry9898@163.com>
Date: 星期三, 11 六月 2025 13:44:04 +0800
Subject: [PATCH] 进行ApiFox发送内部命令测试,MQTTX发布气象数据测试,修改测试中发现的bug,修改不完善的地方。
---
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java | 35 +++++---
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/ReturnCommand.java | 3
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttTopic.java | 12 +++
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/WeatherVo.java | 24 +++---
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatusDealer.java | 2
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPooledObjectFactory.java | 15 +++
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitConfigVo.java | 2
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java | 1
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.xml | 2
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/localProtocol/CodeLocal.java | 4
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPool.java | 4
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttSubMsg.java | 4
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/ServerShutDownHook.java | 2
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java | 21 +++++
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/Test.java | 2
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java | 30 ++++--
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java | 16 ++--
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/CommandInnerDeaLer.java | 4 +
18 files changed, 122 insertions(+), 61 deletions(-)
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPool.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPool.java
index 1d6d60f..23ec867 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPool.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPool.java
@@ -13,8 +13,8 @@
private final GenericObjectPool<MqttClient> pool;
- public MqttClientPool(String broker, String username, String password, int maxConnections) {
- MqttClientPooledObjectFactory factory = new MqttClientPooledObjectFactory(broker, username, password);
+ public MqttClientPool(String broker, String username, String password, int maxConnections, boolean useMemoryPersistence) {
+ MqttClientPooledObjectFactory factory = new MqttClientPooledObjectFactory(broker, username, password, useMemoryPersistence);
GenericObjectPoolConfig<MqttClient> config = new GenericObjectPoolConfig<>();
config.setMaxTotal(maxConnections);
config.setMaxIdle(maxConnections);
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPooledObjectFactory.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPooledObjectFactory.java
index bd2eb0f..86fc682 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPooledObjectFactory.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPooledObjectFactory.java
@@ -5,6 +5,7 @@
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
* @Author: liurunyu
@@ -16,18 +17,26 @@
private final String broker;
private final String username;
private final String password;
+ private final Boolean useMemoryPersistence;
- public MqttClientPooledObjectFactory(String broker, String username, String password) {
+ public MqttClientPooledObjectFactory(String broker, String username, String password, boolean useMemoryPersistence) {
this.broker = broker;
this.username = username;
this.password = password;
+ this.useMemoryPersistence = useMemoryPersistence;
}
@Override
public MqttClient create() throws Exception {
String clientId = MqttClient.generateClientId();
- MqttClient client = new MqttClient(broker, clientId);
-
+ MqttClient client = null ;
+ // 浣跨敤鍐呭瓨鎸佷箙鍖栬�岄潪榛樿鐨勬枃浠舵寔涔呭寲
+ if (useMemoryPersistence) {
+ MemoryPersistence persistence = new MemoryPersistence();
+ client = new MqttClient(broker, clientId, persistence);
+ }else{
+ client = new MqttClient(broker, clientId);
+ }
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(username);
options.setPassword(password.toCharArray());
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/Test.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/Test.java
index 2611e24..560067f 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/Test.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/Test.java
@@ -34,7 +34,7 @@
public static void main(String[] args) {
try{
// 鍒濆鍖栬繛鎺ユ睜
- pool = new MqttClientPool("tcp://" + mqSvIp + ":" + mqSvPort, mqSvUserName, mqSvUserPassword, maxConnections);
+ pool = new MqttClientPool("tcp://" + mqSvIp + ":" + mqSvPort, mqSvUserName, mqSvUserPassword, maxConnections, true);
MqttClient clientSub = pool.popClient() ;
testSubscribe(clientSub, topic1);
testSubscribe(clientSub, topic2);
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttSubMsg.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttSubMsg.java
index bc144f2..8226de7 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttSubMsg.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttSubMsg.java
@@ -15,8 +15,8 @@
public String deviceId ;//璁惧ID
public String protocol;//鍗忚
- public String topic ;//娑堟伅涓婚
- public String msg ;//娑堟伅
+ public MqttTopic topic ;//娑堟伅涓婚
+ public String metaData;//MQTT鎺ㄩ�佹潵鐨勫厓鏁版嵁
public abstract boolean valid();
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttTopic.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttTopic.java
index 8403a73..67b1245 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttTopic.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttTopic.java
@@ -18,4 +18,16 @@
public String devId ;//璁惧锛團Box锛塈D
public String topic ;//娑堟伅涓婚
+ public boolean isEmpty(){
+ return orgTag == null || protocol == null || devId == null || topic == null
+ || orgTag.trim().length() == 0 || protocol.trim().length() == 0 || devId.trim().length() == 0 || topic.trim().length() == 0 ;
+ }
+
+ public String shortName(){
+ return topic ;
+ }
+
+ public String longName(){
+ return orgTag + "/" + protocol + "/" + devId + "/" + topic ;
+ }
}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java
index 001b8c3..9615da3 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java
@@ -24,8 +24,8 @@
public MqttSubMsgSdV1(MqttTopic subTopic, String msg) {
this.deviceId = subTopic.devId ;
this.protocol = subTopic.protocol ;
- this.topic = subTopic.topic ;
- this.msg = msg ;
+ this.topic = subTopic ;
+ this.metaData = msg ;
}
public String toString(){
StringBuilder sb = new StringBuilder();
@@ -35,17 +35,17 @@
.append("\n") ;
}
sb.append("涓婚:")
- .append(topic)
- .append("\n") ;
- sb.append("娑堟伅:")
- .append(msg)
+ .append(topic.longName())
.append("\n") ;
if(vo4Up != null){
sb.append("鏁版嵁:")
.append(vo4Up.toString())
.append("\n") ;
+ }else{
+ sb.append("鍏冩暟鎹�:")
+ .append(metaData)
+ .append("\n") ;
}
-
return sb.toString() ;
}
@@ -64,7 +64,7 @@
if (topic == null || topic.isEmpty()) {
return false;
}
- if (msg == null || msg.isEmpty()) {
+ if (metaData == null || metaData.isEmpty()) {
return false;
}
return true;
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java
index df625b1..527e6bc 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java
@@ -12,6 +12,7 @@
import com.dy.common.mw.protocol4Mqtt.pSdV1.downVos.InjectStartVo;
import com.dy.common.mw.protocol4Mqtt.pSdV1.downVos.StirStartVo;
import com.dy.common.mw.protocol4Mqtt.pSdV1.upVos.*;
+import com.dy.common.mw.protocol4Mqtt.status.DevRunSt;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
@@ -21,38 +22,45 @@
*/
public class ProtocolParserSdV1 {
public MqttSubMsgSdV1 parseSubMsg(MqttTopic subTopic, MqttMessage mqttMsg, MqttCallback callback) throws Exception {
- String msg = new String(mqttMsg.getPayload(), "UTF-8");
- if(JSON.isValid(msg)){
- throw new Exception("鎺ユ敹鍒癕QTT娑堟伅锛屽崗璁�" + subTopic.protocol + "锛岃澶嘔D" + subTopic.devId + "锛屼富棰�" + subTopic.topic + "娑堟伅鏍煎紡闈瀓son鏁版嵁(" + msg + ")") ;
+ String strTxt = new String(mqttMsg.getPayload(), "UTF-8");
+ if(!JSON.isValid(strTxt)){
+ throw new Exception("鎺ユ敹鍒癕QTT娑堟伅锛屽崗璁�" + subTopic.protocol + "锛岃澶嘔D" + subTopic.devId + "锛屼富棰�" + subTopic.longName() + "娑堟伅鏍煎紡闈瀓son鏁版嵁(" + strTxt + ")") ;
}
- MqttSubMsgSdV1 ms = new MqttSubMsgSdV1(subTopic, msg);
+ MqttSubMsgSdV1 msg = new MqttSubMsgSdV1(subTopic, strTxt);
Vo4Up vo ;
+ DevRunSt stVo ;
switch (subTopic.topic) {
case ProtocolConstantSdV1.SubTopicWeather -> {
- vo = JSON.parseObject(msg, WeatherVo.class);
+ vo = JSON.parseObject(strTxt, WeatherVo.class);
break;
}
case ProtocolConstantSdV1.SubTopicSoil -> {
- vo = JSON.parseObject(msg, SoilVo.class);
+ vo = JSON.parseObject(strTxt, SoilVo.class);
break;
}
case ProtocolConstantSdV1.SubTopicManure -> {
- vo = JSON.parseObject(msg, ManureVo.class);
+ vo = JSON.parseObject(strTxt, ManureVo.class);
break;
}
case ProtocolConstantSdV1.SubTopicState -> {
//姝ゅ鏈畬鎴愶紝搴旇浜х敓涓�浜涢�氫俊鐨刬nfo锛屼緵涓嬮潰callback.notify(objs)閫氱煡鍑哄幓
- vo = JSON.parseObject(msg, StateVo.class);
+ vo = JSON.parseObject(strTxt, StateVo.class);
+ stVo = new DevRunSt() ;
+ stVo.id = msg.deviceId ;
+ //stVo.stirRunning = true ; //鎼呮媽杩愯 true鏄� false鍚�
+ //stVo.injectRunning = true ; //娉ㄨ偉杩愯 true鏄� false鍚�
+ //stVo.irrRunning = true ; //鐏屾簤杩愯 true鏄� false鍚�
+ //stVo.alarm = true ; //鎶ヨ true鏄� false鍚�
break;
}
default -> {
throw new Exception("鎺ユ敹鍒癕QTT娑堟伅锛屽崗璁�" + subTopic.protocol + "锛岃澶嘔D" + subTopic.devId + "锛屼富棰�" + subTopic.topic + "娑堟伅瑙f瀽閫昏緫鏈疄鐜�");
}
}
- ms.vo4Up = vo ;
- callback.callback(ms);
+ msg.vo4Up = vo ;
+ callback.callback(msg);
callback.notify(null);//姝ゅ鏈畬鎴�
- return ms;
+ return msg;
}
public MqttPubMsgSdV1 createPubMsg(String orgTag, Command com) throws Exception {
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/WeatherVo.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/WeatherVo.java
index 6ebc8be..62ba401 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/WeatherVo.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/WeatherVo.java
@@ -43,7 +43,7 @@
public String devDtStr ;//璁惧鏃堕棿
public String getDevDtStr() {
- if(devDt == null){
+ if(devDt != null){
return DateTime.yyyy_MM_dd_HH_mm_ss(DateTime.getDate(devDt)) ;
}else{
return "" ;
@@ -53,17 +53,17 @@
@Override
public String toString(){
StringBuilder sb = new StringBuilder();
- sb.append("姘旇薄鏁版嵁锛�") ;
- sb.append(" 娑堟伅ID锛�"+messageId) ;
- sb.append(" 浜屾哀鍖栫⒊锛�"+carbonDioxide) ;
- sb.append(" 鍏夌収寮哄害锛�"+lightIntensity) ;
- sb.append(" 澶ф皵鍘嬪姏锛�"+atmosphericPressure) ;
- sb.append(" 绌烘皵娓╁害锛�"+airTemperature) ;
- sb.append(" 绌烘皵婀垮害锛�"+airHumidity) ;
- sb.append(" PM2.5锛�"+pm25) ;
- sb.append(" PM10锛�"+pm10) ;
- sb.append(" 璁惧鏃堕棿锛�"+devDt) ;
- sb.append(" 璁惧鏃堕棿锛�"+ this.getDevDtStr()) ;
+ sb.append("姘旇薄鏁版嵁=>") ;
+ sb.append(" 娑堟伅ID锛�"+messageId + ", ") ;
+ sb.append(" 浜屾哀鍖栫⒊锛�"+carbonDioxide + ", ") ;
+ sb.append(" 鍏夌収寮哄害锛�"+lightIntensity + ", ") ;
+ sb.append(" 澶ф皵鍘嬪姏锛�"+atmosphericPressure + ", ") ;
+ sb.append(" 绌烘皵娓╁害锛�"+airTemperature + ", ") ;
+ sb.append(" 绌烘皵婀垮害锛�"+airHumidity + ", ") ;
+ sb.append(" PM2.5锛�"+pm25 + ", ") ;
+ sb.append(" PM10锛�"+pm10 + ", ") ;
+ sb.append(" 璁惧鏃堕棿锛�"+devDt + ", ") ;
+ sb.append(" 璁惧鏃堕棿锛�"+ this.getDevDtStr() + ", ") ;
sb.append("\n") ;
return sb.toString() ;
}
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java
index 0cf1c24..e823bbd 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java
@@ -474,6 +474,7 @@
if(mqVo.poolMaxSize <= 1 || mqVo.poolMaxSize > 1000){
throw new Exception("config.mqtt.poolMaxSize閰嶇疆鐨勮繛鎺ユ睜杩炴帴鏈�澶ф暟閲忎笉鍚堟硶") ;
}
+ mqVo.useMemoryPersistence = conf.getSetAttrBoolean(doc, "config.mqtt", "useMemoryPersistence", null, null) ;
String proAndDevIds = conf.getSetAttrTxt(doc, "config.mqtt", "protocolAndDeviceIds", null, false, null) ;
if(proAndDevIds == null || proAndDevIds.trim().equals("")){
throw new Exception("config.mqtt.protocolAndDeviceIds閰嶇疆涓嶅悎娉�") ;
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/ServerShutDownHook.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/ServerShutDownHook.java
index 5171b77..95a6248 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/ServerShutDownHook.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/ServerShutDownHook.java
@@ -32,7 +32,7 @@
com2.code = CodeLocal.stopMqttSv ;
com2.type = CommandType.innerCommand ;
new CommandInnerDeaLer().deal(com2) ;
-
+ log.info("鍏抽棴绋嬪簭鍓嶏紝鍏抽棴浜哅QTT鏈嶅姟");
}catch (Exception e){
log.error("绋嬪簭锛堟帶鍒跺彴锛夊叧闂挬瀛愬彂鐢熷紓甯�", e);
}
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/CommandInnerDeaLer.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/CommandInnerDeaLer.java
index 6b4d081..0e2375b 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/CommandInnerDeaLer.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/CommandInnerDeaLer.java
@@ -113,6 +113,10 @@
rCom = this.stopMqttSv(com);
break;
}
+ case CodeLocal.recoverMqttSv -> {
+ rCom = this.recoverMqttSv(com);
+ break;
+ }
default -> {
rCom = ReturnCommand.errored("鍑洪敊锛屾敹鍒板唴閮ㄥ懡浠ょ殑鍔熻兘鐮佷笉鑳借瘑鍒紒", com.getId(), com.getCode());
break;
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/ReturnCommand.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/ReturnCommand.java
index 3a80412..676852f 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/ReturnCommand.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/ReturnCommand.java
@@ -19,7 +19,6 @@
* @param message
*/
public static Command successed(String message, String commandId, String code, Object attachment) {
- log.info(message);
Command command = new Command().createReturnSuccessCommand(message, commandId, code);
command.setAttachment(attachment);
return command;
@@ -29,7 +28,6 @@
* @param message
*/
public static Command successed(String message, String commandId, String code) {
- log.info(message);
return new Command().createReturnSuccessCommand(message, commandId, code);
}
/**
@@ -37,7 +35,6 @@
* @param message
*/
public static Command errored(String message, String commandId, String code) {
- log.error(message);
return new Command().createReturnErrorCommand(message, commandId, code);
}
}
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/localProtocol/CodeLocal.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/localProtocol/CodeLocal.java
index e1a0417..56096ba 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/localProtocol/CodeLocal.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/localProtocol/CodeLocal.java
@@ -38,7 +38,7 @@
public static final String onPartLineMqtt = "LMCD0002" ;//鏌ヨ閮ㄥ垎MQTT璁惧鍦ㄧ嚎鎯呭喌
- public static final String onLineStatisticsMqtt = "LMCD0003" ;//鏌ヨ鎵�鏈塎QTT璁惧鐘舵�佺粺璁℃儏鍐�
+ public static final String onLineStatisticsMqtt = "LMCD0003" ;//鏌ヨ鎵�鏈塎QTT璁惧鍦ㄧ嚎鐘舵�佺粺璁℃儏鍐�
public static final String allRtuStatesMqtt = "LMCD0010" ;//鏌ヨ鎵�鏈塎QTT璁惧鐘舵��
@@ -48,5 +48,7 @@
public static final String stopMqttSv = "LMCD0110" ;//鍋滄Mqtt鏈嶅姟
+ public static final String recoverMqttSv = "LMCD0112" ;//閲嶅惎MQTT鏈嶅姟锛屾帴鍏ユ柊鐨凪QTT杩炴帴
+
}
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatusDealer.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatusDealer.java
index c164c0a..099cc9e 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatusDealer.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatusDealer.java
@@ -55,7 +55,7 @@
Map.Entry<String, DevStatus> entry = null ;
while(it.hasNext()){
entry = it.next() ;
- if(((DevStatus)entry).onLine != null && ((DevStatus)entry).onLine.booleanValue()){
+ if((entry.getValue()).onLine != null && (entry.getValue()).onLine.booleanValue()){
vo.onLineNum++ ;
}else{
vo.offLineNum++ ;
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java
index ac3e211..661696d 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java
@@ -10,6 +10,9 @@
import org.apache.logging.log4j.Logger;
import org.eclipse.paho.client.mqttv3.MqttClient;
+import java.util.ArrayList;
+import java.util.List;
+
/**
* @Author: liurunyu
* @Date: 2025/6/4 14:54
@@ -24,6 +27,8 @@
private MqttUnitConfigVo configVo ;
private MqttClientPool pool;
+
+ private List<MqttClient> subClients ;
private MqttManager(){
}
@@ -43,8 +48,9 @@
* @throws Exception
*/
public void start()throws Exception{
+ subClients = new ArrayList<>();
String URL = "tcp://" + this.configVo.svIp + ":" + this.configVo.svPort;
- this.pool = new MqttClientPool(URL, this.configVo.svUserName, this.configVo.svUserPassword, this.configVo.poolMaxSize);
+ this.pool = new MqttClientPool(URL, this.configVo.svUserName, this.configVo.svUserPassword, this.configVo.poolMaxSize, this.configVo.useMemoryPersistence);
if(this.pool.isClose()){
throw new Exception("Mqtt杩炴帴姹犲垵濮嬪寲澶辫触");
}
@@ -57,6 +63,7 @@
if(clientSub == null || !clientSub.isConnected()){
throw new Exception("Mqtt杩炴帴姹犺幏寰楄闃呰繛鎺ヤ笉鍙敤");
}
+ subClients.add(clientSub) ;
// 璁㈤槄涓婚
for(int i = 0; i < this.configVo.subTopics.length; i++){
for(int j = 0 ; j < this.configVo.protocolAndDeviceIds.length; j++){
@@ -90,6 +97,18 @@
}
public void stop()throws Exception{
+ if(subClients != null && subClients.size() > 0){
+ for (MqttClient client : subClients) {
+ if(client != null && client.isConnected()){
+ try{
+ client.disconnect();
+ client.close();
+ }catch (Exception e){
+ e.printStackTrace();
+ }
+ }
+ }
+ }
if(this.pool != null){
// 鍏抽棴杩炴帴姹�
this.pool.close();
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java
index 7d8c6ea..0414628 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java
@@ -22,22 +22,27 @@
@Override
public void messageArrived(String topic, MqttMessage msg) throws Exception {
- MqttTopic subTopic = MqttMsgParser.parseSubTopic(topic) ;
- MqttSubMsg subMsg = MqttMsgParser.parseSubMsg(subTopic, msg, new MqttCallback(){
- @Override
- public void callback(MqttSubMsg subMsg) {
- DevStatusDealer.onLine(subMsg.deviceId, subMsg.protocol);
- DevStatusDealer.afterReceiveSubMessage(subMsg.deviceId);
- RtuLogDealer.log4Mqtt(subMsg.deviceId, "璁㈤槄娑堟伅 涓婚锛�" + subMsg.topic + " 娑堟伅锛�" + subMsg.msg);
- }
- @Override
- public void notify(String devId, MqttNotifyInfo... infos) {
- if(notify != null){
- notify.notify(devId, infos) ;
+ try {
+ MqttTopic subTopic = MqttMsgParser.parseSubTopic(topic);
+ MqttSubMsg subMsg = MqttMsgParser.parseSubMsg(subTopic, msg, new MqttCallback() {
+ @Override
+ public void callback(MqttSubMsg subMsg) {
+ DevStatusDealer.onLine(subMsg.deviceId, subMsg.protocol);
+ DevStatusDealer.afterReceiveSubMessage(subMsg.deviceId);
+ RtuLogDealer.log4Mqtt(subMsg.deviceId, "璁㈤槄娑堟伅 涓婚锛�" + subMsg.topic.longName() + " 鍏冩暟鎹細" + subMsg.metaData);
}
- }
- }) ;
- this.nextDeal(subMsg);
+
+ @Override
+ public void notify(String devId, MqttNotifyInfo... infos) {
+ if (notify != null) {
+ notify.notify(devId, infos);
+ }
+ }
+ });
+ this.nextDeal(subMsg);
+ }catch(Exception e){
+ log.error("澶勭悊MQTT璁㈤槄娑堟伅鍙戠敓寮傚父", e);
+ }
}
private void nextDeal(MqttSubMsg subMsg)throws Exception {
subMsg.action(new Callback() {
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitConfigVo.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitConfigVo.java
index f96f189..e2cad34 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitConfigVo.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitConfigVo.java
@@ -13,6 +13,7 @@
public String svUserName ;//
public String svUserPassword ;//
public Integer poolMaxSize ;//
+ public Boolean useMemoryPersistence ;
public String[] protocolAndDeviceIds ;//璁惧鍗忚涓嶪D锛團Box锛塱d
public String[] deviceIds ;//璁惧锛團Box锛塱d
public String[] subTopics ;//璁㈤槄鐨勪富棰�
@@ -27,6 +28,7 @@
this.svUserName = "dyyjy" ;
this.svUserPassword = "Dyyjy2025,;.abc!@#" ;
this.poolMaxSize = 10 ;
+ useMemoryPersistence = true ;
this.pubTopicQos = 1 ;
this.noSubThenOff = 10 * 60 * 10000L ;
}
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.xml b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.xml
index 3d3e466..50e5d2c 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.xml
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.xml
@@ -171,6 +171,7 @@
svUserName MQTT鏈嶅姟鍣ㄧ敤鎴峰悕
svUserPassword MQTT鏈嶅姟鍣ㄧ敤鎴峰瘑鐮�
poolMaxSize 杩炴帴姹犳渶澶ц繛鎺ユ暟
+ useMemoryPersistence 浣跨敤鍐呭瓨鎸佷箙鍖栬�岄潪榛樿鐨勬枃浠舵寔涔呭寲(true鏄� false鍚�)
protocolAndDeviceIds 鍦ㄥ瓙绯荤粺锛坥rgTag锛変腑鎺ュ叆鐨勮澶�(FBox)鎵�鐢ㄥ崗璁強璁惧id闆嗗悎,澶氫釜鐢ㄩ�楀彿闅斿紑锛屽崗璁笌ID鐢ㄦ鏂滄潬闅斿紑锛屼緥濡傦細sd1/338220031439,sd1/338220031440
subTopicAndQos: 璁㈤槄涓婚涓嶲os锛屼富棰樺悕涓庡叾Qos鐢ㄩ�楀彿闅斿紑锛屽涓富棰樺強Qos鐢ㄥ垎鍙烽殧寮�锛屼緥濡傦細ym/topic1,1;ym/topic2,1;ym/topic3,1锛屽鏋滄湁澶氫釜OrgTag锛屼富棰樺墠缂�鐢ㄥ叾OrgTag
pubTopicQos: 鍙戝竷涓婚鐨凲os锛屽彇鍊艰寖鍥达細
@@ -185,6 +186,7 @@
svUserName="dyyjy"
svUserPassword="Dyyjy2025,;.abc!@#"
poolMaxSize="10"
+ useMemoryPersistence="true"
protocolAndDeviceIds="${mqtt.protocolAndDeviceIds}"
subTopicAndQos="${mqtt.subTopicAndQos}"
pubTopicQos="1"
--
Gitblit v1.8.0