From b2c928eac40c4b7f495f8164eeb59005219fa350 Mon Sep 17 00:00:00 2001 From: liurunyu <lry9898@163.com> Date: 星期三, 11 六月 2025 17:55:29 +0800 Subject: [PATCH] 1、MQTT协议,增加设备参数命令及相关数据; 2、完善上行数据值对象; 3、完善其他代码。 --- pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java | 29 ---- pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttSubMsgDealer.java | 41 +++++ pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttMsgParser.java | 4 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/IrrStartVo.java | 5 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/ManureVo.java | 22 ++ pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/comParam/ComCtrlVo.java | 4 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevRunInfo.java | 4 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/FaultClearVo.java | 4 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/StateVo.java | 26 +++ pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/StirStartVo.java | 5 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttTopic.java | 10 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/WeatherVo.java | 20 +- pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatusDealer.java | 4 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/ParamSetVo.java | 31 ++++ pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevOnLineInfo.java | 2 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolConstantSdV1.java | 1 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/SoilVo.java | 43 +++++ pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/InjectStartVo.java | 5 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/comParam/ComSetParamVo.java | 23 +++ pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/CodeSdV1.java | 1 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttPubMsg.java | 2 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgCache.java | 3 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java | 14 +- pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java | 66 ++++++-- pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgNode.java | 8 pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java | 3 26 files changed, 278 insertions(+), 102 deletions(-) diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttMsgParser.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttMsgParser.java index 11cfbd6..5a03f7e 100644 --- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttMsgParser.java +++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttMsgParser.java @@ -21,7 +21,7 @@ vo.orgTag = topicGrp[0] ; vo.protocol = topicGrp[1] ; vo.devId = topicGrp[2] ; - vo.topic = topicGrp[3] ; + vo.name = topicGrp[3] ; return vo ; } }else{ @@ -30,7 +30,7 @@ } public static String createPubTopic(MqttTopic tp) throws Exception { - return tp.orgTag + "/" + tp.protocol + "/" + tp.devId + "/" + tp.topic ; + return tp.orgTag + "/" + tp.protocol + "/" + tp.devId + "/" + tp.name; } public static MqttSubMsg parseSubMsg(MqttTopic subTopic, MqttMessage mqttMsg, MqttCallback callback) throws Exception { diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttPubMsg.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttPubMsg.java index bd864d3..eba4ecf 100644 --- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttPubMsg.java +++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttPubMsg.java @@ -12,7 +12,7 @@ public String mqttResultSendWebUrl ;//Mqtt杩斿洖鍛戒护缁撴灉 鍙戝悜鐩殑鍦皐eb URL - public String topic ;//娑堟伅涓婚 + public MqttTopic topic ;//娑堟伅涓婚 public String msg ;//娑堟伅 public boolean isCacheForOffLine ;//涓嬭鍛戒护鎺у埗锛屾秷鎭腑闂翠欢涓嶅湪绾挎槸鍚︾紦瀛樺懡浠� diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttTopic.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttTopic.java index 67b1245..d088e08 100644 --- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttTopic.java +++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttTopic.java @@ -16,18 +16,18 @@ public String orgTag ;//缁勭粐鏍囪瘑 public String protocol ;//鍗忚鍚嶇О public String devId ;//璁惧锛團Box锛塈D - public String topic ;//娑堟伅涓婚 + public String name;//娑堟伅涓婚鏈鍚嶇О public boolean isEmpty(){ - return orgTag == null || protocol == null || devId == null || topic == null - || orgTag.trim().length() == 0 || protocol.trim().length() == 0 || devId.trim().length() == 0 || topic.trim().length() == 0 ; + return orgTag == null || protocol == null || devId == null || name == null + || orgTag.trim().length() == 0 || protocol.trim().length() == 0 || devId.trim().length() == 0 || name.trim().length() == 0 ; } public String shortName(){ - return topic ; + return name; } public String longName(){ - return orgTag + "/" + protocol + "/" + devId + "/" + topic ; + return orgTag + "/" + protocol + "/" + devId + "/" + name; } } diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/CodeSdV1.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/CodeSdV1.java index 4d0cd99..a78f8b0 100644 --- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/CodeSdV1.java +++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/CodeSdV1.java @@ -10,4 +10,5 @@ public static final String cd_Stir = "01" ;//鎼呮媽鍚仠鍛戒护 public static final String cd_Inject = "02" ;//娉ㄨ偉鍚仠鍛戒护 public static final String cd_Irr = "03" ;//鐏屾簤鍚仠鍛戒护 + public static final String cd_Param = "10" ;//璁惧畾鍙傛暟 } diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java index 9615da3..c654c22 100644 --- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java +++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java @@ -51,8 +51,9 @@ public boolean subMsgMatchPubMsg(MqttPubMsg pubMsg){ if (pubMsg instanceof MqttPubMsgSdV1) { - MqttPubMsgSdV1 pubMsgSdV1 = (MqttPubMsgSdV1) pubMsg; + //MqttPubMsgSdV1 pubMsgSdV1 = (MqttPubMsgSdV1) pubMsg; if(this.vo4Up != null && this.vo4Up instanceof StateVo){ + //鍙涓婃姤鐨勬槸鐘舵�佹暟鎹紝璇存槑璁惧鍝嶅簲浜嗗懡浠� return true ; } } diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolConstantSdV1.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolConstantSdV1.java index 60dc354..8961281 100644 --- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolConstantSdV1.java +++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolConstantSdV1.java @@ -20,5 +20,6 @@ public static final String PubTopicStir = "ctrlStir" ;//鎼呮媽鍚仠 public static final String PubTopicInject = "ctrlInject" ;//娉ㄨ偉鍚仠 public static final String PubTopicIrr = "ctrlIrr" ;//鐏屾簤鍚仠 + public static final String PubTopicParam = "setParam" ;//璁剧疆鍙傛暟 } diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java index 527e6bc..1a7d373 100644 --- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java +++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java @@ -4,15 +4,16 @@ import com.alibaba.fastjson2.JSONObject; import com.dy.common.mw.protocol.Command; import com.dy.common.mw.protocol4Mqtt.MqttCallback; -import com.dy.common.mw.protocol4Mqtt.MqttMsgParser; import com.dy.common.mw.protocol4Mqtt.MqttTopic; import com.dy.common.mw.protocol4Mqtt.Vo4Up; import com.dy.common.mw.protocol4Mqtt.pSdV1.comParam.ComCtrlVo; +import com.dy.common.mw.protocol4Mqtt.pSdV1.comParam.ComSetParamVo; import com.dy.common.mw.protocol4Mqtt.pSdV1.downVos.FaultClearVo; import com.dy.common.mw.protocol4Mqtt.pSdV1.downVos.InjectStartVo; +import com.dy.common.mw.protocol4Mqtt.pSdV1.downVos.ParamSetVo; import com.dy.common.mw.protocol4Mqtt.pSdV1.downVos.StirStartVo; import com.dy.common.mw.protocol4Mqtt.pSdV1.upVos.*; -import com.dy.common.mw.protocol4Mqtt.status.DevRunSt; +import com.dy.common.mw.protocol4Mqtt.status.DevRunInfo; import org.eclipse.paho.client.mqttv3.MqttMessage; /** @@ -28,8 +29,8 @@ } MqttSubMsgSdV1 msg = new MqttSubMsgSdV1(subTopic, strTxt); Vo4Up vo ; - DevRunSt stVo ; - switch (subTopic.topic) { + DevRunInfo stInfo = null ; + switch (subTopic.name) { case ProtocolConstantSdV1.SubTopicWeather -> { vo = JSON.parseObject(strTxt, WeatherVo.class); break; @@ -45,21 +46,22 @@ case ProtocolConstantSdV1.SubTopicState -> { //姝ゅ鏈畬鎴愶紝搴旇浜х敓涓�浜涢�氫俊鐨刬nfo锛屼緵涓嬮潰callback.notify(objs)閫氱煡鍑哄幓 vo = JSON.parseObject(strTxt, StateVo.class); - stVo = new DevRunSt() ; - stVo.id = msg.deviceId ; - //stVo.stirRunning = true ; //鎼呮媽杩愯 true鏄� false鍚� - //stVo.injectRunning = true ; //娉ㄨ偉杩愯 true鏄� false鍚� - //stVo.irrRunning = true ; //鐏屾簤杩愯 true鏄� false鍚� - //stVo.alarm = true ; //鎶ヨ true鏄� false鍚� + StateVo stVo = (StateVo)vo ; + stInfo = new DevRunInfo() ; + stInfo.devId = msg.deviceId ; + stInfo.stirRunning = (stVo.stirRunning==null?false:(stVo.stirRunning.byteValue()==1?true:false)) ; //鎼呮媽杩愯 true鏄� false鍚� + stInfo.injectRunning = (stVo.injectRunning==null?false:(stVo.injectRunning.byteValue()==1?true:false)) ; //娉ㄨ偉杩愯 true鏄� false鍚� + stInfo.irrRunning = (stVo.irrRunning==null?false:(stVo.irrRunning.byteValue()==1?true:false)) ; //鐏屾簤杩愯 true鏄� false鍚� + stInfo.alarm = (stVo.alarm==null?false:(stVo.alarm.byteValue()==1?true:false)) ; //鎶ヨ true鏄� false鍚� break; } default -> { - throw new Exception("鎺ユ敹鍒癕QTT娑堟伅锛屽崗璁�" + subTopic.protocol + "锛岃澶嘔D" + subTopic.devId + "锛屼富棰�" + subTopic.topic + "娑堟伅瑙f瀽閫昏緫鏈疄鐜�"); + throw new Exception("鎺ユ敹鍒癕QTT娑堟伅锛屽崗璁�" + subTopic.protocol + "锛岃澶嘔D" + subTopic.devId + "锛屼富棰�" + subTopic.name + "娑堟伅瑙f瀽閫昏緫鏈疄鐜�"); } } msg.vo4Up = vo ; callback.callback(msg); - callback.notify(null);//姝ゅ鏈畬鎴� + callback.notify(msg.deviceId, stInfo); return msg; } @@ -94,6 +96,13 @@ msg = this.createPubMsgOfIrr(orgTag, com); break; } + case CodeSdV1.cd_Param -> { + //璁剧疆鍙傛暟 + this.checkParam(com); + this.checkRtnWebUrl(com); + msg = this.createPubMsgOfParam(orgTag, com); + break; + } default -> { throw new Exception("鎺ユ敹鍒癕QTT鍛戒护锛屽崗璁�" + com.protocol + "鐗堟湰" + com.protocolVersion + "鍔熻兘鐮�" + com.code + "鏋勯�犲櫒鏈疄鐜�"); } @@ -122,8 +131,8 @@ msg.isCacheForOffLine = false ; msg.hasResponse = true ; msg.cd = CodeSdV1.cd_Fault ; - msg.topic = MqttMsgParser.createPubTopic(new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicFault)) ; - msg.msg = JSON.toJSONString(new FaultClearVo(cvo.isDo)) ; + msg.topic = new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicFault) ; + msg.msg = JSON.toJSONString(new FaultClearVo(cvo.startTrueStopFalse ?(byte)1:0)) ; return msg ; } private MqttPubMsgSdV1 createPubMsgOfStir(String orgTag, Command com) throws Exception { @@ -138,8 +147,8 @@ msg.isCacheForOffLine = false ; msg.hasResponse = true ; msg.cd = CodeSdV1.cd_Fault ; - msg.topic = MqttMsgParser.createPubTopic(new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicFault)) ; - msg.msg = JSON.toJSONString(new StirStartVo(cvo.isDo)) ; + msg.topic = new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicStir) ; + msg.msg = JSON.toJSONString(new StirStartVo(cvo.startTrueStopFalse ?(byte)1:0)) ; return msg ; } private MqttPubMsgSdV1 createPubMsgOfInject(String orgTag, Command com) throws Exception { @@ -154,8 +163,8 @@ msg.isCacheForOffLine = false ; msg.hasResponse = true ; msg.cd = CodeSdV1.cd_Fault ; - msg.topic = MqttMsgParser.createPubTopic(new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicFault)) ; - msg.msg = JSON.toJSONString(new InjectStartVo(cvo.isDo)) ; + msg.topic = new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicInject) ; + msg.msg = JSON.toJSONString(new InjectStartVo(cvo.startTrueStopFalse ?(byte)1:0)) ; return msg ; } private MqttPubMsgSdV1 createPubMsgOfIrr(String orgTag, Command com) throws Exception { @@ -170,8 +179,25 @@ msg.isCacheForOffLine = false ; msg.hasResponse = true ; msg.cd = CodeSdV1.cd_Fault ; - msg.topic = MqttMsgParser.createPubTopic(new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicFault)) ; - msg.msg = JSON.toJSONString(new StirStartVo(cvo.isDo)) ; + msg.topic = new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicIrr) ; + msg.msg = JSON.toJSONString(new StirStartVo(cvo.startTrueStopFalse ?(byte)1:0)) ; + return msg ; + } + + private MqttPubMsgSdV1 createPubMsgOfParam(String orgTag, Command com) throws Exception { + JSONObject obj = (JSONObject) com.param; + String json = obj.toJSONString(); + ComSetParamVo cvo = JSON.parseObject(json, ComSetParamVo.class); + if(cvo == null){ + throw new Exception("json杞珻omSetParamVo涓簄ull") ; + } + MqttPubMsgSdV1 msg = new MqttPubMsgSdV1() ; + this.setPubMsgBase(com, msg); + msg.isCacheForOffLine = false ; + msg.hasResponse = false ; + msg.cd = CodeSdV1.cd_Param ; + msg.topic = new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicParam) ; + msg.msg = JSON.toJSONString(new ParamSetVo(cvo.stirDuration, cvo.injectDuration)) ; return msg ; } diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/comParam/ComCtrlVo.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/comParam/ComCtrlVo.java index 185e4f5..6e6e9f6 100644 --- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/comParam/ComCtrlVo.java +++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/comParam/ComCtrlVo.java @@ -6,7 +6,7 @@ * @Description */ public class ComCtrlVo { - //鏄惁鎺у埗鍔ㄤ綔锛宼rue鏄紝false鍚� + //鍚仠鍔ㄤ綔锛宼rue鏄紝false鍚� //鍙互鎵ц鍔熻兘鐮� 00锛�01锛�02锛�03鐨勫姩浣� - public boolean isDo;// + public boolean startTrueStopFalse;// } diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/comParam/ComSetParamVo.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/comParam/ComSetParamVo.java new file mode 100644 index 0000000..a14d5b1 --- /dev/null +++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/comParam/ComSetParamVo.java @@ -0,0 +1,23 @@ +package com.dy.common.mw.protocol4Mqtt.pSdV1.comParam; + +import com.alibaba.fastjson2.annotation.JSONField; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * @Author: liurunyu + * @Date: 2025/6/11 17:03 + * @Description + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class ComSetParamVo { + // 鎼呮媽璁惧畾鏃堕棿 + public Integer stirDuration ; + + // 娉ㄨ偉璁惧畾鏃堕棿 + public Integer injectDuration ; + +} diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/FaultClearVo.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/FaultClearVo.java index e9657ce..ec31c5b 100644 --- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/FaultClearVo.java +++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/FaultClearVo.java @@ -16,10 +16,10 @@ @AllArgsConstructor public class FaultClearVo implements Vo4Down { @JSONField(name = "鏁呴殰瑙i櫎") - public boolean isDo ; + public Byte isDo ;//1鏄紝0鍚� @Override public String toString(){ - return "鏁呴殰瑙i櫎锛�" + (isDo?"鏄�":"鍚�") ; + return "鏁呴殰瑙i櫎锛�" + (isDo==1?"鏄�":"鍚�") ; } } diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/InjectStartVo.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/InjectStartVo.java index 4311fd2..5e4190e 100644 --- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/InjectStartVo.java +++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/InjectStartVo.java @@ -16,9 +16,10 @@ @AllArgsConstructor public class InjectStartVo implements Vo4Down { @JSONField(name = "娉ㄨ偉鍚仠") - public boolean isDo ;//true涓哄惎锛宖alse涓哄仠 + public Byte isDo ;//1鏄紝0鍚� + @Override public String toString(){ - return "娉ㄨ偉鍚仠锛�" + (isDo?"鍚�":"鍋�") ; + return "娉ㄨ偉鍚仠锛�" + (isDo==1?"鍚�":"鍋�") ; } } diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/IrrStartVo.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/IrrStartVo.java index f05c154..69feb68 100644 --- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/IrrStartVo.java +++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/IrrStartVo.java @@ -16,9 +16,10 @@ @AllArgsConstructor public class IrrStartVo implements Vo4Down { @JSONField(name = "鐏屾簤鍚仠") - public boolean isDo ;//true涓哄惎锛宖alse涓哄仠 + public Byte isDo ;//1鏄紝0鍚� + @Override public String toString(){ - return "鐏屾簤鍚仠锛�" + (isDo?"鍚�":"鍋�") ; + return "鐏屾簤鍚仠锛�" + (isDo==1?"鏄�":"鍚�") ; } } diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/ParamSetVo.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/ParamSetVo.java new file mode 100644 index 0000000..48ecd14 --- /dev/null +++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/ParamSetVo.java @@ -0,0 +1,31 @@ +package com.dy.common.mw.protocol4Mqtt.pSdV1.downVos; + +import com.alibaba.fastjson2.annotation.JSONField; +import com.dy.common.mw.protocol4Mqtt.Vo4Down; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * @Author: liurunyu + * @Date: 2025/6/11 16:55 + * @Description + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class ParamSetVo implements Vo4Down { + @JSONField(name = "鎼呮媽璁惧畾鏃堕棿") + public Integer stirDuration ; + + @JSONField(name = "娉ㄨ偉璁惧畾鏃堕棿") + public Integer injectDuration ; + + @Override + public String toString(){ + StringBuilder sb = new StringBuilder(); + sb.append("鎼呮媽璁惧畾鏃堕棿锛�" + stirDuration + "\n" ); + sb.append("娉ㄨ偉璁惧畾鏃堕棿锛�" + injectDuration + "\n" ); + return sb.toString(); + } +} diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/StirStartVo.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/StirStartVo.java index 44baf99..333207e 100644 --- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/StirStartVo.java +++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/StirStartVo.java @@ -16,9 +16,10 @@ @AllArgsConstructor public class StirStartVo implements Vo4Down { @JSONField(name = "鎼呮媽鍚仠") - public boolean isDo ;//true涓哄惎锛宖alse涓哄仠 + public Byte isDo ;//1鏄紝0鍚� + @Override public String toString(){ - return "鎼呮媽鍚仠锛�" + (isDo?"鍚�":"鍋�") ; + return "鎼呮媽鍚仠锛�" + (isDo==1?"鏄�":"鍚�") ; } } diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/ManureVo.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/ManureVo.java index f038514..a2f9ae8 100644 --- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/ManureVo.java +++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/ManureVo.java @@ -12,6 +12,18 @@ */ @Data public class ManureVo implements Vo4Up { + @JSONField(name = "flexem_message_id") + public Integer messageId ;//娑堟伅ID + + @JSONField(name = "鑲ユ枡娴侀噺") + public Float manureFlow ;//鑲ユ枡娴侀噺 + + @JSONField(name = "娉ㄨ偉鏃堕暱") + public Integer manureTime ;//娉ㄨ偉鏃堕暱 + + @JSONField(name = "鎼呮媽鏃堕暱") + public Integer stirTime ;//鎼呮媽鏃堕暱 + @JSONField(name = "flexem_timestamp") public Long devDt ;//璁惧鏃堕棿 @@ -27,9 +39,13 @@ @Override public String toString(){ StringBuilder sb = new StringBuilder(); - sb.append("姘磋偉鏁版嵁锛�") ; - sb.append(" 璁惧鏃堕棿锛�"+devDt) ; - sb.append(" 璁惧鏃堕棿锛�"+ this.getDevDtStr()) ; + sb.append("姘磋偉鏁版嵁=>") ; + sb.append(" 娑堟伅ID锛�" + messageId + ", ") ; + sb.append(" 鑲ユ枡娴侀噺锛�" + manureFlow + ", ") ; + sb.append(" 娉ㄨ偉鏃堕暱锛�" + manureTime + ", ") ; + sb.append(" 鎼呮媽鏃堕暱锛�" + stirTime + ", ") ; + sb.append(" 璁惧鏃堕棿锛�" + devDt + ", ") ; + sb.append(" 璁惧鏃堕棿锛�" + this.getDevDtStr() + ", ") ; sb.append("\n") ; return sb.toString() ; } diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/SoilVo.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/SoilVo.java index a465609..f5bb1e0 100644 --- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/SoilVo.java +++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/SoilVo.java @@ -12,6 +12,33 @@ */ @Data public class SoilVo implements Vo4Up { + @JSONField(name = "flexem_message_id") + public Integer messageId ;//娑堟伅ID + + @JSONField(name = "鍦熷¥婀垮害1") + public Float soilHumidity1 ;//鍦熷¥婀垮害1 + + @JSONField(name = "鍦熷¥婀垮害2") + public Float soilHumidity2 ;//鍦熷¥婀垮害2 + + @JSONField(name = "鍦熷¥婀垮害3") + public Float soilHumidity3 ;//鍦熷¥婀垮害3 + + @JSONField(name = "鍦熷¥婀垮害4") + public Float soilHumidity4 ;//鍦熷¥婀垮害4 + + @JSONField(name = "鍦熷¥婀垮害1") + public Float soilTemperature1 ;//鍦熷¥娓╁害1 + + @JSONField(name = "鍦熷¥娓╁害2") + public Float soilTemperature2 ;//鍦熷¥娓╁害2 + + @JSONField(name = "鍦熷¥娓╁害3") + public Float soilTemperature3 ;//鍦熷¥娓╁害3 + + @JSONField(name = "鍦熷¥娓╁害4") + public Float soilTemperature4 ;//鍦熷¥娓╁害4 + @JSONField(name = "flexem_timestamp") public Long devDt ;//璁惧鏃堕棿 @@ -27,10 +54,20 @@ @Override public String toString(){ StringBuilder sb = new StringBuilder(); - sb.append("澧掓儏鏁版嵁锛�") ; - sb.append(" 璁惧鏃堕棿锛�"+devDt) ; - sb.append(" 璁惧鏃堕棿锛�"+ this.getDevDtStr()) ; + sb.append("澧掓儏鏁版嵁=>") ; + sb.append(" 娑堟伅ID锛�" + messageId + ", ") ; + sb.append(" 鍦熷¥婀垮害1锛�" + soilHumidity1 + ", ") ; + sb.append(" 鍦熷¥婀垮害2锛�" + soilHumidity2 + ", ") ; + sb.append(" 鍦熷¥婀垮害3锛�" + soilHumidity3 + ", ") ; + sb.append(" 鍦熷¥婀垮害4锛�" + soilHumidity4 + ", ") ; + sb.append(" 鍦熷¥娓╁害1锛�" + soilTemperature1 + ", ") ; + sb.append(" 鍦熷¥娓╁害2锛�" + soilTemperature2 + ", ") ; + sb.append(" 鍦熷¥娓╁害3锛�" + soilTemperature3 + ", ") ; + sb.append(" 鍦熷¥娓╁害4锛�" + soilTemperature4 + ", ") ; + sb.append(" 璁惧鏃堕棿锛�" + devDt + ", ") ; + sb.append(" 璁惧鏃堕棿锛�" + this.getDevDtStr() + ", ") ; sb.append("\n") ; return sb.toString() ; + } } diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/StateVo.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/StateVo.java index cae3afc..fb3885d 100644 --- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/StateVo.java +++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/StateVo.java @@ -12,6 +12,21 @@ */ @Data public class StateVo implements Vo4Up { + @JSONField(name = "flexem_message_id") + public Integer messageId ;//娑堟伅ID + + @JSONField(name = "鎼呮媽杩愯") + public Byte stirRunning ;//鎼呮媽杩愯 + + @JSONField(name = "娉ㄨ偉杩愯") + public Byte injectRunning ;//娉ㄨ偉杩愯 + + @JSONField(name = "鐏屾簤杩愯") + public Byte irrRunning ;//鐏屾簤杩愯 + + @JSONField(name = "鎶ヨ") + public Byte alarm ;//鎶ヨ + @JSONField(name = "flexem_timestamp") public Long devDt ;//璁惧鏃堕棿 @@ -27,9 +42,14 @@ @Override public String toString(){ StringBuilder sb = new StringBuilder(); - sb.append("鐘舵�佹暟鎹細") ; - sb.append(" 璁惧鏃堕棿锛�"+devDt) ; - sb.append(" 璁惧鏃堕棿锛�"+ this.getDevDtStr()) ; + sb.append("鐘舵�佹暟鎹�=>") ; + sb.append(" 娑堟伅ID锛�" + messageId + ", ") ; + sb.append(" 鎼呮媽杩愯锛�" + stirRunning + ", ") ; + sb.append(" 娉ㄨ偉杩愯锛�" + injectRunning + ", ") ; + sb.append(" 鐏屾簤杩愯锛�" + irrRunning + ", ") ; + sb.append(" 鎶ヨ锛�" + alarm + ", ") ; + sb.append(" 璁惧鏃堕棿锛�" + devDt + ", ") ; + sb.append(" 璁惧鏃堕棿锛�" + this.getDevDtStr() + ", ") ; sb.append("\n") ; return sb.toString() ; } diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/WeatherVo.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/WeatherVo.java index 62ba401..170bf51 100644 --- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/WeatherVo.java +++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/WeatherVo.java @@ -54,16 +54,16 @@ public String toString(){ StringBuilder sb = new StringBuilder(); sb.append("姘旇薄鏁版嵁=>") ; - sb.append(" 娑堟伅ID锛�"+messageId + ", ") ; - sb.append(" 浜屾哀鍖栫⒊锛�"+carbonDioxide + ", ") ; - sb.append(" 鍏夌収寮哄害锛�"+lightIntensity + ", ") ; - sb.append(" 澶ф皵鍘嬪姏锛�"+atmosphericPressure + ", ") ; - sb.append(" 绌烘皵娓╁害锛�"+airTemperature + ", ") ; - sb.append(" 绌烘皵婀垮害锛�"+airHumidity + ", ") ; - sb.append(" PM2.5锛�"+pm25 + ", ") ; - sb.append(" PM10锛�"+pm10 + ", ") ; - sb.append(" 璁惧鏃堕棿锛�"+devDt + ", ") ; - sb.append(" 璁惧鏃堕棿锛�"+ this.getDevDtStr() + ", ") ; + sb.append(" 娑堟伅ID锛�" + messageId + ", ") ; + sb.append(" 浜屾哀鍖栫⒊锛�" + carbonDioxide + ", ") ; + sb.append(" 鍏夌収寮哄害锛�" + lightIntensity + ", ") ; + sb.append(" 澶ф皵鍘嬪姏锛�" + atmosphericPressure + ", ") ; + sb.append(" 绌烘皵娓╁害锛�" + airTemperature + ", ") ; + sb.append(" 绌烘皵婀垮害锛�" + airHumidity + ", ") ; + sb.append(" PM2.5锛�" + pm25 + ", ") ; + sb.append(" PM10锛�" + pm10 + ", ") ; + sb.append(" 璁惧鏃堕棿锛�" + devDt + ", ") ; + sb.append(" 璁惧鏃堕棿锛�" + this.getDevDtStr() + ", ") ; sb.append("\n") ; return sb.toString() ; } diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevOnLineSt.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevOnLineInfo.java similarity index 83% rename from pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevOnLineSt.java rename to pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevOnLineInfo.java index 1092b27..99eda6c 100644 --- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevOnLineSt.java +++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevOnLineInfo.java @@ -9,7 +9,7 @@ * @Description */ @Data -public class DevOnLineSt implements MqttNotifyInfo { +public class DevOnLineInfo implements MqttNotifyInfo { public String id ; public String protocol ; public Boolean onLine ; diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevRunSt.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevRunInfo.java similarity index 82% rename from pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevRunSt.java rename to pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevRunInfo.java index 0fe300a..4811f1c 100644 --- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevRunSt.java +++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevRunInfo.java @@ -9,8 +9,8 @@ * @Description */ @Data -public class DevRunSt implements MqttNotifyInfo { - public String id ; +public class DevRunInfo implements MqttNotifyInfo { + public String devId ;//MQTT璁剧疆ID public Boolean stirRunning ;//鎼呮媽杩愯 true鏄� false鍚� public Boolean injectRunning ;//娉ㄨ偉杩愯 true鏄� false鍚� public Boolean irrRunning ;//鐏屾簤杩愯 true鏄� false鍚� diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatusDealer.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatusDealer.java index 099cc9e..d3b2f2b 100644 --- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatusDealer.java +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatusDealer.java @@ -1,6 +1,6 @@ package com.dy.rtuMw.server.mqtt; -import com.dy.common.mw.protocol4Mqtt.status.DevRunSt; +import com.dy.common.mw.protocol4Mqtt.status.DevRunInfo; import com.dy.rtuMw.server.forTcp.RtuLogDealer; import com.dy.rtuMw.server.local.localProtocol.RtuOnLineStateStatisticsVo; @@ -166,7 +166,7 @@ } } - public static void setStatus(String devId, DevRunSt st){ + public static void setStatus(String devId, DevRunInfo st){ DevStatus vo = map.get(devId) ; if(vo != null) { if(st.stirRunning != null){ diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java index 661696d..8085e91 100644 --- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java @@ -3,8 +3,8 @@ import com.dy.common.mw.channel.mqtt.MqttClientPool; import com.dy.common.mw.protocol4Mqtt.MqttNotify; import com.dy.common.mw.protocol4Mqtt.MqttNotifyInfo; -import com.dy.common.mw.protocol4Mqtt.status.DevOnLineSt; -import com.dy.common.mw.protocol4Mqtt.status.DevRunSt; +import com.dy.common.mw.protocol4Mqtt.status.DevOnLineInfo; +import com.dy.common.mw.protocol4Mqtt.status.DevRunInfo; import com.dy.rtuMw.server.ServerProperties; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -77,15 +77,15 @@ public void notify(String devId, MqttNotifyInfo... infos) { if(devId != null && infos != null && infos.length > 0){ for(MqttNotifyInfo info : infos){ - if(info instanceof DevOnLineSt){ - DevOnLineSt onLineSt = (DevOnLineSt)info; + if(info instanceof DevOnLineInfo){ + DevOnLineInfo onLineSt = (DevOnLineInfo)info; if(onLineSt.onLine != null && onLineSt.onLine.booleanValue()){ - DevStatusDealer.onLine(devId, ((DevOnLineSt)info).protocol); + DevStatusDealer.onLine(devId, ((DevOnLineInfo)info).protocol); }else{ DevStatusDealer.offLine(devId); } - } else if(info instanceof DevRunSt){ - DevStatusDealer.setStatus(devId, (DevRunSt)info); + } else if(info instanceof DevRunInfo){ + DevStatusDealer.setStatus(devId, (DevRunInfo)info); } } } diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java index 0414628..c109e53 100644 --- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java @@ -45,33 +45,6 @@ } } private void nextDeal(MqttSubMsg subMsg)throws Exception { - subMsg.action(new Callback() { - @Override - public void call(Object obj) { - MqttSubMsg subMs = (MqttSubMsg) obj ; - MqttPubMsg pubMs = MqttPubMsgCache.matchFromTail(subMs) ; - if(pubMs != null){ - //鍖归厤鍒颁笅琛屾秷鎭紙鍛戒护锛� - subMs.mqttResultSendWebUrl = pubMs.mqttResultSendWebUrl ; - subMs.commandId = pubMs.commandId ; - try { - MqttComResultCache.getInstance().cacheMqttComResult(new MqttComResultNode(subMs)); - } catch (Exception e) { - log.error("缂撳瓨鍙戝竷娑堟伅锛堝懡浠わ級缁撴灉鍙戠敓寮傚父", e); - } - } - try{ - MqttSubMsgCache.getInstance().cacheMsg(new MqttSubMsgNode(subMsg)); - }catch (Exception e){ - log.error("缂撳瓨璁㈤槄娑堟伅鏁版嵁鍙戠敓寮傚父", e); - } - } - @Override - public void call(Object... objs) { - } - @Override - public void exception(Exception e) { - } - }); + subMsg.action(new MqttSubMsgDealer()); } } diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgCache.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgCache.java index 2ae8adc..b4b948a 100644 --- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgCache.java +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgCache.java @@ -86,8 +86,7 @@ while(node != null && node.obj != null){ obj = (MqttPubMsgNode)node.obj; pubMsg = obj.result ; - if(pubMsg != null - && subMsg.subMsgMatchPubMsg(pubMsg)){ + if(pubMsg != null && subMsg.subMsgMatchPubMsg(pubMsg)){ obj.onceReceivedResult = true ;//鏍囪瘑宸茬粡鏀跺埌鍛戒护缁撴灉 return pubMsg; }else{ diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgNode.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgNode.java index 7b827d5..504ee3f 100644 --- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgNode.java +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgNode.java @@ -65,7 +65,7 @@ }else{ if(mqttClient != null && mqttClient.isConnected()){ try { - mqttManager.publishMsg(mqttClient, this.result.topic, this.result.msg); + mqttManager.publishMsg(mqttClient, this.result.topic.longName(), this.result.msg); DevStatusDealer.afterSendPubMessage(this.result.deviceId); RtuLogDealer.log4Mqtt(this.result.deviceId, "鍙戝竷娑堟伅 涓婚锛�" + this.result.topic + " 娑堟伅锛�" + this.result.msg); log.info("鍙戝竷MQTT娑堟伅锛堜富棰�=" + this.result.topic + "锛�" + this.result.msg); @@ -74,7 +74,11 @@ }finally { mqttManager.pushMqttClient(mqttClient); } - return false ; + if(this.result.hasResponse){ + return false ; + }else{ + return true ; + } }else{ //鏈浘杩炴帴MQTT鏈嶅姟鍣� return this.decideRemoveNodeFromCach(now) ; diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttSubMsgDealer.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttSubMsgDealer.java new file mode 100644 index 0000000..116239d --- /dev/null +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttSubMsgDealer.java @@ -0,0 +1,41 @@ +package com.dy.rtuMw.server.mqtt; + +import com.dy.common.mw.protocol4Mqtt.MqttPubMsg; +import com.dy.common.mw.protocol4Mqtt.MqttSubMsg; +import com.dy.common.util.Callback; +import lombok.extern.slf4j.Slf4j; + +/** + * @Author: liurunyu + * @Date: 2025/6/11 17:33 + * @Description + */ +@Slf4j +public class MqttSubMsgDealer implements Callback { + @Override + public void call(Object obj) { + MqttSubMsg subMs = (MqttSubMsg) obj ; + MqttPubMsg pubMs = MqttPubMsgCache.matchFromTail(subMs) ; + if(pubMs != null){ + //鍖归厤鍒颁笅琛屾秷鎭紙鍛戒护锛� + subMs.mqttResultSendWebUrl = pubMs.mqttResultSendWebUrl ; + subMs.commandId = pubMs.commandId ; + try { + MqttComResultCache.getInstance().cacheMqttComResult(new MqttComResultNode(subMs)); + } catch (Exception e) { + log.error("缂撳瓨鍙戝竷娑堟伅锛堝懡浠わ級缁撴灉鍙戠敓寮傚父", e); + } + } + try{ + MqttSubMsgCache.getInstance().cacheMsg(new MqttSubMsgNode(subMs)); + }catch (Exception e){ + log.error("缂撳瓨璁㈤槄娑堟伅鏁版嵁鍙戠敓寮傚父", e); + } + } + @Override + public void call(Object... objs) { + } + @Override + public void exception(Exception e) { + } +} -- Gitblit v1.8.0