From 6b828ba1310db528aa8172bd14a0253ebca5a844 Mon Sep 17 00:00:00 2001
From: liurunyu <lry9898@163.com>
Date: 星期二, 10 六月 2025 18:34:29 +0800
Subject: [PATCH] 基于mqtt的水肥机、气象站、墒情站协议、功能模块继续开发
---
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java | 35 +
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttMsgParser.java | 31 +
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttPubMsgSdV1.java | 3
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/forTcp/RtuLogDealer.java | 18
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/IrrStartVo.java | 24 +
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/ManureVo.java | 36 +
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatus.java | 23 +
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitConfigVo.java | 10
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/Vo4Up.java | 10
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/comParam/ComCtrlVo.java | 2
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java | 65 ++
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevOnLineSt.java | 16
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/FaultClearVo.java | 25 +
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttSubMsg.java | 4
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/StateVo.java | 36 +
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/说明.txt | 4
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevRunSt.java | 18
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/StirStartVo.java | 24 +
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/softUpgrade/state/UpgradeRtu.java | 37 -
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/MqttSubMessageConstantTask.java | 13
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttTopic.java | 21
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/WeatherVo.java | 70 +++
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatusDealer.java | 186 ++++++++
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttNotify.java | 16
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolConstantSdV1.java | 13
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.properties | 32
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/SoilVo.java | 36 +
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttCom.java | 2
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/Vo4Down.java | 10
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/InjectStartVo.java | 24 +
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.xml | 17
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/localProtocol/CodeLocal.java | 31 +
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttNotifyInfo.java | 9
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttCallback.java | 19
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java | 44 +
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java | 131 +++--
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgNode.java | 3
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java | 20
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/CommandInnerDeaLer.java | 197 +++++++-
39 files changed, 1,119 insertions(+), 196 deletions(-)
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttCallback.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttCallback.java
new file mode 100644
index 0000000..35ff0ef
--- /dev/null
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttCallback.java
@@ -0,0 +1,19 @@
+package com.dy.common.mw.protocol4Mqtt;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/10 15:39
+ * @Description
+ */
+public interface MqttCallback {
+ /**
+ * @param subMsg 璁㈤槄鐨勬秷鎭�
+ */
+ void callback(MqttSubMsg subMsg) ;
+
+ /**
+ * 鍙湁鍗忚瑙f瀽鍣ㄦ墠鐭ラ亾RTU鐪熷疄鐨勭姸鎬侊紝鎵�璁ゆ彁渚涙鎺ュ彛锛屽悜澶栭�氱煡璁惧鐨勪竴浜涚姸鎬�
+ * @param infos
+ */
+ void notify(String devId, MqttNotifyInfo...infos) ;
+}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/Com4Mqtt.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttCom.java
similarity index 95%
rename from pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/Com4Mqtt.java
rename to pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttCom.java
index 8663e05..c371a00 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/Com4Mqtt.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttCom.java
@@ -8,7 +8,7 @@
* @Description 鍛戒护鍊煎璞�
*/
@Data
-public class Com4Mqtt {
+public class MqttCom {
public String commandId ;//鍛戒护ID
public String deviceId ;//璁惧ID
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttMsgParser.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttMsgParser.java
index 15ec6b0..11cfbd6 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttMsgParser.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttMsgParser.java
@@ -11,25 +11,38 @@
* @Description
*/
public class MqttMsgParser {
- public MqttSubMsg parseSubMsg(String topic, MqttMessage mqttMsg) throws Exception {
+ public static MqttTopic parseSubTopic(String topic) throws Exception {
if(topic != null && topic.trim().length() != 0){
String[] topicGrp = topic.split("/") ;
if(topicGrp.length != 4){
throw new Exception("鎺ユ敹鐨刴qtt娑堟伅涓婚涓嶅彲璇嗗埆") ;
}else{
- if(topicGrp[1].equals("sd1")){
- //灞变笢璁惧(鍗忚)锛屼笖鐗堟湰鍙蜂负1
- return new ProtocolParserSdV1().parseSubMsg(topicGrp[2], topic, mqttMsg);
- }else{
- throw new Exception("鎺ユ敹鐨刴qtt娑堟伅涓婚涓崗璁紙鍘傚鍙婄増鏈級涓嶅彲璇嗗埆") ;
- }
+ MqttTopic vo = new MqttTopic() ;
+ vo.orgTag = topicGrp[0] ;
+ vo.protocol = topicGrp[1] ;
+ vo.devId = topicGrp[2] ;
+ vo.topic = topicGrp[3] ;
+ return vo ;
}
}else{
- throw new Exception("鎺ユ敹鐨刴qtt娑堟伅涓婚涓虹┖") ;
+ throw new Exception("鎺ユ敹鐨刴qtt娑堟伅涓婚涓嶅悎娉�") ;
}
}
- public MqttPubMsg createPubMsg(String orgTag, Command com) throws Exception {
+ public static String createPubTopic(MqttTopic tp) throws Exception {
+ return tp.orgTag + "/" + tp.protocol + "/" + tp.devId + "/" + tp.topic ;
+ }
+
+ public static MqttSubMsg parseSubMsg(MqttTopic subTopic, MqttMessage mqttMsg, MqttCallback callback) throws Exception {
+ if(subTopic.protocol.equals(ProtocolConstantSdV1.protocolName + ProtocolConstantSdV1.protocolVer)){
+ //灞变笢璁惧(鍗忚)锛屼笖鐗堟湰鍙蜂负1
+ return new ProtocolParserSdV1().parseSubMsg(subTopic, mqttMsg, callback);
+ }else{
+ throw new Exception("鎺ユ敹鐨刴qtt娑堟伅涓婚涓崗璁紙鍘傚鍙婄増鏈級涓嶅彲璇嗗埆") ;
+ }
+ }
+
+ public static MqttPubMsg createPubMsg(String orgTag, Command com) throws Exception {
if(com.protocol == null && com.protocol.trim().length() != 0){
throw new Exception("鎺ユ敹鍒癕QTT鍛戒护锛屼絾鏈彁渚涘崗璁�") ;
}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttNotify.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttNotify.java
new file mode 100644
index 0000000..ca81a61
--- /dev/null
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttNotify.java
@@ -0,0 +1,16 @@
+package com.dy.common.mw.protocol4Mqtt;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/10 15:56
+ * @Description
+ */
+public interface MqttNotify {
+ /**
+ * MQTT DEV 淇℃伅閫氱煡
+ * @param devId
+ * @param info
+ */
+ void notify(String devId,
+ MqttNotifyInfo...info) ;
+}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttNotifyInfo.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttNotifyInfo.java
new file mode 100644
index 0000000..2f0e405
--- /dev/null
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttNotifyInfo.java
@@ -0,0 +1,9 @@
+package com.dy.common.mw.protocol4Mqtt;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/10 15:42
+ * @Description
+ */
+public interface MqttNotifyInfo {
+}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttSubMsg.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttSubMsg.java
index 61b4f65..bc144f2 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttSubMsg.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttSubMsg.java
@@ -10,10 +10,10 @@
public abstract class MqttSubMsg {
public String commandId ;//鍛戒护ID
+ public String mqttResultSendWebUrl ;//Mtt杩斿洖鍛戒护缁撴灉 鍙戝悜鐩殑鍦皐eb URL
public String deviceId ;//璁惧ID
-
- public String mqttResultSendWebUrl ;//Mtt杩斿洖鍛戒护缁撴灉 鍙戝悜鐩殑鍦皐eb URL
+ public String protocol;//鍗忚
public String topic ;//娑堟伅涓婚
public String msg ;//娑堟伅
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttTopic.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttTopic.java
new file mode 100644
index 0000000..8403a73
--- /dev/null
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttTopic.java
@@ -0,0 +1,21 @@
+package com.dy.common.mw.protocol4Mqtt;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/10 9:47
+ * @Description
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class MqttTopic {
+ public String orgTag ;//缁勭粐鏍囪瘑
+ public String protocol ;//鍗忚鍚嶇О
+ public String devId ;//璁惧锛團Box锛塈D
+ public String topic ;//娑堟伅涓婚
+
+}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/Vo4Down.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/Vo4Down.java
new file mode 100644
index 0000000..2997873
--- /dev/null
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/Vo4Down.java
@@ -0,0 +1,10 @@
+package com.dy.common.mw.protocol4Mqtt;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/10 14:11
+ * @Description
+ */
+public interface Vo4Down {
+ String toString() ;
+}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/Vo4Up.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/Vo4Up.java
new file mode 100644
index 0000000..2fd7a81
--- /dev/null
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/Vo4Up.java
@@ -0,0 +1,10 @@
+package com.dy.common.mw.protocol4Mqtt;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/10 10:06
+ * @Description
+ */
+public interface Vo4Up {
+ String toString() ;
+}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttPubMsgSdV1.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttPubMsgSdV1.java
index 9817c1b..6193843 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttPubMsgSdV1.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttPubMsgSdV1.java
@@ -13,8 +13,7 @@
@EqualsAndHashCode(callSuper=false)
public class MqttPubMsgSdV1 extends MqttPubMsg {
- public Integer address ;//瀵勫瓨鍣ㄥ湴鍧�
- public String value ;//瀵勫瓨鍣ㄥ��
+ public String cd ;//鍔熻兘鐮�
@Override
public boolean valid() {
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java
index e26ff6d..001b8c3 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java
@@ -2,6 +2,9 @@
import com.dy.common.mw.protocol4Mqtt.MqttPubMsg;
import com.dy.common.mw.protocol4Mqtt.MqttSubMsg;
+import com.dy.common.mw.protocol4Mqtt.MqttTopic;
+import com.dy.common.mw.protocol4Mqtt.pSdV1.upVos.StateVo;
+import com.dy.common.mw.protocol4Mqtt.Vo4Up;
import com.dy.common.util.Callback;
import lombok.Data;
import lombok.EqualsAndHashCode;
@@ -14,14 +17,14 @@
@Data
@EqualsAndHashCode(callSuper=false)
public class MqttSubMsgSdV1 extends MqttSubMsg {
- public Integer address ;//瀵勫瓨鍣ㄥ湴鍧�
- public String value ;//瀵勫瓨鍣ㄥ��
+ public Vo4Up vo4Up;//璁㈤槄鐨勬秷鎭暟鎹�煎璞�
public MqttSubMsgSdV1(){}
- public MqttSubMsgSdV1(String deviceId, String topic, String msg) {
- this.deviceId = deviceId ;
- this.topic = topic ;
+ public MqttSubMsgSdV1(MqttTopic subTopic, String msg) {
+ this.deviceId = subTopic.devId ;
+ this.protocol = subTopic.protocol ;
+ this.topic = subTopic.topic ;
this.msg = msg ;
}
public String toString(){
@@ -37,6 +40,11 @@
sb.append("娑堟伅:")
.append(msg)
.append("\n") ;
+ if(vo4Up != null){
+ sb.append("鏁版嵁:")
+ .append(vo4Up.toString())
+ .append("\n") ;
+ }
return sb.toString() ;
}
@@ -44,7 +52,7 @@
public boolean subMsgMatchPubMsg(MqttPubMsg pubMsg){
if (pubMsg instanceof MqttPubMsgSdV1) {
MqttPubMsgSdV1 pubMsgSdV1 = (MqttPubMsgSdV1) pubMsg;
- if(this.address.intValue() == pubMsgSdV1.getAddress().intValue()){
+ if(this.vo4Up != null && this.vo4Up instanceof StateVo){
return true ;
}
}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolConstantSdV1.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolConstantSdV1.java
index 8d7e5cd..60dc354 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolConstantSdV1.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolConstantSdV1.java
@@ -8,4 +8,17 @@
public class ProtocolConstantSdV1 {
public static final String protocolName = "sd" ;
public static final short protocolVer = 1 ;
+
+ //璁㈤槄鐨勪富棰�
+ public static final String SubTopicWeather = "weather" ;//姘旇薄
+ public static final String SubTopicSoil = "soil" ;//鍦熷¥澧掓儏
+ public static final String SubTopicManure = "manure" ;//姘磋偉
+ public static final String SubTopicState = "state" ;//鐘舵��
+
+ //鍙戝竷鐨勪富棰�
+ public static final String PubTopicFault = "ctrlFault" ;//鏁呴殰瑙i櫎
+ public static final String PubTopicStir = "ctrlStir" ;//鎼呮媽鍚仠
+ public static final String PubTopicInject = "ctrlInject" ;//娉ㄨ偉鍚仠
+ public static final String PubTopicIrr = "ctrlIrr" ;//鐏屾簤鍚仠
+
}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java
index 1ca98b2..df625b1 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java
@@ -3,9 +3,16 @@
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.dy.common.mw.protocol.Command;
-import com.dy.common.mw.protocol4Mqtt.pSdV1.downVos.ComCtrlVo;
+import com.dy.common.mw.protocol4Mqtt.MqttCallback;
+import com.dy.common.mw.protocol4Mqtt.MqttMsgParser;
+import com.dy.common.mw.protocol4Mqtt.MqttTopic;
+import com.dy.common.mw.protocol4Mqtt.Vo4Up;
+import com.dy.common.mw.protocol4Mqtt.pSdV1.comParam.ComCtrlVo;
+import com.dy.common.mw.protocol4Mqtt.pSdV1.downVos.FaultClearVo;
+import com.dy.common.mw.protocol4Mqtt.pSdV1.downVos.InjectStartVo;
+import com.dy.common.mw.protocol4Mqtt.pSdV1.downVos.StirStartVo;
+import com.dy.common.mw.protocol4Mqtt.pSdV1.upVos.*;
import org.eclipse.paho.client.mqttv3.MqttMessage;
-
/**
* @Author: liurunyu
@@ -13,44 +20,74 @@
* @Description
*/
public class ProtocolParserSdV1 {
- public MqttSubMsgSdV1 parseSubMsg(String deviceId, String topic, MqttMessage mqttMsg) throws Exception {
- MqttSubMsgSdV1 ms = new MqttSubMsgSdV1(deviceId, topic, new String(mqttMsg.getPayload(), "UTF-8"));
+ public MqttSubMsgSdV1 parseSubMsg(MqttTopic subTopic, MqttMessage mqttMsg, MqttCallback callback) throws Exception {
+ String msg = new String(mqttMsg.getPayload(), "UTF-8");
+ if(JSON.isValid(msg)){
+ throw new Exception("鎺ユ敹鍒癕QTT娑堟伅锛屽崗璁�" + subTopic.protocol + "锛岃澶嘔D" + subTopic.devId + "锛屼富棰�" + subTopic.topic + "娑堟伅鏍煎紡闈瀓son鏁版嵁(" + msg + ")") ;
+ }
+ MqttSubMsgSdV1 ms = new MqttSubMsgSdV1(subTopic, msg);
+ Vo4Up vo ;
+ switch (subTopic.topic) {
+ case ProtocolConstantSdV1.SubTopicWeather -> {
+ vo = JSON.parseObject(msg, WeatherVo.class);
+ break;
+ }
+ case ProtocolConstantSdV1.SubTopicSoil -> {
+ vo = JSON.parseObject(msg, SoilVo.class);
+ break;
+ }
+ case ProtocolConstantSdV1.SubTopicManure -> {
+ vo = JSON.parseObject(msg, ManureVo.class);
+ break;
+ }
+ case ProtocolConstantSdV1.SubTopicState -> {
+ //姝ゅ鏈畬鎴愶紝搴旇浜х敓涓�浜涢�氫俊鐨刬nfo锛屼緵涓嬮潰callback.notify(objs)閫氱煡鍑哄幓
+ vo = JSON.parseObject(msg, StateVo.class);
+ break;
+ }
+ default -> {
+ throw new Exception("鎺ユ敹鍒癕QTT娑堟伅锛屽崗璁�" + subTopic.protocol + "锛岃澶嘔D" + subTopic.devId + "锛屼富棰�" + subTopic.topic + "娑堟伅瑙f瀽閫昏緫鏈疄鐜�");
+ }
+ }
+ ms.vo4Up = vo ;
+ callback.callback(ms);
+ callback.notify(null);//姝ゅ鏈畬鎴�
return ms;
}
public MqttPubMsgSdV1 createPubMsg(String orgTag, Command com) throws Exception {
- MqttPubMsgSdV1 msg = null ;
- switch (com.code){
- case CodeSdV1.cd_Fault:{
+ MqttPubMsgSdV1 msg ;
+ switch (com.code) {
+ case CodeSdV1.cd_Fault -> {
//鏁呴殰瑙i櫎鍛戒护
this.checkParam(com);
this.checkRtnWebUrl(com);
- msg = this.createPubMsgOfFault(orgTag, com) ;
- break ;
+ msg = this.createPubMsgOfFault(orgTag, com);
+ break;
}
- case CodeSdV1.cd_Stir:{
+ case CodeSdV1.cd_Stir -> {
//鎼呮媽鍚仠鍛戒护
this.checkParam(com);
this.checkRtnWebUrl(com);
- msg = this.createPubMsgOfStir(orgTag, com) ;
- break ;
+ msg = this.createPubMsgOfStir(orgTag, com);
+ break;
}
- case CodeSdV1.cd_Inject:{
+ case CodeSdV1.cd_Inject -> {
//娉ㄨ偉鍚仠鍛戒护
this.checkParam(com);
this.checkRtnWebUrl(com);
- msg = this.createPubMsgOfInject(orgTag, com) ;
- break ;
+ msg = this.createPubMsgOfInject(orgTag, com);
+ break;
}
- case CodeSdV1.cd_Irr:{
+ case CodeSdV1.cd_Irr -> {
//鐏屾簤鍚仠鍛戒护
this.checkParam(com);
this.checkRtnWebUrl(com);
- msg = this.createPubMsgOfIrr(orgTag, com) ;
- break ;
+ msg = this.createPubMsgOfIrr(orgTag, com);
+ break;
}
- default:{
- throw new Exception("鎺ユ敹鍒癕QTT鍛戒护锛屽崗璁�" + com.protocol + "鐗堟湰" + com.protocolVersion + "鍔熻兘鐮�" + com.code + "鏋勯�犲櫒鏈疄鐜�") ;
+ default -> {
+ throw new Exception("鎺ユ敹鍒癕QTT鍛戒护锛屽崗璁�" + com.protocol + "鐗堟湰" + com.protocolVersion + "鍔熻兘鐮�" + com.code + "鏋勯�犲櫒鏈疄鐜�");
}
}
return msg ;
@@ -76,10 +113,9 @@
this.setPubMsgBase(com, msg);
msg.isCacheForOffLine = false ;
msg.hasResponse = true ;
- msg.address = 123 ;
- msg.value = "" + (cvo.isDo?1:0);
- msg.topic = createTopic(orgTag, com) ;
- msg.msg = "" ;
+ msg.cd = CodeSdV1.cd_Fault ;
+ msg.topic = MqttMsgParser.createPubTopic(new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicFault)) ;
+ msg.msg = JSON.toJSONString(new FaultClearVo(cvo.isDo)) ;
return msg ;
}
private MqttPubMsgSdV1 createPubMsgOfStir(String orgTag, Command com) throws Exception {
@@ -93,10 +129,9 @@
this.setPubMsgBase(com, msg);
msg.isCacheForOffLine = false ;
msg.hasResponse = true ;
- msg.address = 123 ;
- msg.value = "" + (cvo.isDo?1:0);
- msg.topic = createTopic(orgTag, com) ;
- msg.msg = "" ;
+ msg.cd = CodeSdV1.cd_Fault ;
+ msg.topic = MqttMsgParser.createPubTopic(new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicFault)) ;
+ msg.msg = JSON.toJSONString(new StirStartVo(cvo.isDo)) ;
return msg ;
}
private MqttPubMsgSdV1 createPubMsgOfInject(String orgTag, Command com) throws Exception {
@@ -110,10 +145,9 @@
this.setPubMsgBase(com, msg);
msg.isCacheForOffLine = false ;
msg.hasResponse = true ;
- msg.address = 123 ;
- msg.value = "" + (cvo.isDo?1:0);
- msg.topic = createTopic(orgTag, com) ;
- msg.msg = "" ;
+ msg.cd = CodeSdV1.cd_Fault ;
+ msg.topic = MqttMsgParser.createPubTopic(new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicFault)) ;
+ msg.msg = JSON.toJSONString(new InjectStartVo(cvo.isDo)) ;
return msg ;
}
private MqttPubMsgSdV1 createPubMsgOfIrr(String orgTag, Command com) throws Exception {
@@ -127,10 +161,9 @@
this.setPubMsgBase(com, msg);
msg.isCacheForOffLine = false ;
msg.hasResponse = true ;
- msg.address = 123 ;
- msg.value = "" + (cvo.isDo?1:0);
- msg.topic = createTopic(orgTag, com) ;
- msg.msg = "" ;
+ msg.cd = CodeSdV1.cd_Fault ;
+ msg.topic = MqttMsgParser.createPubTopic(new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicFault)) ;
+ msg.msg = JSON.toJSONString(new StirStartVo(cvo.isDo)) ;
return msg ;
}
@@ -140,31 +173,5 @@
msg.mqttResultSendWebUrl = com.rtuResultSendWebUrl ;
}
- private String createTopic(String orgTag, Command com){
- String topic = null ;
- switch (com.code){
- case CodeSdV1.cd_Fault:{
- topic = orgTag + "/" + com.protocol + com.protocolVersion + "/" + com.rtuAddr + "/control/m4" ;
- break ;
- }
- case CodeSdV1.cd_Stir:{
- topic = orgTag + "/" + com.protocol + com.protocolVersion + "/" + com.rtuAddr + "/control/m80" ;
- break ;
- }
- case CodeSdV1.cd_Inject:{
- topic = orgTag + "/" + com.protocol + com.protocolVersion + "/" + com.rtuAddr + "/control/m1" ;
- break ;
- }
- case CodeSdV1.cd_Irr:{
- topic = orgTag + "/" + com.protocol + com.protocolVersion + "/" + com.rtuAddr + "/control/m2" ;
- break ;
- }
- default:{
- topic = null ;
- break;
- }
- }
- return topic ;
- }
}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/ComCtrlVo.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/comParam/ComCtrlVo.java
similarity index 80%
rename from pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/ComCtrlVo.java
rename to pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/comParam/ComCtrlVo.java
index 4f90810..185e4f5 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/ComCtrlVo.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/comParam/ComCtrlVo.java
@@ -1,4 +1,4 @@
-package com.dy.common.mw.protocol4Mqtt.pSdV1.downVos;
+package com.dy.common.mw.protocol4Mqtt.pSdV1.comParam;
/**
* @Author: liurunyu
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/FaultClearVo.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/FaultClearVo.java
new file mode 100644
index 0000000..e9657ce
--- /dev/null
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/FaultClearVo.java
@@ -0,0 +1,25 @@
+package com.dy.common.mw.protocol4Mqtt.pSdV1.downVos;
+
+import com.alibaba.fastjson2.annotation.JSONField;
+import com.dy.common.mw.protocol4Mqtt.Vo4Down;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/10 14:07
+ * @Description
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class FaultClearVo implements Vo4Down {
+ @JSONField(name = "鏁呴殰瑙i櫎")
+ public boolean isDo ;
+
+ @Override
+ public String toString(){
+ return "鏁呴殰瑙i櫎锛�" + (isDo?"鏄�":"鍚�") ;
+ }
+}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/InjectStartVo.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/InjectStartVo.java
new file mode 100644
index 0000000..4311fd2
--- /dev/null
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/InjectStartVo.java
@@ -0,0 +1,24 @@
+package com.dy.common.mw.protocol4Mqtt.pSdV1.downVos;
+
+import com.alibaba.fastjson2.annotation.JSONField;
+import com.dy.common.mw.protocol4Mqtt.Vo4Down;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/10 14:13
+ * @Description
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class InjectStartVo implements Vo4Down {
+ @JSONField(name = "娉ㄨ偉鍚仠")
+ public boolean isDo ;//true涓哄惎锛宖alse涓哄仠
+ @Override
+ public String toString(){
+ return "娉ㄨ偉鍚仠锛�" + (isDo?"鍚�":"鍋�") ;
+ }
+}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/IrrStartVo.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/IrrStartVo.java
new file mode 100644
index 0000000..f05c154
--- /dev/null
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/IrrStartVo.java
@@ -0,0 +1,24 @@
+package com.dy.common.mw.protocol4Mqtt.pSdV1.downVos;
+
+import com.alibaba.fastjson2.annotation.JSONField;
+import com.dy.common.mw.protocol4Mqtt.Vo4Down;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/10 14:13
+ * @Description
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class IrrStartVo implements Vo4Down {
+ @JSONField(name = "鐏屾簤鍚仠")
+ public boolean isDo ;//true涓哄惎锛宖alse涓哄仠
+ @Override
+ public String toString(){
+ return "鐏屾簤鍚仠锛�" + (isDo?"鍚�":"鍋�") ;
+ }
+}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/StirStartVo.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/StirStartVo.java
new file mode 100644
index 0000000..44baf99
--- /dev/null
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/StirStartVo.java
@@ -0,0 +1,24 @@
+package com.dy.common.mw.protocol4Mqtt.pSdV1.downVos;
+
+import com.alibaba.fastjson2.annotation.JSONField;
+import com.dy.common.mw.protocol4Mqtt.Vo4Down;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/10 14:13
+ * @Description
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class StirStartVo implements Vo4Down {
+ @JSONField(name = "鎼呮媽鍚仠")
+ public boolean isDo ;//true涓哄惎锛宖alse涓哄仠
+ @Override
+ public String toString(){
+ return "鎼呮媽鍚仠锛�" + (isDo?"鍚�":"鍋�") ;
+ }
+}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/ManureVo.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/ManureVo.java
new file mode 100644
index 0000000..f038514
--- /dev/null
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/ManureVo.java
@@ -0,0 +1,36 @@
+package com.dy.common.mw.protocol4Mqtt.pSdV1.upVos;
+
+import com.alibaba.fastjson2.annotation.JSONField;
+import com.dy.common.mw.protocol4Mqtt.Vo4Up;
+import com.dy.common.util.DateTime;
+import lombok.Data;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/10 10:05
+ * @Description
+ */
+@Data
+public class ManureVo implements Vo4Up {
+ @JSONField(name = "flexem_timestamp")
+ public Long devDt ;//璁惧鏃堕棿
+
+ public String devDtStr ;//璁惧鏃堕棿
+ public String getDevDtStr() {
+ if(devDt == null){
+ return DateTime.yyyy_MM_dd_HH_mm_ss(DateTime.getDate(devDt)) ;
+ }else{
+ return "" ;
+ }
+ }
+
+ @Override
+ public String toString(){
+ StringBuilder sb = new StringBuilder();
+ sb.append("姘磋偉鏁版嵁锛�") ;
+ sb.append(" 璁惧鏃堕棿锛�"+devDt) ;
+ sb.append(" 璁惧鏃堕棿锛�"+ this.getDevDtStr()) ;
+ sb.append("\n") ;
+ return sb.toString() ;
+ }
+}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/SoilVo.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/SoilVo.java
new file mode 100644
index 0000000..a465609
--- /dev/null
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/SoilVo.java
@@ -0,0 +1,36 @@
+package com.dy.common.mw.protocol4Mqtt.pSdV1.upVos;
+
+import com.alibaba.fastjson2.annotation.JSONField;
+import com.dy.common.mw.protocol4Mqtt.Vo4Up;
+import com.dy.common.util.DateTime;
+import lombok.Data;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/10 10:05
+ * @Description
+ */
+@Data
+public class SoilVo implements Vo4Up {
+ @JSONField(name = "flexem_timestamp")
+ public Long devDt ;//璁惧鏃堕棿
+
+ public String devDtStr ;//璁惧鏃堕棿
+ public String getDevDtStr() {
+ if(devDt == null){
+ return DateTime.yyyy_MM_dd_HH_mm_ss(DateTime.getDate(devDt)) ;
+ }else{
+ return "" ;
+ }
+ }
+
+ @Override
+ public String toString(){
+ StringBuilder sb = new StringBuilder();
+ sb.append("澧掓儏鏁版嵁锛�") ;
+ sb.append(" 璁惧鏃堕棿锛�"+devDt) ;
+ sb.append(" 璁惧鏃堕棿锛�"+ this.getDevDtStr()) ;
+ sb.append("\n") ;
+ return sb.toString() ;
+ }
+}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/StateVo.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/StateVo.java
new file mode 100644
index 0000000..cae3afc
--- /dev/null
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/StateVo.java
@@ -0,0 +1,36 @@
+package com.dy.common.mw.protocol4Mqtt.pSdV1.upVos;
+
+import com.alibaba.fastjson2.annotation.JSONField;
+import com.dy.common.mw.protocol4Mqtt.Vo4Up;
+import com.dy.common.util.DateTime;
+import lombok.Data;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/10 10:05
+ * @Description
+ */
+@Data
+public class StateVo implements Vo4Up {
+ @JSONField(name = "flexem_timestamp")
+ public Long devDt ;//璁惧鏃堕棿
+
+ public String devDtStr ;//璁惧鏃堕棿
+ public String getDevDtStr() {
+ if(devDt == null){
+ return DateTime.yyyy_MM_dd_HH_mm_ss(DateTime.getDate(devDt)) ;
+ }else{
+ return "" ;
+ }
+ }
+
+ @Override
+ public String toString(){
+ StringBuilder sb = new StringBuilder();
+ sb.append("鐘舵�佹暟鎹細") ;
+ sb.append(" 璁惧鏃堕棿锛�"+devDt) ;
+ sb.append(" 璁惧鏃堕棿锛�"+ this.getDevDtStr()) ;
+ sb.append("\n") ;
+ return sb.toString() ;
+ }
+}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/WeatherVo.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/WeatherVo.java
new file mode 100644
index 0000000..6ebc8be
--- /dev/null
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/WeatherVo.java
@@ -0,0 +1,70 @@
+package com.dy.common.mw.protocol4Mqtt.pSdV1.upVos;
+
+import com.alibaba.fastjson2.annotation.JSONField;
+import com.dy.common.mw.protocol4Mqtt.Vo4Up;
+import com.dy.common.util.DateTime;
+import lombok.Data;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/10 10:04
+ * @Description
+ */
+@Data
+public class WeatherVo implements Vo4Up {
+ //{"PM10":10,"PM2.5":0,"flexem_message_id":1311,"flexem_timestamp":1749522958,"浜屾哀鍖栫⒊":10,"鍏夌収寮哄害":0,"澶ф皵鍘嬪姏":20,"绌烘皵娓╁害":0,"绌烘皵婀垮害":65}
+ @JSONField(name = "flexem_message_id")
+ public Integer messageId ;//娑堟伅ID
+
+ @JSONField(name = "浜屾哀鍖栫⒊")
+ public Integer carbonDioxide ;//浜屾哀鍖栫⒊
+
+ @JSONField(name = "鍏夌収寮哄害")
+ public Integer lightIntensity ;//鍏夌収寮哄害
+
+ @JSONField(name = "澶ф皵鍘嬪姏")
+ public Integer atmosphericPressure ;//澶ф皵鍘嬪姏
+
+ @JSONField(name = "绌烘皵娓╁害")
+ public Integer airTemperature ;//绌烘皵娓╁害
+
+ @JSONField(name = "绌烘皵婀垮害")
+ public Integer airHumidity ;//绌烘皵婀垮害
+
+ @JSONField(name = "PM2.5")
+ public Integer pm25 ;//PM2.5
+
+ @JSONField(name = "PM10")
+ public Integer pm10 ;//PM10
+
+
+ @JSONField(name = "flexem_timestamp")
+ public Long devDt ;//璁惧鏃堕棿
+
+ public String devDtStr ;//璁惧鏃堕棿
+ public String getDevDtStr() {
+ if(devDt == null){
+ return DateTime.yyyy_MM_dd_HH_mm_ss(DateTime.getDate(devDt)) ;
+ }else{
+ return "" ;
+ }
+ }
+
+ @Override
+ public String toString(){
+ StringBuilder sb = new StringBuilder();
+ sb.append("姘旇薄鏁版嵁锛�") ;
+ sb.append(" 娑堟伅ID锛�"+messageId) ;
+ sb.append(" 浜屾哀鍖栫⒊锛�"+carbonDioxide) ;
+ sb.append(" 鍏夌収寮哄害锛�"+lightIntensity) ;
+ sb.append(" 澶ф皵鍘嬪姏锛�"+atmosphericPressure) ;
+ sb.append(" 绌烘皵娓╁害锛�"+airTemperature) ;
+ sb.append(" 绌烘皵婀垮害锛�"+airHumidity) ;
+ sb.append(" PM2.5锛�"+pm25) ;
+ sb.append(" PM10锛�"+pm10) ;
+ sb.append(" 璁惧鏃堕棿锛�"+devDt) ;
+ sb.append(" 璁惧鏃堕棿锛�"+ this.getDevDtStr()) ;
+ sb.append("\n") ;
+ return sb.toString() ;
+ }
+}
diff --git "a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/\350\257\264\346\230\216.txt" "b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/\350\257\264\346\230\216.txt"
index 6f29fa0..7d55cba 100644
--- "a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/\350\257\264\346\230\216.txt"
+++ "b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/\350\257\264\346\230\216.txt"
@@ -1,6 +1,6 @@
灞变笢娉板畨鍏徃鎻愪緵姘磋偉鏈恒�佸湡澹ゅ鎯呯珯銆佹皵璞$珯銆丗Box绯荤粺鍗忚
寤鸿娑堟伅涓婚瑙勫垯锛�
-瀛愮郴缁燂紙鏈烘瀯锛�/鍗忚鍚嶇О锛堝巶瀹讹級+鐗堟湰鍙�/璁惧缂栧彿/鍔熻兘缁�/鍦板潃
+瀛愮郴缁燂紙鏈烘瀯锛�/鍗忚鍚嶇О锛堝巶瀹讹級+鐗堟湰鍙�/璁惧缂栧彿/鍔熻兘缁�
渚嬪锛�
-ym/sd1/10000/control/m4 (鍏冭皨/灞变笢+鐗堟湰1/璁惧缂栧彿/璁惧鎺у埗/鍦板潃)
\ No newline at end of file
+ym/sd1/10000/weather (鍏冭皨/灞变笢+鐗堟湰1/璁惧缂栧彿/姘旇薄)
\ No newline at end of file
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevOnLineSt.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevOnLineSt.java
new file mode 100644
index 0000000..1092b27
--- /dev/null
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevOnLineSt.java
@@ -0,0 +1,16 @@
+package com.dy.common.mw.protocol4Mqtt.status;
+
+import com.dy.common.mw.protocol4Mqtt.MqttNotifyInfo;
+import lombok.Data;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/10 15:50
+ * @Description
+ */
+@Data
+public class DevOnLineSt implements MqttNotifyInfo {
+ public String id ;
+ public String protocol ;
+ public Boolean onLine ;
+}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevRunSt.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevRunSt.java
new file mode 100644
index 0000000..0fe300a
--- /dev/null
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevRunSt.java
@@ -0,0 +1,18 @@
+package com.dy.common.mw.protocol4Mqtt.status;
+
+import com.dy.common.mw.protocol4Mqtt.MqttNotifyInfo;
+import lombok.Data;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/10 15:52
+ * @Description
+ */
+@Data
+public class DevRunSt implements MqttNotifyInfo {
+ public String id ;
+ public Boolean stirRunning ;//鎼呮媽杩愯 true鏄� false鍚�
+ public Boolean injectRunning ;//娉ㄨ偉杩愯 true鏄� false鍚�
+ public Boolean irrRunning ;//鐏屾簤杩愯 true鏄� false鍚�
+ public Boolean alarm ;//鎶ヨ true鏄� false鍚�
+}
diff --git a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/softUpgrade/state/UpgradeRtu.java b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/softUpgrade/state/UpgradeRtu.java
index 7c13c3a..0424c43 100644
--- a/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/softUpgrade/state/UpgradeRtu.java
+++ b/pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/softUpgrade/state/UpgradeRtu.java
@@ -60,30 +60,19 @@
}
public static String getStateName(int state){
- switch (state) {
- case STATE_OPEN:
- return "闃�寮�";
- case STATE_OFFLINE:
- return "绂荤嚎";
- case STATE_UNSTART:
- return "鏈紑濮�";
- case STATE_RUNNING:
- return "鍗囩骇涓�";
- case STATE_SUCCESS:
- return "鍗囩骇鎴愬姛";
- case STATE_FAILONE:
- return "涓�鍖呮";
- case STATE_FAIL:
- return "澶氬寘姝�";
- case STATE_FAILOFFLINE:
- return "绂荤嚎澶辫触";
- case STATE_FAILOPEN:
- return "闃�寮�澶辫触";
- case STATE_FAILRTU:
- return "RTU澶辫触";
- default:
- return "鏈煡";
- }
+ return switch (state) {
+ case STATE_OPEN -> "闃�寮�";
+ case STATE_OFFLINE -> "绂荤嚎";
+ case STATE_UNSTART -> "鏈紑濮�";
+ case STATE_RUNNING -> "鍗囩骇涓�";
+ case STATE_SUCCESS -> "鍗囩骇鎴愬姛";
+ case STATE_FAILONE -> "涓�鍖呮";
+ case STATE_FAIL -> "澶氬寘姝�";
+ case STATE_FAILOFFLINE -> "绂荤嚎澶辫触";
+ case STATE_FAILOPEN -> "闃�寮�澶辫触";
+ case STATE_FAILRTU -> "RTU澶辫触";
+ default -> "鏈煡";
+ };
}
/**
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java
index b7917f5..0cf1c24 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java
@@ -111,12 +111,12 @@
}catch(Exception e){
company = "" ;
}
- System.out.println("OOOOOOOOOO OOOOOOOO OOOOOOOO") ;
- System.out.println("@@@@@@@@@@@@@@@@#O $@@@@@@@@& @@@@@@@@#") ;
- System.out.println("@@@@@@@@@@@@@@@@@@@# @@@@@@@@# $@@@@@@@@&") ;
- System.out.println("@@@@@@@@@@@@@@@@@@@@@# #@@@@@@@@@@@@@@@@O") ;
- System.out.println("@@@@@@@@@@@@@@@@@@@@@@@ &@@@@@@@@@@@@@@") ;
- System.out.println("@@@@@@$ $@@@@@@@@@& O@@@@@@@@@@@#") ;
+ System.out.println("$$$$$$$$$$$$ $$$$$$$$ $$$$$$$$") ;
+ System.out.println("@@@@@@@@@@@@@@@@#$ $@@@@@@@@& @@@@@@@@#") ;
+ System.out.println("@@@@@@@@@@@@@@@@@@@# @@@@@@@@# $@@@@@@@@&") ;
+ System.out.println("@@@@@@@@@@@@@@@@@@@@@# #@@@@@@@@@@@@@@@@$") ;
+ System.out.println("@@@@@@@@@@@@@@@@@@@@@@@ &@@@@@@@@@@@@@@") ;
+ System.out.println("@@@@@@$ $@@@@@@@@@& $@@@@@@@@@@@#") ;
System.out.println("@@@@@@$ @@@@@@@@@ @@@@@@@@@& " + this.orgTag + svName + "RtuMw 1.0.00" ) ;
if(this.HttpSvPath != null && this.HttpSvPort != null){
System.out.println("@@@@@@$ O@@@@@@@@@ &@@@@@@@@ HttpSv [ip]:" + this.HttpSvPort + this.HttpSvPath) ;
@@ -450,7 +450,7 @@
mqVo.enable = conf.getSetAttrBoolean(doc, "config.mqtt", "enable", null, null) ;
ServerProperties.mqttUnitEnable = mqVo.enable ;
if(mqVo.enable){
- mqVo.svIp = conf.getSetAttrTxt(doc, "config.mqtt", "svIp", null, true, null) ;
+ mqVo.svIp = conf.getSetAttrTxt(doc, "config.mqtt", "svIp", null, false, null) ;
if(!IPUtils.ipValid(mqVo.svIp)){
throw new Exception("config.mqtt.svIp閰嶇疆鐨処P涓嶅悎娉�") ;
}
@@ -458,13 +458,13 @@
if(mqVo.svPort < 0 || mqVo.svPort > 65535){
throw new Exception("config.mqtt.svPort閰嶇疆鐨勭鍙d笉鍚堟硶") ;
}
- mqVo.svUserName = conf.getSetAttrTxt(doc, "config.mqtt", "svUserName", null, true, null) ;
+ mqVo.svUserName = conf.getSetAttrTxt(doc, "config.mqtt", "svUserName", null, false, null) ;
if(mqVo.svUserName == null || mqVo.svUserName.trim().equals("")){
throw new Exception("config.mqtt.svUserName閰嶇疆鐨勭敤鎴峰悕涓嶅悎娉�") ;
}else{
mqVo.svUserName = mqVo.svUserName.trim() ;
}
- mqVo.svUserPassword = conf.getSetAttrTxt(doc, "config.mqtt", "svUserPassword", null, true, null) ;
+ mqVo.svUserPassword = conf.getSetAttrTxt(doc, "config.mqtt", "svUserPassword", null, false, null) ;
if(mqVo.svUserPassword == null || mqVo.svUserPassword.trim().equals("")){
throw new Exception("config.mqtt.svUserName閰嶇疆鐨勭敤鎴峰瘑鐮佷笉鍚堟硶") ;
}else{
@@ -474,28 +474,51 @@
if(mqVo.poolMaxSize <= 1 || mqVo.poolMaxSize > 1000){
throw new Exception("config.mqtt.poolMaxSize閰嶇疆鐨勮繛鎺ユ睜杩炴帴鏈�澶ф暟閲忎笉鍚堟硶") ;
}
- String topicAndQos = conf.getSetAttrTxt(doc, "config.mqtt", "topicAndQos", null, true, null) ;
- if(topicAndQos == null || topicAndQos.trim().equals("")){
- throw new Exception("config.mqtt.topicAndQos閰嶇疆鐨勪富棰樺強Qos涓嶅悎娉�") ;
+ String proAndDevIds = conf.getSetAttrTxt(doc, "config.mqtt", "protocolAndDeviceIds", null, false, null) ;
+ if(proAndDevIds == null || proAndDevIds.trim().equals("")){
+ throw new Exception("config.mqtt.protocolAndDeviceIds閰嶇疆涓嶅悎娉�") ;
}else{
- topicAndQos = topicAndQos.trim() ;
- topicAndQos = topicAndQos.replaceAll("锛�", ",");
- topicAndQos = topicAndQos.replaceAll("锛�", ";");
- String[] topicAndQosArr = topicAndQos.split(";") ;
+ proAndDevIds = proAndDevIds.trim() ;
+ proAndDevIds = proAndDevIds.replaceAll("锛�", ",");
+ proAndDevIds = proAndDevIds.replaceAll("锛�", ";");
+ proAndDevIds = proAndDevIds.replaceAll("\\\\", "/");
+ mqVo.protocolAndDeviceIds = proAndDevIds.split(",") ;
+ mqVo.deviceIds = new String[mqVo.protocolAndDeviceIds.length] ;
+ int index = 0 ;
+ for(String topicAndQosStr : mqVo.protocolAndDeviceIds){
+ String[] pd = topicAndQosStr.split("/") ;
+ mqVo.deviceIds[index] = pd[1].trim() ;
+ index++ ;
+ }
+ }
+
+
+ String subTopicAndQos = conf.getSetAttrTxt(doc, "config.mqtt", "subTopicAndQos", null, false, null) ;
+ if(subTopicAndQos == null || subTopicAndQos.trim().equals("")){
+ throw new Exception("config.mqtt.subTopicAndQos閰嶇疆鐨勪富棰樺強Qos涓嶅悎娉�") ;
+ }else{
+ subTopicAndQos = subTopicAndQos.trim() ;
+ subTopicAndQos = subTopicAndQos.replaceAll("锛�", ",");
+ subTopicAndQos = subTopicAndQos.replaceAll("锛�", ";");
+ String[] topicAndQosArr = subTopicAndQos.split(";") ;
mqVo.subTopics = new String[topicAndQosArr.length] ;
- mqVo.topicsQos = new int[topicAndQosArr.length] ;
+ mqVo.subTopicsQos = new int[topicAndQosArr.length] ;
int index = 0 ;
for(String topicAndQosStr : topicAndQosArr){
String[] tq = topicAndQosStr.split(",") ;
mqVo.subTopics[index] = tq[0].trim() ;
- mqVo.topicsQos[index] = Integer.parseInt(tq[1].trim()) ;
+ mqVo.subTopicsQos[index] = Integer.parseInt(tq[1].trim()) ;
index++ ;
}
}
- mqVo.publishQos = conf.getSetAttrPlusInt(doc, "config.mqtt", "publishQos", null, 0, 3, null);
- if(mqVo.publishQos < 0 || mqVo.publishQos > 3){
- throw new Exception("config.mqtt.publishQos閰嶇疆涓嶅悎娉�") ;
+ mqVo.pubTopicQos = conf.getSetAttrPlusInt(doc, "config.mqtt", "pubTopicQos", null, 0, 3, null);
+ if(mqVo.pubTopicQos < 0 || mqVo.pubTopicQos > 3){
+ throw new Exception("config.mqtt.pubTopicQos閰嶇疆涓嶅悎娉�") ;
}
+
+ Integer intNoSubThenOff = conf.getSetAttrPlusInt(doc, "config.mqtt", "noSubThenOff", null, 1, 1440, null);
+ mqVo.noSubThenOff = intNoSubThenOff * 60 * 1000L ;
+
mqVo.showStartInfo = showStartInfo ;
AdapterImp_MqttUnit mqAdapt = new AdapterImp_MqttUnit();
mqAdapt.setConfig(mqVo);
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/forTcp/RtuLogDealer.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/forTcp/RtuLogDealer.java
index 0ffda6a..55b0b50 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/forTcp/RtuLogDealer.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/forTcp/RtuLogDealer.java
@@ -26,5 +26,23 @@
ResourceUnit.getInstance().rtuLog(logNode);
}
+ /**
+ * 璁板綍Rtu鏃ュ織
+ * @param devId
+ * @param content
+ */
+ public static void log4Mqtt(String devId, String content){
+ if(devId == null || devId.trim().equals("")){
+ log.error("涓ラ噸閿欒锛岃褰昅qtt璁惧鏃ュ織鏃讹紝璁惧鍦板潃鏈彁渚涳紒") ;
+ return ;
+ }
+ if(content == null || content.equals("")){
+ log.error("涓ラ噸閿欒锛岃褰昅qtt璁惧鏃ュ織鏃讹紝鏃ュ織鍐呭鏈彁渚涳紒") ;
+ return ;
+ }
+ RtuLogNode logNode = new RtuLogNode(devId, content) ;
+
+ ResourceUnit.getInstance().rtuLog(logNode);
+ }
}
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/CommandInnerDeaLer.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/CommandInnerDeaLer.java
index 9e21c81..6b4d081 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/CommandInnerDeaLer.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/CommandInnerDeaLer.java
@@ -5,6 +5,8 @@
import com.dy.common.mw.protocol.Command;
import com.dy.common.mw.protocol.rtuState.RtuStatus;
import com.dy.rtuMw.server.local.localProtocol.*;
+import com.dy.rtuMw.server.mqtt.DevStatus;
+import com.dy.rtuMw.server.mqtt.DevStatusDealer;
import com.dy.rtuMw.server.mqtt.MqttUnit;
import java.util.HashMap;
@@ -23,37 +25,105 @@
* @return
*/
public Command deal(Command com) throws Exception{
+ Command rCom ;
String code = com.getCode() ;
- if(code.equals(CodeLocal.clock)){
- return this.clock(com) ;
- }else if(code.equals(CodeLocal.onAllLine)){
- return this.onAllLine(com) ;
- }else if(code.equals(CodeLocal.onPartLine)){
- return this.onPartLine(com) ;
- }else if(code.equals(CodeLocal.onLineStatistics)){
- return this.onLineStateStatistics(com) ;
- }else if(code.equals(CodeLocal.allRtuStates)){
- return this.allRtuStates(com) ;
- }else if(code.equals(CodeLocal.partRtuStates)){
- return this.someRtuStates(com) ;
- }else if(code.equals(CodeLocal.oneRtuStates)){
- return this.oneRtuStates(com) ;
- }else if(code.equals(CodeLocal.allProtocols)){
- return this.allProtocols(com) ;
- }else if(code.equals(CodeLocal.stopTcpSv)){
- return this.stopTcpSv(com) ;
- }else if(code.equals(CodeLocal.recoverTcpSv)){
- return this.recoverTcpSv(com) ;
- }else if(code.equals(CodeLocal.recoverMqttSv)){
- return this.stopMqttSv(com) ;
- }else if(code.equals(CodeLocal.mwState)){
- return this.mwInfo(com) ;
+ switch (code) {
+ case CodeLocal.clock -> {
+ rCom = this.clock(com);
+ break;
+ }
+ case CodeLocal.mwState -> {
+ rCom = this.mwInfo(com);
+ break;
+ }
+
+ ////////////////////////////////////////////
+ //
+ // 浠ヤ笅鏄浉鍏冲熀浜嶵CP杩炴帴鐨凴TU璁惧鐨勫唴閮ㄥ懡浠�
+ //
+ ////////////////////////////////////////////
+ case CodeLocal.onAllLine -> {
+ rCom = this.onAllLine(com);
+ break;
+ }
+ case CodeLocal.onPartLine -> {
+ rCom = this.onPartLine(com);
+ break;
+ }
+ case CodeLocal.onLineStatistics -> {
+ rCom = this.onLineStateStatistics(com);
+ break;
+ }
+ case CodeLocal.allRtuStates -> {
+ rCom = this.allRtuStates(com);
+ break;
+ }
+ case CodeLocal.partRtuStates -> {
+ rCom = this.someRtuStates(com);
+ break;
+ }
+ case CodeLocal.oneRtuStates -> {
+ rCom = this.oneRtuStates(com);
+ break;
+ }
+ case CodeLocal.allProtocols -> {
+ rCom = this.allProtocols(com);
+ break;
+ }
+ case CodeLocal.stopTcpSv -> {
+ rCom = this.stopTcpSv(com);
+ break;
+ }
+ case CodeLocal.recoverTcpSv -> {
+ rCom = this.recoverTcpSv(com);
+ break;
+ }
+
+
+ ////////////////////////////////////////////
+ //
+ // 浠ヤ笅鏄浉鍏冲熀浜嶮QTT杩炴帴鐨勮澶囩殑鍐呴儴鍛戒护
+ //
+ ////////////////////////////////////////////
+ case CodeLocal.onAllLineMqtt -> {
+ rCom = this.onAllLineMqtt(com);
+ break;
+ }
+ case CodeLocal.onPartLineMqtt -> {
+ rCom = this.onPartLineMqtt(com);
+ break;
+ }
+ case CodeLocal.onLineStatisticsMqtt -> {
+ rCom = this.onLineStateStatisticsMqtt(com);
+ break;
+ }
+ case CodeLocal.allRtuStatesMqtt -> {
+ rCom = this.allRtuStatesMqtt(com);
+ break;
+ }
+ case CodeLocal.partRtuStatesMqtt -> {
+ rCom = this.someRtuStatesMqtt(com);
+ break;
+ }
+ case CodeLocal.oneRtuStatesMqtt -> {
+ rCom = this.oneRtuStatesMqtt(com);
+ break;
+ }
+ case CodeLocal.stopMqttSv -> {
+ rCom = this.stopMqttSv(com);
+ break;
+ }
+ default -> {
+ rCom = ReturnCommand.errored("鍑洪敊锛屾敹鍒板唴閮ㄥ懡浠ょ殑鍔熻兘鐮佷笉鑳借瘑鍒紒", com.getId(), com.getCode());
+ break;
+ }
}
- return ReturnCommand.errored("鍑洪敊锛屾敹鍒板唴閮ㄥ懡浠ょ殑鍔熻兘鐮佷笉鑳借瘑鍒紒", com.getId(), com.getCode()) ;
+ return rCom ;
}
/**
* 鏌ヨ閫氫俊涓棿浠舵椂閽�
+ * @param command
* @throws Exception
*/
private Command clock(Command command) throws Exception{
@@ -112,7 +182,7 @@
Map<String, RtuStatus> map = new RtuStatusDeal().dealSome(rtuAddrGrp) ;
return ReturnCommand.successed("鏌ヨ閮ㄥ垎RTU鐘舵�佺粨鏋�", command.getId(), command.getCode(), map) ;
}else{
- return ReturnCommand.errored("鍑洪敊锛屽懡浠ゅ弬鏁板簲璇ユ槸鎵�鏌ヨRTU鐨勫湴鍧�涓�", command.getId(), command.getCode()) ;
+ return ReturnCommand.errored("鍑洪敊锛屽懡浠ゅ弬鏁板簲璇ユ湁鎵�鏌ヨRTU鐨勫湴鍧�涓�", command.getId(), command.getCode()) ;
}
}
@@ -126,7 +196,7 @@
RtuStatus rtuStatus = new RtuStatusDeal().dealOne(rtuAddr) ;
return ReturnCommand.successed("鏌ヨ涓�涓猂TU鐘舵�佺粨鏋�", command.getId(), command.getCode(), rtuStatus) ;
}else{
- return ReturnCommand.errored("鍑洪敊锛屽懡浠ゅ弬鏁板簲璇ユ槸鎵�鏌ヨRTU鐨勫湴鍧�", command.getId(), command.getCode()) ;
+ return ReturnCommand.errored("鍑洪敊锛屽懡浠ゅ弬鏁板簲璇ユ湁鎵�鏌ヨRTU鐨勫湴鍧�", command.getId(), command.getCode()) ;
}
}
@@ -160,8 +230,77 @@
return ReturnCommand.successed("宸茬粡鍚姩鎭㈠TCP鏈嶅姟", command.getId(), command.getCode(), null) ;
}
+
/**
- * 鍋滄TCP鏈嶅姟锛屼笉鍐嶆帴鍏ユ柊鐨凾CP杩炴帴锛屽凡缁廡CP杩炴帴鐨勫叏閮ㄦ柇杩炴帴
+ * 鏌ヨ鎵�鏈塎QTT璁惧鍦ㄧ嚎鎯呭喌
+ * @throws Exception
+ */
+ private Command onAllLineMqtt(Command command) throws Exception{
+ HashMap<String, Boolean> map = DevStatusDealer.allOnLine() ;
+ return ReturnCommand.successed("鏌ヨ鎵�鏈塎qtt璁惧鍦ㄧ嚎鎯呭喌缁撴灉", command.getId(), command.getCode(), map) ;
+ }
+
+ /**
+ * 鏌ヨ閮ㄥ垎MQTT璁惧鍦ㄧ嚎鎯呭喌
+ * @throws Exception
+ */
+ private Command onPartLineMqtt(Command command) throws Exception{
+ if(command.param != null && command.param instanceof String && !command.param.equals("")){
+ String[] devIds = ((String)command.param).split(",");
+ HashMap<String, Boolean> map = DevStatusDealer.partOnLine(devIds) ;
+ return ReturnCommand.successed("鏌ヨ閮ㄥ垎Mqtt璁惧鍦ㄧ嚎鎯呭喌缁撴灉", command.getId(), command.getCode(), map) ;
+ }else{
+ return ReturnCommand.errored("鍑洪敊锛屽懡浠ゅ弬鏁板簲璇ユ湁鎵�鏌ヨMqtt璁惧鐨勫湴鍧�涓�", command.getId(), command.getCode()) ;
+ }
+ }
+
+ /**
+ * 缁熻MQTT璁惧鍦ㄧ嚎涓庝笉鍦ㄧ嚎鎯呭喌
+ * @throws Exception
+ */
+ private Command onLineStateStatisticsMqtt(Command command) throws Exception{
+ RtuOnLineStateStatisticsVo vo = DevStatusDealer.statisticsOnLine() ;
+ return ReturnCommand.successed("鏌ヨ鎵�鏈塎qtt璁惧鍦ㄧ嚎鎯呭喌缁撴灉", command.getId(), command.getCode(), vo) ;
+ }
+
+ /**
+ * 鏌ヨ鎵�鏈塎QTT璁惧鐘舵��
+ * @throws Exception
+ */
+ private Command allRtuStatesMqtt(Command command) throws Exception{
+ Map<String, DevStatus> map = DevStatusDealer.allStatus() ;
+ return ReturnCommand.successed("鏌ヨ鎵�鏈塎qtt璁惧鐘舵�佺粨鏋�", command.getId(), command.getCode(), map) ;
+ }
+
+ /**
+ * 鏌ヨ閮ㄥ垎MQTT璁惧鐘舵��
+ * @throws Exception
+ */
+ private Command someRtuStatesMqtt(Command command) throws Exception{
+ if(command.param != null && command.param instanceof String && !command.param.equals("")){
+ String[] devIds = ((String)command.param).split(",");
+ Map<String, DevStatus> map = DevStatusDealer.someStatus(devIds) ;
+ return ReturnCommand.successed("鏌ヨ閮ㄥ垎Mqtt璁惧鐘舵�佺粨鏋�", command.getId(), command.getCode(), map) ;
+ }else{
+ return ReturnCommand.errored("鍑洪敊锛屽懡浠ゅ弬鏁板簲璇ユ湁鎵�鏌ヨMqtt璁惧鐨勫湴鍧�涓�", command.getId(), command.getCode()) ;
+ }
+ }
+
+ /**
+ * 鏌ヨ閮ㄥ垎MQTT璁惧鐘舵��
+ * @throws Exception
+ */
+ private Command oneRtuStatesMqtt(Command command) throws Exception{
+ if(command.param != null && command.param instanceof String && !command.param.equals("")){
+ String devId = (String)command.param;
+ DevStatus devStatus = DevStatusDealer.oneStatus(devId) ;
+ return ReturnCommand.successed("鏌ヨ涓�涓狹qtt璁惧鐘舵�佺粨鏋�", command.getId(), command.getCode(), devStatus) ;
+ }else{
+ return ReturnCommand.errored("鍑洪敊锛屽懡浠ゅ弬鏁板簲璇ユ湁鎵�鏌ヨMqtt璁惧鐨勫湴鍧�", command.getId(), command.getCode()) ;
+ }
+ }
+ /**
+ * 鍋滄MQTT鏈嶅姟
* @throws Exception
*/
private Command stopMqttSv(Command command) throws Exception{
@@ -174,7 +313,7 @@
/**
- * 鎭㈠TCP鏈嶅姟锛屾帴鍏ユ柊鐨凾CP杩炴帴
+ * 鎭㈠MQTT鏈嶅姟
* @throws Exception
*/
private Command recoverMqttSv(Command command) throws Exception{
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/localProtocol/CodeLocal.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/localProtocol/CodeLocal.java
index 9edb9fb..e1a0417 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/localProtocol/CodeLocal.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/localProtocol/CodeLocal.java
@@ -1,12 +1,17 @@
package com.dy.rtuMw.server.local.localProtocol;
public class CodeLocal {
-
public static final String clock = "LCD0000" ;//鏌ヨ鐩戞帶涓棿浠舵椂閽�
+ public static final String mwState = "LCD0200" ;//寰楀埌閫氫俊涓棿浠惰繍琛屼俊鎭�
+ ////////////////////////////////////////////
+ //
+ // 浠ヤ笅鏄浉鍏冲熀浜嶵CP杩炴帴鐨凴TU璁惧鐨勫唴閮ㄥ懡浠�
+ //
+ ////////////////////////////////////////////
public static final String onAllLine = "LCD0001" ;//鏌ヨ鎵�鏈塕TU鍦ㄧ嚎鎯呭喌
- public static final String onPartLine = "LCD0002" ;//鏌ヨ鎵�鏈塕TU鍦ㄧ嚎鎯呭喌
+ public static final String onPartLine = "LCD0002" ;//鏌ヨ閮ㄥ垎RTU鍦ㄧ嚎鎯呭喌
public static final String onLineStatistics = "LCD0003" ;//鏌ヨ鎵�鏈塕TU鐘舵�佺粺璁℃儏鍐�
@@ -22,10 +27,26 @@
public static final String recoverTcpSv = "LCD0112" ;//閲嶅惎TCP鏈嶅姟锛屾帴鍏ユ柊鐨凾CP杩炴帴
- public static final String stopMqttSv = "LCD0114" ;//鍋滄Mqtt鏈嶅姟
- public static final String recoverMqttSv = "LCD0116" ;//閲嶅惎Mqtt鏈嶅姟
- public static final String mwState = "LCD0200" ;//寰楀埌閫氫俊涓棿浠惰繍琛屼俊鎭�
+ ////////////////////////////////////////////
+ //
+ // 浠ヤ笅鏄浉鍏冲熀浜嶮QTT杩炴帴鐨勮澶囩殑鍐呴儴鍛戒护
+ //
+ ////////////////////////////////////////////
+ public static final String onAllLineMqtt = "LMCD0001" ;//鏌ヨ鎵�鏈塎QTT璁惧鍦ㄧ嚎鎯呭喌
+
+ public static final String onPartLineMqtt = "LMCD0002" ;//鏌ヨ閮ㄥ垎MQTT璁惧鍦ㄧ嚎鎯呭喌
+
+ public static final String onLineStatisticsMqtt = "LMCD0003" ;//鏌ヨ鎵�鏈塎QTT璁惧鐘舵�佺粺璁℃儏鍐�
+
+ public static final String allRtuStatesMqtt = "LMCD0010" ;//鏌ヨ鎵�鏈塎QTT璁惧鐘舵��
+
+ public static final String partRtuStatesMqtt = "LMCD0011" ;//鏌ヨ閮ㄥ垎MQTT璁惧鐘舵��
+
+ public static final String oneRtuStatesMqtt = "LMCD0012" ;//鏌ヨ涓�涓狹QTT璁惧鐘舵��
+
+ public static final String stopMqttSv = "LMCD0110" ;//鍋滄Mqtt鏈嶅姟
+
}
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatus.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatus.java
new file mode 100644
index 0000000..5f3976e
--- /dev/null
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatus.java
@@ -0,0 +1,23 @@
+package com.dy.rtuMw.server.mqtt;
+
+import lombok.Data;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/10 14:54
+ * @Description
+ */
+@Data
+public class DevStatus {
+ public String id ;//璁惧ID
+ public String protocol;//鍗忚
+ public Boolean onLine ;//鏄惁鍦ㄧ嚎 true鍦ㄧ嚎 false绂荤嚎
+ public Boolean stirRunning ;//鎼呮媽杩愯 true鏄� false鍚�
+ public Boolean injectRunning ;//娉ㄨ偉杩愯 true鏄� false鍚�
+ public Boolean irrRunning ;//鐏屾簤杩愯 true鏄� false鍚�
+ public Boolean alarm ;//鎶ヨ true鏄� false鍚�
+
+ public Long lastDownComTime ;//涓婃涓嬪彂鍛戒护鏃跺埢(姣鏃跺埢 System.currentTimeMillis())
+ public Long lastUpDataTime ;//涓婃鏀跺埌涓婅鏁版嵁鏃跺埢(姣鏃跺埢 System.currentTimeMillis())
+
+}
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatusDealer.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatusDealer.java
new file mode 100644
index 0000000..c164c0a
--- /dev/null
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatusDealer.java
@@ -0,0 +1,186 @@
+package com.dy.rtuMw.server.mqtt;
+
+import com.dy.common.mw.protocol4Mqtt.status.DevRunSt;
+import com.dy.rtuMw.server.forTcp.RtuLogDealer;
+import com.dy.rtuMw.server.local.localProtocol.RtuOnLineStateStatisticsVo;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/6/10 15:00
+ * @Description
+ */
+public class DevStatusDealer {
+
+ private static Map<String, DevStatus> map = new HashMap<String, DevStatus>() ;
+
+ public static HashMap<String, Boolean> allOnLine(){
+ synchronized (map){
+ HashMap<String, Boolean> rsMap = new HashMap<>();
+ Iterator<Map.Entry<String, DevStatus>> it = map.entrySet().iterator() ;
+ Map.Entry<String, DevStatus> entry = null ;
+ while(it.hasNext()){
+ entry = it.next() ;
+ rsMap.put(entry.getKey(), entry.getValue().onLine) ;
+ }
+ return rsMap ;
+ }
+ }
+
+ public static HashMap<String, Boolean> partOnLine(String[] devIds){
+ synchronized (map){
+ HashMap<String, Boolean> rsMap = new HashMap<String, Boolean>();
+ for(String devid : devIds){
+ DevStatus st = map.get(devid) ;
+ if(st != null){
+ rsMap.put(devid, st.onLine) ;
+ }
+ }
+ return rsMap ;
+ }
+ }
+ /**
+ * 缁熻鍦ㄧ嚎涓庝笉鍦ㄧ嚎鎯呭喌
+ */
+ public static RtuOnLineStateStatisticsVo statisticsOnLine(){
+ RtuOnLineStateStatisticsVo vo = new RtuOnLineStateStatisticsVo() ;
+ vo.onLineNum = 0 ;
+ vo.offLineNum = 0 ;
+ synchronized (map){
+ Iterator<Map.Entry<String, DevStatus>> it = map.entrySet().iterator() ;
+ Map.Entry<String, DevStatus> entry = null ;
+ while(it.hasNext()){
+ entry = it.next() ;
+ if(((DevStatus)entry).onLine != null && ((DevStatus)entry).onLine.booleanValue()){
+ vo.onLineNum++ ;
+ }else{
+ vo.offLineNum++ ;
+ }
+ }
+ }
+ return vo ;
+ }
+
+ /**
+ * 寰楀埌鍏ㄩ儴鐘舵��
+ * @return
+ */
+ public static Map<String, DevStatus> allStatus(){
+ return map ;
+ }
+ /**
+ * 寰楀埌閮ㄥ垎鐘舵��
+ * @return
+ */
+ public static Map<String, DevStatus> someStatus(String[] devIdArrGrp){
+ synchronized (map){
+ Map<String, DevStatus> rsMap = new HashMap<>();
+ for(String devId : devIdArrGrp){
+ DevStatus status = map.get(devId) ;
+ if(status != null){
+ rsMap.put(devId, status) ;
+ }
+ }
+ return rsMap ;
+ }
+ }
+ /**
+ * 寰楀埌涓�涓猂TU鐨勭姸鎬�
+ * @return
+ */
+ public static DevStatus oneStatus(String devId){
+ return map.get(devId) ;
+ }
+
+ public static void updateOnLineState() {
+ if (MqttUnit.confVo != null
+ && MqttUnit.confVo.noSubThenOff != null
+ && MqttUnit.confVo.noSubThenOff.longValue() > 0) {
+ Long now = System.currentTimeMillis() ;
+ synchronized (map){
+ Set<Map.Entry<String, DevStatus>> entrySet = map.entrySet() ;
+ Iterator<Map.Entry<String, DevStatus>> it = entrySet.iterator() ;
+ Map.Entry<String, DevStatus> entry ;
+ DevStatus st;
+ while(it.hasNext()){
+ entry = it.next() ;
+ st = entry.getValue();
+ if(st.onLine != null && st.onLine.booleanValue() && st.lastUpDataTime != null){
+ if(now - st.lastUpDataTime > MqttUnit.confVo.noSubThenOff.longValue()){
+ st.onLine = false ;
+ RtuLogDealer.log4Mqtt(entry.getKey(), "鍥犺緝闀挎椂闂存湭鏀朵笂琛屾暟鎹紝璁や负璁惧绂荤嚎");
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * 鍙戦�佹秷鎭悗
+ * @param devId
+ */
+ public static void afterSendPubMessage(String devId){
+ DevStatus st = map.get(devId);
+ if(st != null){
+ st.lastDownComTime = System.currentTimeMillis() ;
+ }
+ }
+
+ /**
+ * 鎺ユ敹娑堟伅鍚�
+ * @param devId
+ */
+ public static void afterReceiveSubMessage(String devId){
+ DevStatus st = map.get(devId);
+ if(st != null){
+ st.lastUpDataTime = System.currentTimeMillis() ;
+ }
+ }
+
+ public static void onLine(String devId, String protocol){
+ DevStatus vo = map.get(devId) ;
+ if(vo == null) {
+ vo = new DevStatus();
+ vo.id = devId ;
+ vo.protocol = protocol ;
+ vo.onLine = true ;
+ map.put(devId, vo);
+ }else {
+ vo.onLine = true ;
+ }
+ }
+
+ public static void offLine(String devId){
+ DevStatus vo = map.get(devId) ;
+ if(vo == null) {
+ vo = new DevStatus();
+ vo.onLine = false ;
+ map.put(devId, vo);
+ }else {
+ vo.onLine = false ;
+ }
+ }
+
+ public static void setStatus(String devId, DevRunSt st){
+ DevStatus vo = map.get(devId) ;
+ if(vo != null) {
+ if(st.stirRunning != null){
+ vo.stirRunning = st.stirRunning ;
+ }
+ if(st.injectRunning != null){
+ vo.injectRunning = st.injectRunning ;
+ }
+ if(st.irrRunning != null){
+ vo.irrRunning = st.irrRunning ;
+ }
+ if(st.alarm != null){
+ vo.alarm = st.alarm ;
+ }
+ }
+ }
+}
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java
index 3c775b5..ac3e211 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java
@@ -1,6 +1,11 @@
package com.dy.rtuMw.server.mqtt;
import com.dy.common.mw.channel.mqtt.MqttClientPool;
+import com.dy.common.mw.protocol4Mqtt.MqttNotify;
+import com.dy.common.mw.protocol4Mqtt.MqttNotifyInfo;
+import com.dy.common.mw.protocol4Mqtt.status.DevOnLineSt;
+import com.dy.common.mw.protocol4Mqtt.status.DevRunSt;
+import com.dy.rtuMw.server.ServerProperties;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.eclipse.paho.client.mqttv3.MqttClient;
@@ -33,13 +38,17 @@
this.configVo = configVo;
}
+ /**
+ * 鍒涘缓杩炴帴姹� + 璁㈤槄涓婚
+ * @throws Exception
+ */
public void start()throws Exception{
String URL = "tcp://" + this.configVo.svIp + ":" + this.configVo.svPort;
this.pool = new MqttClientPool(URL, this.configVo.svUserName, this.configVo.svUserPassword, this.configVo.poolMaxSize);
if(this.pool.isClose()){
throw new Exception("Mqtt杩炴帴姹犲垵濮嬪寲澶辫触");
}
- MqttClient clientSub = null ;
+ MqttClient clientSub ;
try {
clientSub = pool.popClient();//鏂板垱寤轰竴涓狢lient鏃讹紝姝lient瀹為檯鍘昏繛鎺QTT鏈嶅姟鍣紝濡傛灉杩炴帴涓嶄笂锛屽氨浼氭姏鍑哄紓甯�
}catch (Exception e){
@@ -48,8 +57,35 @@
if(clientSub == null || !clientSub.isConnected()){
throw new Exception("Mqtt杩炴帴姹犺幏寰楄闃呰繛鎺ヤ笉鍙敤");
}
+ // 璁㈤槄涓婚
for(int i = 0; i < this.configVo.subTopics.length; i++){
- clientSub.subscribe(this.configVo.subTopics[i], this.configVo.topicsQos[i], new MqttMessageListener());
+ for(int j = 0 ; j < this.configVo.protocolAndDeviceIds.length; j++){
+ clientSub.subscribe(ServerProperties.orgTag + "/"
+ + this.configVo.protocolAndDeviceIds[j] + "/"
+ + this.configVo.subTopics[i],
+ this.configVo.subTopicsQos[i],
+ //姣忎竴涓闃呬富棰橀兘鏈変竴涓狹qttMessageListener瀹炰緥
+ new MqttMessageListener(new MqttNotify(){
+ @Override
+ public void notify(String devId, MqttNotifyInfo... infos) {
+ if(devId != null && infos != null && infos.length > 0){
+ for(MqttNotifyInfo info : infos){
+ if(info instanceof DevOnLineSt){
+ DevOnLineSt onLineSt = (DevOnLineSt)info;
+ if(onLineSt.onLine != null && onLineSt.onLine.booleanValue()){
+ DevStatusDealer.onLine(devId, ((DevOnLineSt)info).protocol);
+ }else{
+ DevStatusDealer.offLine(devId);
+ }
+ } else if(info instanceof DevRunSt){
+ DevStatusDealer.setStatus(devId, (DevRunSt)info);
+ }
+ }
+ }
+ }
+ })
+ );
+ }
}
}
@@ -69,12 +105,12 @@
}
public void publishMsg(MqttClient client, String topic, byte[] msg) throws Exception{
- client.publish(topic, msg, this.configVo.publishQos, false);
+ client.publish(topic, msg, this.configVo.pubTopicQos, false);
}
public void publishMsg(MqttClient client, String topic, String msg) throws Exception{
byte[] bs = msg.getBytes("UTF-8") ;
- client.publish(topic, bs, this.configVo.publishQos, false);
+ client.publish(topic, bs, this.configVo.pubTopicQos, false);
}
public boolean poolIsClose(){
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java
index 19bd3eb..7d8c6ea 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java
@@ -1,9 +1,11 @@
package com.dy.rtuMw.server.mqtt;
-import com.dy.common.mw.protocol4Mqtt.MqttMsgParser;
-import com.dy.common.mw.protocol4Mqtt.MqttPubMsg;
-import com.dy.common.mw.protocol4Mqtt.MqttSubMsg;
+import com.dy.common.mw.protocol4Mqtt.*;
+import com.dy.common.mw.protocol4Mqtt.MqttCallback;
+import com.dy.common.mw.protocol4Mqtt.MqttTopic;
import com.dy.common.util.Callback;
+import com.dy.rtuMw.server.forTcp.RtuLogDealer;
+import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.* ;
/**
@@ -11,11 +13,30 @@
* @Date: 2025/6/4 15:52
* @Description
*/
+@Slf4j
public class MqttMessageListener implements IMqttMessageListener{
+ private MqttNotify notify ;
+ public MqttMessageListener(MqttNotify notify){
+ this.notify = notify ;
+ }
+
@Override
public void messageArrived(String topic, MqttMessage msg) throws Exception {
- MqttMsgParser parser = new MqttMsgParser() ;
- MqttSubMsg subMsg = parser.parseSubMsg(topic, msg) ;
+ MqttTopic subTopic = MqttMsgParser.parseSubTopic(topic) ;
+ MqttSubMsg subMsg = MqttMsgParser.parseSubMsg(subTopic, msg, new MqttCallback(){
+ @Override
+ public void callback(MqttSubMsg subMsg) {
+ DevStatusDealer.onLine(subMsg.deviceId, subMsg.protocol);
+ DevStatusDealer.afterReceiveSubMessage(subMsg.deviceId);
+ RtuLogDealer.log4Mqtt(subMsg.deviceId, "璁㈤槄娑堟伅 涓婚锛�" + subMsg.topic + " 娑堟伅锛�" + subMsg.msg);
+ }
+ @Override
+ public void notify(String devId, MqttNotifyInfo... infos) {
+ if(notify != null){
+ notify.notify(devId, infos) ;
+ }
+ }
+ }) ;
this.nextDeal(subMsg);
}
private void nextDeal(MqttSubMsg subMsg)throws Exception {
@@ -25,17 +46,19 @@
MqttSubMsg subMs = (MqttSubMsg) obj ;
MqttPubMsg pubMs = MqttPubMsgCache.matchFromTail(subMs) ;
if(pubMs != null){
+ //鍖归厤鍒颁笅琛屾秷鎭紙鍛戒护锛�
subMs.mqttResultSendWebUrl = pubMs.mqttResultSendWebUrl ;
subMs.commandId = pubMs.commandId ;
try {
MqttComResultCache.getInstance().cacheMqttComResult(new MqttComResultNode(subMs));
} catch (Exception e) {
- e.printStackTrace();
+ log.error("缂撳瓨鍙戝竷娑堟伅锛堝懡浠わ級缁撴灉鍙戠敓寮傚父", e);
}
}
try{
MqttSubMsgCache.getInstance().cacheMsg(new MqttSubMsgNode(subMsg));
}catch (Exception e){
+ log.error("缂撳瓨璁㈤槄娑堟伅鏁版嵁鍙戠敓寮傚父", e);
}
}
@Override
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgNode.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgNode.java
index e345d59..7b827d5 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgNode.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgNode.java
@@ -3,6 +3,7 @@
import com.dy.common.mw.protocol4Mqtt.MqttPubMsg;
import com.dy.common.queue.NodeObj;
import com.dy.rtuMw.server.ServerProperties;
+import com.dy.rtuMw.server.forTcp.RtuLogDealer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.eclipse.paho.client.mqttv3.MqttClient;
@@ -65,6 +66,8 @@
if(mqttClient != null && mqttClient.isConnected()){
try {
mqttManager.publishMsg(mqttClient, this.result.topic, this.result.msg);
+ DevStatusDealer.afterSendPubMessage(this.result.deviceId);
+ RtuLogDealer.log4Mqtt(this.result.deviceId, "鍙戝竷娑堟伅 涓婚锛�" + this.result.topic + " 娑堟伅锛�" + this.result.msg);
log.info("鍙戝竷MQTT娑堟伅锛堜富棰�=" + this.result.topic + "锛�" + this.result.msg);
}catch (Exception e){
log.error("MQTT鍙戝竷娑堟伅澶辫触锛堜富棰�=" + this.result.topic + "锛�" , e);
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitConfigVo.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitConfigVo.java
index c767276..f96f189 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitConfigVo.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitConfigVo.java
@@ -13,9 +13,12 @@
public String svUserName ;//
public String svUserPassword ;//
public Integer poolMaxSize ;//
+ public String[] protocolAndDeviceIds ;//璁惧鍗忚涓嶪D锛團Box锛塱d
+ public String[] deviceIds ;//璁惧锛團Box锛塱d
public String[] subTopics ;//璁㈤槄鐨勪富棰�
- public int[] topicsQos ;////璁㈤槄涓婚鐨凲os
- public int publishQos ;////鍙戝竷娑堟伅鐨凲os
+ public int[] subTopicsQos;//璁㈤槄涓婚鐨凲os
+ public int pubTopicQos;//鍙戝竷娑堟伅鐨凲os
+ public Long noSubThenOff; //MQtt璁惧鍦ㄤ竴瀹氭椂闂村悗鏈彂甯冩秷鎭紝璁や负璁惧绂荤嚎
public MqttUnitConfigVo(){
this.enable = false ;
@@ -24,6 +27,7 @@
this.svUserName = "dyyjy" ;
this.svUserPassword = "Dyyjy2025,;.abc!@#" ;
this.poolMaxSize = 10 ;
- this.publishQos = 1 ;
+ this.pubTopicQos = 1 ;
+ this.noSubThenOff = 10 * 60 * 10000L ;
}
}
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/MqttSubMessageConstantTask.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/MqttSubMessageConstantTask.java
index 80501d1..debb4ab 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/MqttSubMessageConstantTask.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/MqttSubMessageConstantTask.java
@@ -2,6 +2,7 @@
import com.dy.common.mw.core.CoreTask;
import com.dy.common.queue.Node;
+import com.dy.rtuMw.server.mqtt.DevStatusDealer;
import com.dy.rtuMw.server.mqtt.MqttSubMsgCache;
import com.dy.rtuMw.server.mqtt.MqttSubMsgNode;
import org.apache.logging.log4j.LogManager;
@@ -21,16 +22,26 @@
@Override
public Integer execute() {
try{
+ dealOneline() ;
+ }catch(Exception e){
+ log.error("鏇存柊RTU浼氳瘽涓婃姤鏁版嵁鏃跺埢鏃跺彂鐢熼泦鍚堟搷浣滃紓甯革紝姝ゅ紓甯稿苟涓嶅奖鍝嶇郴缁熸甯歌繍琛�", e);
+ }
+ try{
dealMqMsg() ;
}catch(Exception e){
log.error(e);
}
return MqttSubMsgCache.size()>0?0:1 ;
}
+
+ private void dealOneline(){
+ DevStatusDealer.updateOnLineState();
+ }
+
/**
* 澶勭悊MQTT璁㈤槄鐨勬秷鎭�
*/
- public void dealMqMsg() {
+ private void dealMqMsg() {
Node first = MqttSubMsgCache.getFirstQueueNode() ;
if(first != null){
Node last = MqttSubMsgCache.getLastQueueNode() ;
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.properties b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.properties
index 08e4d72..b692e83 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.properties
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.properties
@@ -10,7 +10,7 @@
# 鐢樺窞锛� gz
# 鍑夊窞锛� lz
# 閲戝窛锛� jc
-base.orgTag=ym
+base.orgTag=mq
# 233鏈嶅姟鍣細
# 鍏冭皨锛� 60000
@@ -30,19 +30,23 @@
base.upData.min.interval=6
# MQTT鏈嶅姟閰嶇疆
+# mqtt.enable 鏄惁鍚姩
+# mqtt.protocolAndDeviceIds 鍦ㄥ瓙绯荤粺锛坥rgTag锛変腑鎺ュ叆鐨勮澶�(FBox)鎵�鐢ㄥ崗璁強璁惧id闆嗗悎,澶氫釜鐢ㄩ�楀彿闅斿紑锛屽崗璁笌ID鐢ㄦ鏂滄潬闅斿紑锛屼緥濡傦細sd1/338220031439,sd1/338220031440
+# mqtt.subTopicAndQos 璁㈤槄涓婚涓嶲os锛屼富棰樺悕涓庡叾Qos鐢ㄩ�楀彿闅斿紑锛屽涓富棰樺強Qos鐢ㄥ垎鍙烽殧寮�锛屼緥濡傦細topic1,1;topic2,1;topic3,1
# 233鏈嶅姟鍣細
-# 鍏冭皨锛� mqtt.enable=false
-# 娌欑洏锛� mqtt.enable=false
-# 娴嬭瘯锛� mqtt.enable=false
-# 姊呮睙锛� mqtt.enable=false
+# 鍏冭皨锛� mqtt.enable=false mqtt.protocolAndDeviceIds= mqtt.topicAndQos=
+# 娌欑洏锛� mqtt.enable=false mqtt.protocolAndDeviceIds= mqtt.topicAndQos=
+# 娴嬭瘯锛� mqtt.enable=false mqtt.protocolAndDeviceIds= mqtt.topicAndQos=
+# 姊呮睙锛� mqtt.enable=false mqtt.protocolAndDeviceIds= mqtt.topicAndQos=
# 121鏈嶅姟鍣細
-# 姘戝嫟锛� mqtt.enable=true
-# 寤跺簡锛� mqtt.enable=false
-# 榛戦緳姹燂細 mqtt.enable=false
-# 鐢樺窞锛� mqtt.enable=false
-# 鍑夊窞锛� mqtt.enable=false
-# 閲戝窛锛� mqtt.enable=true
-# mq/sd1/338220031439/weather
+# 姘戝嫟锛� mqtt.enable=true mqtt.protocolAndDeviceIds=? mqtt.topicAndQos=weather,1;soil,1;manure,1;state,1
+# 寤跺簡锛� mqtt.enable=false mqtt.protocolAndDeviceIds= mqtt.topicAndQos=
+# 榛戦緳姹燂細 mqtt.enable=false mqtt.protocolAndDeviceIds= mqtt.topicAndQos=
+# 鐢樺窞锛� mqtt.enable=false mqtt.protocolAndDeviceIds= mqtt.topicAndQos=
+# 鍑夊窞锛� mqtt.enable=false mqtt.protocolAndDeviceIds= mqtt.topicAndQos=
+# 閲戝窛锛� mqtt.enable=true mqtt.protocolAndDeviceIds=? mqtt.topicAndQos=weather,1;soil,1;manure,1;state,1
mqtt.enable=true
-mqtt.topicAndQos=ym/sd1/10000/control/m1,1;ym/sd1/10000/control/m2,1;ym/sd1/control/m4,1;ym/sd1/10000/control/m80,1
-
+mqtt.protocolAndDeviceIds=sd1/338220031439,sd1/338220031440
+mqtt.subTopicAndQos=weather,1;soil,1;manure,1;state,1
+#MQtt璁惧鍦ㄤ竴瀹氭椂闂达紙鍒嗛挓锛夊悗鏈彂甯冩秷鎭紝璁や负璁惧绂荤嚎
+mqtt.noSubThenOff=10
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.xml b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.xml
index 5f0b6b9..3d3e466 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.xml
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.xml
@@ -166,11 +166,18 @@
<!--
- topicAndQos: 涓婚涓嶲os锛屼富棰樺悕涓庡叾Qos鐢ㄩ�楀彿闅斿紑锛屽涓富棰樺強Qos鐢ㄥ垎鍙烽殧寮�锛屼緥濡傦細ym/topic1,1;ym/topic2,1;ym/topic3,1锛屽鏋滄湁澶氫釜OrgTag锛屼富棰樺墠缂�鐢ㄥ叾OrgTag
- publishQos: 鍙戝竷娑堟伅鐨凲os锛屽彇鍊艰寖鍥达細
+ enable 鏄惁鍚姩
+ svIp MQTT鏈嶅姟鍣↖P
+ svUserName MQTT鏈嶅姟鍣ㄧ敤鎴峰悕
+ svUserPassword MQTT鏈嶅姟鍣ㄧ敤鎴峰瘑鐮�
+ poolMaxSize 杩炴帴姹犳渶澶ц繛鎺ユ暟
+ protocolAndDeviceIds 鍦ㄥ瓙绯荤粺锛坥rgTag锛変腑鎺ュ叆鐨勮澶�(FBox)鎵�鐢ㄥ崗璁強璁惧id闆嗗悎,澶氫釜鐢ㄩ�楀彿闅斿紑锛屽崗璁笌ID鐢ㄦ鏂滄潬闅斿紑锛屼緥濡傦細sd1/338220031439,sd1/338220031440
+ subTopicAndQos: 璁㈤槄涓婚涓嶲os锛屼富棰樺悕涓庡叾Qos鐢ㄩ�楀彿闅斿紑锛屽涓富棰樺強Qos鐢ㄥ垎鍙烽殧寮�锛屼緥濡傦細ym/topic1,1;ym/topic2,1;ym/topic3,1锛屽鏋滄湁澶氫釜OrgTag锛屼富棰樺墠缂�鐢ㄥ叾OrgTag
+ pubTopicQos: 鍙戝竷涓婚鐨凲os锛屽彇鍊艰寖鍥达細
0 鑷冲涓�娆★紙At most once锛� 娑堟伅鍙戦�佸悗涓嶄繚璇佸埌杈撅紝鍙兘涓㈠け鎴栭噸澶嶏紝寮�閿�鏈�灏忥紝鍙潬鎬ф渶浣庛��
1 鑷冲皯涓�娆★紙At least once锛� 娑堟伅鑷冲皯浼氬埌杈句竴娆★紝鍙兘閲嶅锛屼絾涓嶄細涓㈠け锛屽彲闈犳�т腑绛夛紝閫傜敤浜庡鏁板満鏅��
2 鎭板ソ涓�娆★紙Exactly once锛� 娑堟伅浠呬細鍒拌揪涓�娆★紝涓嶉噸澶嶄笖涓嶄涪澶憋紝鍙潬鎬ф渶楂橈紝浣嗗紑閿�鏈�澶э紝瀹炵幇鏈�澶嶆潅銆�
+ noSubThenOff: MQtt璁惧鍦ㄤ竴瀹氭椂闂达紙鍒嗛挓锛夊悗鏈彂甯冩秷鎭紝璁や负璁惧绂荤嚎
-->
<mqtt enable="${mqtt.enable}"
svIp="121.199.41.121"
@@ -178,8 +185,10 @@
svUserName="dyyjy"
svUserPassword="Dyyjy2025,;.abc!@#"
poolMaxSize="10"
- topicAndQos="${mqtt.topicAndQos}"
- publishQos="1"
+ protocolAndDeviceIds="${mqtt.protocolAndDeviceIds}"
+ subTopicAndQos="${mqtt.subTopicAndQos}"
+ pubTopicQos="1"
+ noSubThenOff="${mqtt.noSubThenOff}"
/>
</config>
\ No newline at end of file
--
Gitblit v1.8.0