From b2c928eac40c4b7f495f8164eeb59005219fa350 Mon Sep 17 00:00:00 2001
From: liurunyu <lry9898@163.com>
Date: 星期三, 11 六月 2025 17:55:29 +0800
Subject: [PATCH] 1、MQTT协议,增加设备参数命令及相关数据; 2、完善上行数据值对象; 3、完善其他代码。
---
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java | 29 ----
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttSubMsgDealer.java | 41 +++++
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttMsgParser.java | 4
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/IrrStartVo.java | 5
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/ManureVo.java | 22 ++
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/comParam/ComCtrlVo.java | 4
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevRunInfo.java | 4
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/FaultClearVo.java | 4
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/StateVo.java | 26 +++
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/StirStartVo.java | 5
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttTopic.java | 10
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/WeatherVo.java | 20 +-
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatusDealer.java | 4
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/ParamSetVo.java | 31 ++++
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevOnLineInfo.java | 2
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolConstantSdV1.java | 1
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/SoilVo.java | 43 +++++
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/InjectStartVo.java | 5
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/comParam/ComSetParamVo.java | 23 +++
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/CodeSdV1.java | 1
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttPubMsg.java | 2
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgCache.java | 3
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java | 14 +-
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java | 66 ++++++--
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgNode.java | 8
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java | 3
26 files changed, 278 insertions(+), 102 deletions(-)
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttMsgParser.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttMsgParser.java
index 11cfbd6..5a03f7e 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttMsgParser.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttMsgParser.java
@@ -21,7 +21,7 @@
vo.orgTag = topicGrp[0] ;
vo.protocol = topicGrp[1] ;
vo.devId = topicGrp[2] ;
- vo.topic = topicGrp[3] ;
+ vo.name = topicGrp[3] ;
return vo ;
}
}else{
@@ -30,7 +30,7 @@
}
public static String createPubTopic(MqttTopic tp) throws Exception {
- return tp.orgTag + "/" + tp.protocol + "/" + tp.devId + "/" + tp.topic ;
+ return tp.orgTag + "/" + tp.protocol + "/" + tp.devId + "/" + tp.name;
}
public static MqttSubMsg parseSubMsg(MqttTopic subTopic, MqttMessage mqttMsg, MqttCallback callback) throws Exception {
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttPubMsg.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttPubMsg.java
index bd864d3..eba4ecf 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttPubMsg.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttPubMsg.java
@@ -12,7 +12,7 @@
public String mqttResultSendWebUrl ;//Mqtt杩斿洖鍛戒护缁撴灉 鍙戝悜鐩殑鍦皐eb URL
- public String topic ;//娑堟伅涓婚
+ public MqttTopic topic ;//娑堟伅涓婚
public String msg ;//娑堟伅
public boolean isCacheForOffLine ;//涓嬭鍛戒护鎺у埗锛屾秷鎭腑闂翠欢涓嶅湪绾挎槸鍚︾紦瀛樺懡浠�
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttTopic.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttTopic.java
index 67b1245..d088e08 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttTopic.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttTopic.java
@@ -16,18 +16,18 @@
public String orgTag ;//缁勭粐鏍囪瘑
public String protocol ;//鍗忚鍚嶇О
public String devId ;//璁惧锛團Box锛塈D
- public String topic ;//娑堟伅涓婚
+ public String name;//娑堟伅涓婚鏈鍚嶇О
public boolean isEmpty(){
- return orgTag == null || protocol == null || devId == null || topic == null
- || orgTag.trim().length() == 0 || protocol.trim().length() == 0 || devId.trim().length() == 0 || topic.trim().length() == 0 ;
+ return orgTag == null || protocol == null || devId == null || name == null
+ || orgTag.trim().length() == 0 || protocol.trim().length() == 0 || devId.trim().length() == 0 || name.trim().length() == 0 ;
}
public String shortName(){
- return topic ;
+ return name;
}
public String longName(){
- return orgTag + "/" + protocol + "/" + devId + "/" + topic ;
+ return orgTag + "/" + protocol + "/" + devId + "/" + name;
}
}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/CodeSdV1.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/CodeSdV1.java
index 4d0cd99..a78f8b0 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/CodeSdV1.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/CodeSdV1.java
@@ -10,4 +10,5 @@
public static final String cd_Stir = "01" ;//鎼呮媽鍚仠鍛戒护
public static final String cd_Inject = "02" ;//娉ㄨ偉鍚仠鍛戒护
public static final String cd_Irr = "03" ;//鐏屾簤鍚仠鍛戒护
+ public static final String cd_Param = "10" ;//璁惧畾鍙傛暟
}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java
index 9615da3..c654c22 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java
@@ -51,8 +51,9 @@
public boolean subMsgMatchPubMsg(MqttPubMsg pubMsg){
if (pubMsg instanceof MqttPubMsgSdV1) {
- MqttPubMsgSdV1 pubMsgSdV1 = (MqttPubMsgSdV1) pubMsg;
+ //MqttPubMsgSdV1 pubMsgSdV1 = (MqttPubMsgSdV1) pubMsg;
if(this.vo4Up != null && this.vo4Up instanceof StateVo){
+ //鍙涓婃姤鐨勬槸鐘舵�佹暟鎹紝璇存槑璁惧鍝嶅簲浜嗗懡浠�
return true ;
}
}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolConstantSdV1.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolConstantSdV1.java
index 60dc354..8961281 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolConstantSdV1.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolConstantSdV1.java
@@ -20,5 +20,6 @@
public static final String PubTopicStir = "ctrlStir" ;//鎼呮媽鍚仠
public static final String PubTopicInject = "ctrlInject" ;//娉ㄨ偉鍚仠
public static final String PubTopicIrr = "ctrlIrr" ;//鐏屾簤鍚仠
+ public static final String PubTopicParam = "setParam" ;//璁剧疆鍙傛暟
}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java
index 527e6bc..1a7d373 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java
@@ -4,15 +4,16 @@
import com.alibaba.fastjson2.JSONObject;
import com.dy.common.mw.protocol.Command;
import com.dy.common.mw.protocol4Mqtt.MqttCallback;
-import com.dy.common.mw.protocol4Mqtt.MqttMsgParser;
import com.dy.common.mw.protocol4Mqtt.MqttTopic;
import com.dy.common.mw.protocol4Mqtt.Vo4Up;
import com.dy.common.mw.protocol4Mqtt.pSdV1.comParam.ComCtrlVo;
+import com.dy.common.mw.protocol4Mqtt.pSdV1.comParam.ComSetParamVo;
import com.dy.common.mw.protocol4Mqtt.pSdV1.downVos.FaultClearVo;
import com.dy.common.mw.protocol4Mqtt.pSdV1.downVos.InjectStartVo;
+import com.dy.common.mw.protocol4Mqtt.pSdV1.downVos.ParamSetVo;
import com.dy.common.mw.protocol4Mqtt.pSdV1.downVos.StirStartVo;
import com.dy.common.mw.protocol4Mqtt.pSdV1.upVos.*;
-import com.dy.common.mw.protocol4Mqtt.status.DevRunSt;
+import com.dy.common.mw.protocol4Mqtt.status.DevRunInfo;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
@@ -28,8 +29,8 @@
}
MqttSubMsgSdV1 msg = new MqttSubMsgSdV1(subTopic, strTxt);
Vo4Up vo ;
- DevRunSt stVo ;
- switch (subTopic.topic) {
+ DevRunInfo stInfo = null ;
+ switch (subTopic.name) {
case ProtocolConstantSdV1.SubTopicWeather -> {
vo = JSON.parseObject(strTxt, WeatherVo.class);
break;
@@ -45,21 +46,22 @@
case ProtocolConstantSdV1.SubTopicState -> {
//姝ゅ鏈畬鎴愶紝搴旇浜х敓涓�浜涢�氫俊鐨刬nfo锛屼緵涓嬮潰callback.notify(objs)閫氱煡鍑哄幓
vo = JSON.parseObject(strTxt, StateVo.class);
- stVo = new DevRunSt() ;
- stVo.id = msg.deviceId ;
- //stVo.stirRunning = true ; //鎼呮媽杩愯 true鏄� false鍚�
- //stVo.injectRunning = true ; //娉ㄨ偉杩愯 true鏄� false鍚�
- //stVo.irrRunning = true ; //鐏屾簤杩愯 true鏄� false鍚�
- //stVo.alarm = true ; //鎶ヨ true鏄� false鍚�
+ StateVo stVo = (StateVo)vo ;
+ stInfo = new DevRunInfo() ;
+ stInfo.devId = msg.deviceId ;
+ stInfo.stirRunning = (stVo.stirRunning==null?false:(stVo.stirRunning.byteValue()==1?true:false)) ; //鎼呮媽杩愯 true鏄� false鍚�
+ stInfo.injectRunning = (stVo.injectRunning==null?false:(stVo.injectRunning.byteValue()==1?true:false)) ; //娉ㄨ偉杩愯 true鏄� false鍚�
+ stInfo.irrRunning = (stVo.irrRunning==null?false:(stVo.irrRunning.byteValue()==1?true:false)) ; //鐏屾簤杩愯 true鏄� false鍚�
+ stInfo.alarm = (stVo.alarm==null?false:(stVo.alarm.byteValue()==1?true:false)) ; //鎶ヨ true鏄� false鍚�
break;
}
default -> {
- throw new Exception("鎺ユ敹鍒癕QTT娑堟伅锛屽崗璁�" + subTopic.protocol + "锛岃澶嘔D" + subTopic.devId + "锛屼富棰�" + subTopic.topic + "娑堟伅瑙f瀽閫昏緫鏈疄鐜�");
+ throw new Exception("鎺ユ敹鍒癕QTT娑堟伅锛屽崗璁�" + subTopic.protocol + "锛岃澶嘔D" + subTopic.devId + "锛屼富棰�" + subTopic.name + "娑堟伅瑙f瀽閫昏緫鏈疄鐜�");
}
}
msg.vo4Up = vo ;
callback.callback(msg);
- callback.notify(null);//姝ゅ鏈畬鎴�
+ callback.notify(msg.deviceId, stInfo);
return msg;
}
@@ -94,6 +96,13 @@
msg = this.createPubMsgOfIrr(orgTag, com);
break;
}
+ case CodeSdV1.cd_Param -> {
+ //璁剧疆鍙傛暟
+ this.checkParam(com);
+ this.checkRtnWebUrl(com);
+ msg = this.createPubMsgOfParam(orgTag, com);
+ break;
+ }
default -> {
throw new Exception("鎺ユ敹鍒癕QTT鍛戒护锛屽崗璁�" + com.protocol + "鐗堟湰" + com.protocolVersion + "鍔熻兘鐮�" + com.code + "鏋勯�犲櫒鏈疄鐜�");
}
@@ -122,8 +131,8 @@
msg.isCacheForOffLine = false ;
msg.hasResponse = true ;
msg.cd = CodeSdV1.cd_Fault ;
- msg.topic = MqttMsgParser.createPubTopic(new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicFault)) ;
- msg.msg = JSON.toJSONString(new FaultClearVo(cvo.isDo)) ;
+ msg.topic = new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicFault) ;
+ msg.msg = JSON.toJSONString(new FaultClearVo(cvo.startTrueStopFalse ?(byte)1:0)) ;
return msg ;
}
private MqttPubMsgSdV1 createPubMsgOfStir(String orgTag, Command com) throws Exception {
@@ -138,8 +147,8 @@
msg.isCacheForOffLine = false ;
msg.hasResponse = true ;
msg.cd = CodeSdV1.cd_Fault ;
- msg.topic = MqttMsgParser.createPubTopic(new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicFault)) ;
- msg.msg = JSON.toJSONString(new StirStartVo(cvo.isDo)) ;
+ msg.topic = new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicStir) ;
+ msg.msg = JSON.toJSONString(new StirStartVo(cvo.startTrueStopFalse ?(byte)1:0)) ;
return msg ;
}
private MqttPubMsgSdV1 createPubMsgOfInject(String orgTag, Command com) throws Exception {
@@ -154,8 +163,8 @@
msg.isCacheForOffLine = false ;
msg.hasResponse = true ;
msg.cd = CodeSdV1.cd_Fault ;
- msg.topic = MqttMsgParser.createPubTopic(new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicFault)) ;
- msg.msg = JSON.toJSONString(new InjectStartVo(cvo.isDo)) ;
+ msg.topic = new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicInject) ;
+ msg.msg = JSON.toJSONString(new InjectStartVo(cvo.startTrueStopFalse ?(byte)1:0)) ;
return msg ;
}
private MqttPubMsgSdV1 createPubMsgOfIrr(String orgTag, Command com) throws Exception {
@@ -170,8 +179,25 @@
msg.isCacheForOffLine = false ;
msg.hasResponse = true ;
msg.cd = CodeSdV1.cd_Fault ;
- msg.topic = MqttMsgParser.createPubTopic(new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicFault)) ;
- msg.msg = JSON.toJSONString(new StirStartVo(cvo.isDo)) ;
+ msg.topic = new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicIrr) ;
+ msg.msg = JSON.toJSONString(new StirStartVo(cvo.startTrueStopFalse ?(byte)1:0)) ;
+ return msg ;
+ }
+
+ private MqttPubMsgSdV1 createPubMsgOfParam(String orgTag, Command com) throws Exception {
+ JSONObject obj = (JSONObject) com.param;
+ String json = obj.toJSONString();
+ ComSetParamVo cvo = JSON.parseObject(json, ComSetParamVo.class);
+ if(cvo == null){
+ throw new Exception("json杞珻omSetParamVo涓簄ull") ;
+ }
+ MqttPubMsgSdV1 msg = new MqttPubMsgSdV1() ;
+ this.setPubMsgBase(com, msg);
+ msg.isCacheForOffLine = false ;
+ msg.hasResponse = false ;
+ msg.cd = CodeSdV1.cd_Param ;
+ msg.topic = new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicParam) ;
+ msg.msg = JSON.toJSONString(new ParamSetVo(cvo.stirDuration, cvo.injectDuration)) ;
return msg ;
}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/comParam/ComCtrlVo.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/comParam/ComCtrlVo.java
index 185e4f5..6e6e9f6 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/comParam/ComCtrlVo.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/comParam/ComCtrlVo.java
@@ -6,7 +6,7 @@
* @Description
*/
public class ComCtrlVo {
- //鏄惁鎺у埗鍔ㄤ綔锛宼rue鏄紝false鍚�
+ //鍚仠鍔ㄤ綔锛宼rue鏄紝false鍚�
//鍙互鎵ц鍔熻兘鐮� 00锛�01锛�02锛�03鐨勫姩浣�
- public boolean isDo;//
+ public boolean startTrueStopFalse;//
}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/comParam/ComSetParamVo.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/comParam/ComSetParamVo.java
new file mode 100644
index 0000000..a14d5b1
--- /dev/null
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/comParam/ComSetParamVo.java
@@ -0,0 +1,23 @@
+package com.dy.common.mw.protocol4Mqtt.pSdV1.comParam;
+
+import com.alibaba.fastjson2.annotation.JSONField;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/11 17:03
+ * @Description
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class ComSetParamVo {
+ // 鎼呮媽璁惧畾鏃堕棿
+ public Integer stirDuration ;
+
+ // 娉ㄨ偉璁惧畾鏃堕棿
+ public Integer injectDuration ;
+
+}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/FaultClearVo.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/FaultClearVo.java
index e9657ce..ec31c5b 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/FaultClearVo.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/FaultClearVo.java
@@ -16,10 +16,10 @@
@AllArgsConstructor
public class FaultClearVo implements Vo4Down {
@JSONField(name = "鏁呴殰瑙i櫎")
- public boolean isDo ;
+ public Byte isDo ;//1鏄紝0鍚�
@Override
public String toString(){
- return "鏁呴殰瑙i櫎锛�" + (isDo?"鏄�":"鍚�") ;
+ return "鏁呴殰瑙i櫎锛�" + (isDo==1?"鏄�":"鍚�") ;
}
}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/InjectStartVo.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/InjectStartVo.java
index 4311fd2..5e4190e 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/InjectStartVo.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/InjectStartVo.java
@@ -16,9 +16,10 @@
@AllArgsConstructor
public class InjectStartVo implements Vo4Down {
@JSONField(name = "娉ㄨ偉鍚仠")
- public boolean isDo ;//true涓哄惎锛宖alse涓哄仠
+ public Byte isDo ;//1鏄紝0鍚�
+
@Override
public String toString(){
- return "娉ㄨ偉鍚仠锛�" + (isDo?"鍚�":"鍋�") ;
+ return "娉ㄨ偉鍚仠锛�" + (isDo==1?"鍚�":"鍋�") ;
}
}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/IrrStartVo.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/IrrStartVo.java
index f05c154..69feb68 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/IrrStartVo.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/IrrStartVo.java
@@ -16,9 +16,10 @@
@AllArgsConstructor
public class IrrStartVo implements Vo4Down {
@JSONField(name = "鐏屾簤鍚仠")
- public boolean isDo ;//true涓哄惎锛宖alse涓哄仠
+ public Byte isDo ;//1鏄紝0鍚�
+
@Override
public String toString(){
- return "鐏屾簤鍚仠锛�" + (isDo?"鍚�":"鍋�") ;
+ return "鐏屾簤鍚仠锛�" + (isDo==1?"鏄�":"鍚�") ;
}
}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/ParamSetVo.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/ParamSetVo.java
new file mode 100644
index 0000000..48ecd14
--- /dev/null
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/ParamSetVo.java
@@ -0,0 +1,31 @@
+package com.dy.common.mw.protocol4Mqtt.pSdV1.downVos;
+
+import com.alibaba.fastjson2.annotation.JSONField;
+import com.dy.common.mw.protocol4Mqtt.Vo4Down;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/11 16:55
+ * @Description
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class ParamSetVo implements Vo4Down {
+ @JSONField(name = "鎼呮媽璁惧畾鏃堕棿")
+ public Integer stirDuration ;
+
+ @JSONField(name = "娉ㄨ偉璁惧畾鏃堕棿")
+ public Integer injectDuration ;
+
+ @Override
+ public String toString(){
+ StringBuilder sb = new StringBuilder();
+ sb.append("鎼呮媽璁惧畾鏃堕棿锛�" + stirDuration + "\n" );
+ sb.append("娉ㄨ偉璁惧畾鏃堕棿锛�" + injectDuration + "\n" );
+ return sb.toString();
+ }
+}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/StirStartVo.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/StirStartVo.java
index 44baf99..333207e 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/StirStartVo.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/StirStartVo.java
@@ -16,9 +16,10 @@
@AllArgsConstructor
public class StirStartVo implements Vo4Down {
@JSONField(name = "鎼呮媽鍚仠")
- public boolean isDo ;//true涓哄惎锛宖alse涓哄仠
+ public Byte isDo ;//1鏄紝0鍚�
+
@Override
public String toString(){
- return "鎼呮媽鍚仠锛�" + (isDo?"鍚�":"鍋�") ;
+ return "鎼呮媽鍚仠锛�" + (isDo==1?"鏄�":"鍚�") ;
}
}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/ManureVo.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/ManureVo.java
index f038514..a2f9ae8 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/ManureVo.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/ManureVo.java
@@ -12,6 +12,18 @@
*/
@Data
public class ManureVo implements Vo4Up {
+ @JSONField(name = "flexem_message_id")
+ public Integer messageId ;//娑堟伅ID
+
+ @JSONField(name = "鑲ユ枡娴侀噺")
+ public Float manureFlow ;//鑲ユ枡娴侀噺
+
+ @JSONField(name = "娉ㄨ偉鏃堕暱")
+ public Integer manureTime ;//娉ㄨ偉鏃堕暱
+
+ @JSONField(name = "鎼呮媽鏃堕暱")
+ public Integer stirTime ;//鎼呮媽鏃堕暱
+
@JSONField(name = "flexem_timestamp")
public Long devDt ;//璁惧鏃堕棿
@@ -27,9 +39,13 @@
@Override
public String toString(){
StringBuilder sb = new StringBuilder();
- sb.append("姘磋偉鏁版嵁锛�") ;
- sb.append(" 璁惧鏃堕棿锛�"+devDt) ;
- sb.append(" 璁惧鏃堕棿锛�"+ this.getDevDtStr()) ;
+ sb.append("姘磋偉鏁版嵁=>") ;
+ sb.append(" 娑堟伅ID锛�" + messageId + ", ") ;
+ sb.append(" 鑲ユ枡娴侀噺锛�" + manureFlow + ", ") ;
+ sb.append(" 娉ㄨ偉鏃堕暱锛�" + manureTime + ", ") ;
+ sb.append(" 鎼呮媽鏃堕暱锛�" + stirTime + ", ") ;
+ sb.append(" 璁惧鏃堕棿锛�" + devDt + ", ") ;
+ sb.append(" 璁惧鏃堕棿锛�" + this.getDevDtStr() + ", ") ;
sb.append("\n") ;
return sb.toString() ;
}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/SoilVo.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/SoilVo.java
index a465609..f5bb1e0 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/SoilVo.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/SoilVo.java
@@ -12,6 +12,33 @@
*/
@Data
public class SoilVo implements Vo4Up {
+ @JSONField(name = "flexem_message_id")
+ public Integer messageId ;//娑堟伅ID
+
+ @JSONField(name = "鍦熷¥婀垮害1")
+ public Float soilHumidity1 ;//鍦熷¥婀垮害1
+
+ @JSONField(name = "鍦熷¥婀垮害2")
+ public Float soilHumidity2 ;//鍦熷¥婀垮害2
+
+ @JSONField(name = "鍦熷¥婀垮害3")
+ public Float soilHumidity3 ;//鍦熷¥婀垮害3
+
+ @JSONField(name = "鍦熷¥婀垮害4")
+ public Float soilHumidity4 ;//鍦熷¥婀垮害4
+
+ @JSONField(name = "鍦熷¥婀垮害1")
+ public Float soilTemperature1 ;//鍦熷¥娓╁害1
+
+ @JSONField(name = "鍦熷¥娓╁害2")
+ public Float soilTemperature2 ;//鍦熷¥娓╁害2
+
+ @JSONField(name = "鍦熷¥娓╁害3")
+ public Float soilTemperature3 ;//鍦熷¥娓╁害3
+
+ @JSONField(name = "鍦熷¥娓╁害4")
+ public Float soilTemperature4 ;//鍦熷¥娓╁害4
+
@JSONField(name = "flexem_timestamp")
public Long devDt ;//璁惧鏃堕棿
@@ -27,10 +54,20 @@
@Override
public String toString(){
StringBuilder sb = new StringBuilder();
- sb.append("澧掓儏鏁版嵁锛�") ;
- sb.append(" 璁惧鏃堕棿锛�"+devDt) ;
- sb.append(" 璁惧鏃堕棿锛�"+ this.getDevDtStr()) ;
+ sb.append("澧掓儏鏁版嵁=>") ;
+ sb.append(" 娑堟伅ID锛�" + messageId + ", ") ;
+ sb.append(" 鍦熷¥婀垮害1锛�" + soilHumidity1 + ", ") ;
+ sb.append(" 鍦熷¥婀垮害2锛�" + soilHumidity2 + ", ") ;
+ sb.append(" 鍦熷¥婀垮害3锛�" + soilHumidity3 + ", ") ;
+ sb.append(" 鍦熷¥婀垮害4锛�" + soilHumidity4 + ", ") ;
+ sb.append(" 鍦熷¥娓╁害1锛�" + soilTemperature1 + ", ") ;
+ sb.append(" 鍦熷¥娓╁害2锛�" + soilTemperature2 + ", ") ;
+ sb.append(" 鍦熷¥娓╁害3锛�" + soilTemperature3 + ", ") ;
+ sb.append(" 鍦熷¥娓╁害4锛�" + soilTemperature4 + ", ") ;
+ sb.append(" 璁惧鏃堕棿锛�" + devDt + ", ") ;
+ sb.append(" 璁惧鏃堕棿锛�" + this.getDevDtStr() + ", ") ;
sb.append("\n") ;
return sb.toString() ;
+
}
}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/StateVo.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/StateVo.java
index cae3afc..fb3885d 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/StateVo.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/StateVo.java
@@ -12,6 +12,21 @@
*/
@Data
public class StateVo implements Vo4Up {
+ @JSONField(name = "flexem_message_id")
+ public Integer messageId ;//娑堟伅ID
+
+ @JSONField(name = "鎼呮媽杩愯")
+ public Byte stirRunning ;//鎼呮媽杩愯
+
+ @JSONField(name = "娉ㄨ偉杩愯")
+ public Byte injectRunning ;//娉ㄨ偉杩愯
+
+ @JSONField(name = "鐏屾簤杩愯")
+ public Byte irrRunning ;//鐏屾簤杩愯
+
+ @JSONField(name = "鎶ヨ")
+ public Byte alarm ;//鎶ヨ
+
@JSONField(name = "flexem_timestamp")
public Long devDt ;//璁惧鏃堕棿
@@ -27,9 +42,14 @@
@Override
public String toString(){
StringBuilder sb = new StringBuilder();
- sb.append("鐘舵�佹暟鎹細") ;
- sb.append(" 璁惧鏃堕棿锛�"+devDt) ;
- sb.append(" 璁惧鏃堕棿锛�"+ this.getDevDtStr()) ;
+ sb.append("鐘舵�佹暟鎹�=>") ;
+ sb.append(" 娑堟伅ID锛�" + messageId + ", ") ;
+ sb.append(" 鎼呮媽杩愯锛�" + stirRunning + ", ") ;
+ sb.append(" 娉ㄨ偉杩愯锛�" + injectRunning + ", ") ;
+ sb.append(" 鐏屾簤杩愯锛�" + irrRunning + ", ") ;
+ sb.append(" 鎶ヨ锛�" + alarm + ", ") ;
+ sb.append(" 璁惧鏃堕棿锛�" + devDt + ", ") ;
+ sb.append(" 璁惧鏃堕棿锛�" + this.getDevDtStr() + ", ") ;
sb.append("\n") ;
return sb.toString() ;
}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/WeatherVo.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/WeatherVo.java
index 62ba401..170bf51 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/WeatherVo.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/WeatherVo.java
@@ -54,16 +54,16 @@
public String toString(){
StringBuilder sb = new StringBuilder();
sb.append("姘旇薄鏁版嵁=>") ;
- sb.append(" 娑堟伅ID锛�"+messageId + ", ") ;
- sb.append(" 浜屾哀鍖栫⒊锛�"+carbonDioxide + ", ") ;
- sb.append(" 鍏夌収寮哄害锛�"+lightIntensity + ", ") ;
- sb.append(" 澶ф皵鍘嬪姏锛�"+atmosphericPressure + ", ") ;
- sb.append(" 绌烘皵娓╁害锛�"+airTemperature + ", ") ;
- sb.append(" 绌烘皵婀垮害锛�"+airHumidity + ", ") ;
- sb.append(" PM2.5锛�"+pm25 + ", ") ;
- sb.append(" PM10锛�"+pm10 + ", ") ;
- sb.append(" 璁惧鏃堕棿锛�"+devDt + ", ") ;
- sb.append(" 璁惧鏃堕棿锛�"+ this.getDevDtStr() + ", ") ;
+ sb.append(" 娑堟伅ID锛�" + messageId + ", ") ;
+ sb.append(" 浜屾哀鍖栫⒊锛�" + carbonDioxide + ", ") ;
+ sb.append(" 鍏夌収寮哄害锛�" + lightIntensity + ", ") ;
+ sb.append(" 澶ф皵鍘嬪姏锛�" + atmosphericPressure + ", ") ;
+ sb.append(" 绌烘皵娓╁害锛�" + airTemperature + ", ") ;
+ sb.append(" 绌烘皵婀垮害锛�" + airHumidity + ", ") ;
+ sb.append(" PM2.5锛�" + pm25 + ", ") ;
+ sb.append(" PM10锛�" + pm10 + ", ") ;
+ sb.append(" 璁惧鏃堕棿锛�" + devDt + ", ") ;
+ sb.append(" 璁惧鏃堕棿锛�" + this.getDevDtStr() + ", ") ;
sb.append("\n") ;
return sb.toString() ;
}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevOnLineSt.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevOnLineInfo.java
similarity index 83%
rename from pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevOnLineSt.java
rename to pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevOnLineInfo.java
index 1092b27..99eda6c 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevOnLineSt.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevOnLineInfo.java
@@ -9,7 +9,7 @@
* @Description
*/
@Data
-public class DevOnLineSt implements MqttNotifyInfo {
+public class DevOnLineInfo implements MqttNotifyInfo {
public String id ;
public String protocol ;
public Boolean onLine ;
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevRunSt.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevRunInfo.java
similarity index 82%
rename from pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevRunSt.java
rename to pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevRunInfo.java
index 0fe300a..4811f1c 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevRunSt.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevRunInfo.java
@@ -9,8 +9,8 @@
* @Description
*/
@Data
-public class DevRunSt implements MqttNotifyInfo {
- public String id ;
+public class DevRunInfo implements MqttNotifyInfo {
+ public String devId ;//MQTT璁剧疆ID
public Boolean stirRunning ;//鎼呮媽杩愯 true鏄� false鍚�
public Boolean injectRunning ;//娉ㄨ偉杩愯 true鏄� false鍚�
public Boolean irrRunning ;//鐏屾簤杩愯 true鏄� false鍚�
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatusDealer.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatusDealer.java
index 099cc9e..d3b2f2b 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatusDealer.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatusDealer.java
@@ -1,6 +1,6 @@
package com.dy.rtuMw.server.mqtt;
-import com.dy.common.mw.protocol4Mqtt.status.DevRunSt;
+import com.dy.common.mw.protocol4Mqtt.status.DevRunInfo;
import com.dy.rtuMw.server.forTcp.RtuLogDealer;
import com.dy.rtuMw.server.local.localProtocol.RtuOnLineStateStatisticsVo;
@@ -166,7 +166,7 @@
}
}
- public static void setStatus(String devId, DevRunSt st){
+ public static void setStatus(String devId, DevRunInfo st){
DevStatus vo = map.get(devId) ;
if(vo != null) {
if(st.stirRunning != null){
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java
index 661696d..8085e91 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java
@@ -3,8 +3,8 @@
import com.dy.common.mw.channel.mqtt.MqttClientPool;
import com.dy.common.mw.protocol4Mqtt.MqttNotify;
import com.dy.common.mw.protocol4Mqtt.MqttNotifyInfo;
-import com.dy.common.mw.protocol4Mqtt.status.DevOnLineSt;
-import com.dy.common.mw.protocol4Mqtt.status.DevRunSt;
+import com.dy.common.mw.protocol4Mqtt.status.DevOnLineInfo;
+import com.dy.common.mw.protocol4Mqtt.status.DevRunInfo;
import com.dy.rtuMw.server.ServerProperties;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -77,15 +77,15 @@
public void notify(String devId, MqttNotifyInfo... infos) {
if(devId != null && infos != null && infos.length > 0){
for(MqttNotifyInfo info : infos){
- if(info instanceof DevOnLineSt){
- DevOnLineSt onLineSt = (DevOnLineSt)info;
+ if(info instanceof DevOnLineInfo){
+ DevOnLineInfo onLineSt = (DevOnLineInfo)info;
if(onLineSt.onLine != null && onLineSt.onLine.booleanValue()){
- DevStatusDealer.onLine(devId, ((DevOnLineSt)info).protocol);
+ DevStatusDealer.onLine(devId, ((DevOnLineInfo)info).protocol);
}else{
DevStatusDealer.offLine(devId);
}
- } else if(info instanceof DevRunSt){
- DevStatusDealer.setStatus(devId, (DevRunSt)info);
+ } else if(info instanceof DevRunInfo){
+ DevStatusDealer.setStatus(devId, (DevRunInfo)info);
}
}
}
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java
index 0414628..c109e53 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java
@@ -45,33 +45,6 @@
}
}
private void nextDeal(MqttSubMsg subMsg)throws Exception {
- subMsg.action(new Callback() {
- @Override
- public void call(Object obj) {
- MqttSubMsg subMs = (MqttSubMsg) obj ;
- MqttPubMsg pubMs = MqttPubMsgCache.matchFromTail(subMs) ;
- if(pubMs != null){
- //鍖归厤鍒颁笅琛屾秷鎭紙鍛戒护锛�
- subMs.mqttResultSendWebUrl = pubMs.mqttResultSendWebUrl ;
- subMs.commandId = pubMs.commandId ;
- try {
- MqttComResultCache.getInstance().cacheMqttComResult(new MqttComResultNode(subMs));
- } catch (Exception e) {
- log.error("缂撳瓨鍙戝竷娑堟伅锛堝懡浠わ級缁撴灉鍙戠敓寮傚父", e);
- }
- }
- try{
- MqttSubMsgCache.getInstance().cacheMsg(new MqttSubMsgNode(subMsg));
- }catch (Exception e){
- log.error("缂撳瓨璁㈤槄娑堟伅鏁版嵁鍙戠敓寮傚父", e);
- }
- }
- @Override
- public void call(Object... objs) {
- }
- @Override
- public void exception(Exception e) {
- }
- });
+ subMsg.action(new MqttSubMsgDealer());
}
}
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgCache.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgCache.java
index 2ae8adc..b4b948a 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgCache.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgCache.java
@@ -86,8 +86,7 @@
while(node != null && node.obj != null){
obj = (MqttPubMsgNode)node.obj;
pubMsg = obj.result ;
- if(pubMsg != null
- && subMsg.subMsgMatchPubMsg(pubMsg)){
+ if(pubMsg != null && subMsg.subMsgMatchPubMsg(pubMsg)){
obj.onceReceivedResult = true ;//鏍囪瘑宸茬粡鏀跺埌鍛戒护缁撴灉
return pubMsg;
}else{
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgNode.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgNode.java
index 7b827d5..504ee3f 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgNode.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgNode.java
@@ -65,7 +65,7 @@
}else{
if(mqttClient != null && mqttClient.isConnected()){
try {
- mqttManager.publishMsg(mqttClient, this.result.topic, this.result.msg);
+ mqttManager.publishMsg(mqttClient, this.result.topic.longName(), this.result.msg);
DevStatusDealer.afterSendPubMessage(this.result.deviceId);
RtuLogDealer.log4Mqtt(this.result.deviceId, "鍙戝竷娑堟伅 涓婚锛�" + this.result.topic + " 娑堟伅锛�" + this.result.msg);
log.info("鍙戝竷MQTT娑堟伅锛堜富棰�=" + this.result.topic + "锛�" + this.result.msg);
@@ -74,7 +74,11 @@
}finally {
mqttManager.pushMqttClient(mqttClient);
}
- return false ;
+ if(this.result.hasResponse){
+ return false ;
+ }else{
+ return true ;
+ }
}else{
//鏈浘杩炴帴MQTT鏈嶅姟鍣�
return this.decideRemoveNodeFromCach(now) ;
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttSubMsgDealer.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttSubMsgDealer.java
new file mode 100644
index 0000000..116239d
--- /dev/null
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttSubMsgDealer.java
@@ -0,0 +1,41 @@
+package com.dy.rtuMw.server.mqtt;
+
+import com.dy.common.mw.protocol4Mqtt.MqttPubMsg;
+import com.dy.common.mw.protocol4Mqtt.MqttSubMsg;
+import com.dy.common.util.Callback;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/11 17:33
+ * @Description
+ */
+@Slf4j
+public class MqttSubMsgDealer implements Callback {
+ @Override
+ public void call(Object obj) {
+ MqttSubMsg subMs = (MqttSubMsg) obj ;
+ MqttPubMsg pubMs = MqttPubMsgCache.matchFromTail(subMs) ;
+ if(pubMs != null){
+ //鍖归厤鍒颁笅琛屾秷鎭紙鍛戒护锛�
+ subMs.mqttResultSendWebUrl = pubMs.mqttResultSendWebUrl ;
+ subMs.commandId = pubMs.commandId ;
+ try {
+ MqttComResultCache.getInstance().cacheMqttComResult(new MqttComResultNode(subMs));
+ } catch (Exception e) {
+ log.error("缂撳瓨鍙戝竷娑堟伅锛堝懡浠わ級缁撴灉鍙戠敓寮傚父", e);
+ }
+ }
+ try{
+ MqttSubMsgCache.getInstance().cacheMsg(new MqttSubMsgNode(subMs));
+ }catch (Exception e){
+ log.error("缂撳瓨璁㈤槄娑堟伅鏁版嵁鍙戠敓寮傚父", e);
+ }
+ }
+ @Override
+ public void call(Object... objs) {
+ }
+ @Override
+ public void exception(Exception e) {
+ }
+}
--
Gitblit v1.8.0