From 6efa476dad8e425ff71c37cd57437c99928cd405 Mon Sep 17 00:00:00 2001
From: liurunyu <lry9898@163.com>
Date: 星期五, 01 八月 2025 17:42:22 +0800
Subject: [PATCH] 中间件,对于水肥机数据,每条上报数据都进行最新上报数据处理,以备前端界面及时显示水肥机状态。
---
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java | 4 +
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.xml | 8 +++-
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/RtuDataDealTree.xml | 4 +
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/ServerProperties.java | 4 +
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/pSdV1/TkPreGenObjs4ManureSdV1.java | 21 +++++++---
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/pSdV1/TkPreGenObjs4WeatherSdV1.java | 2
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/pSdV1/TkDealManureSdV1.java | 11 +++--
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/pSdV1/TkDealManureLastSdV1.java | 58 +++++++++++++++++++++++++++++
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/pSdV1/TkPreGenObjs4SoilSdV1.java | 2
9 files changed, 95 insertions(+), 19 deletions(-)
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java
index c4f0771..a09951e 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java
@@ -478,7 +478,9 @@
mqVo.comCacheTimeout = conf.getSetAttrPlusInt(doc, "config.mqtt", "comCacheTimeout", null, 1, 3600, null) * 1000L ;
- ServerProperties.mqttAcceptDataMinInterval = conf.getSetAttrPlusInt(doc, "config.mqtt", "acceptDataMinInterval", null, 1, 720, null) * 60 * 1000L ;
+ ServerProperties.acceptManureDataMinInterval = conf.getSetAttrPlusInt(doc, "config.mqtt", "acceptManureDataMinInterval", null, 1, 720, null) * 60 * 1000L ;
+ ServerProperties.acceptSoilDataMinInterval = conf.getSetAttrPlusInt(doc, "config.mqtt", "acceptSoilDataMinInterval", null, 1, 720, null) * 60 * 1000L ;
+ ServerProperties.acceptWeatherDataMinInterval = conf.getSetAttrPlusInt(doc, "config.mqtt", "acceptWeatherDataMinInterval", null, 1, 720, null) * 60 * 1000L ;
mqVo.useMemoryPersistence = conf.getSetAttrBoolean(doc, "config.mqtt", "useMemoryPersistence", null, null) ;
String proAndDevIds = conf.getSetAttrTxt(doc, "config.mqtt", "protocolAndDeviceIds", null, true, null) ;
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/ServerProperties.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/ServerProperties.java
index ed9df08..40dbe26 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/ServerProperties.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/ServerProperties.java
@@ -56,7 +56,9 @@
//Mqtt妯″潡鏄惁鍚姩
public static Boolean mqttUnitEnable = false ;
- public static Long mqttAcceptDataMinInterval = 60 * 60 * 1000L ;//榛樿60鍒嗛挓
+ public static Long acceptManureDataMinInterval = 60 * 60 * 1000L ;//榛樿60鍒嗛挓
+ public static Long acceptSoilDataMinInterval = 60 * 60 * 1000L ;//榛樿60鍒嗛挓
+ public static Long acceptWeatherDataMinInterval = 60 * 60 * 1000L ;//榛樿60鍒嗛挓
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/pSdV1/TkDealManureLastSdV1.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/pSdV1/TkDealManureLastSdV1.java
new file mode 100644
index 0000000..fd52a50
--- /dev/null
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/pSdV1/TkDealManureLastSdV1.java
@@ -0,0 +1,58 @@
+package com.dy.rtuMw.server.rtuData.pSdV1;
+
+import com.dy.common.mw.protocol4Mqtt.MqttSubMsg;
+import com.dy.common.mw.protocol4Mqtt.pSdV1.upVos.ManureVo;
+import com.dy.pipIrrGlobal.pojoPr.PrStManure;
+import com.dy.rtuMw.server.rtuData.dbSv.DbSv;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * @Author: liurunyu
+ * @Date: 2025/7/31 16:42
+ * @Description
+ */
+public class TkDealManureLastSdV1 extends TkDealManureSdV1 {
+
+ private static Logger log = LogManager.getLogger(TkDealManureLastSdV1.class.getName());
+
+ //绫籌D锛屼竴瀹氫笌Tree.xml閰嶇疆鏂囦欢涓厤缃竴鑷�
+ public static final String taskId = "TkDealManureLastSdV1";
+
+ /**
+ * 鎵ц鑺傜偣浠诲姟
+ *
+ * @param data 闇�瑕佸鐞嗙殑鏁版嵁
+ */
+ @Override
+ public void execute(Object data) {
+ //鍓嶉潰鐨勪换鍔″凡缁忓垽鏂簡data涓嶄负绌轰笖涓烘按鑲ユ暟鎹�
+ MqttSubMsg msg = (MqttSubMsg) data;
+ ManureVo stVo = (ManureVo) msg.vo4Up;
+ Object[] objs = this.getTaskResults(TkPreGenObjs4ManureSdV1.taskId);
+ DbSv sv = (DbSv) objs[0];
+ PrStManure stPo = (PrStManure) objs[1];
+ try{
+ this.doDeal(sv, stPo, msg, stVo);
+ }catch (Exception e){
+ log.error("淇濆瓨姘磋偉鏁版嵁鏃跺彂鐢熷紓甯�", e);
+ }
+ }
+
+ /**
+ * 澶勭悊涓婅娑堟伅鏁版嵁
+ * @param sv 鏈嶅姟
+ * @param stPo 瀹炰綋瀵硅薄
+ * @param msg 涓婅鐨勮闃呮秷鎭�
+ * @param stVo 涓婅鐨勮澶囨暟鎹�
+ */
+ private void doDeal(DbSv sv,
+ PrStManure stPo,
+ MqttSubMsg msg,
+ ManureVo stVo) throws Exception {
+ //2025-07-31 寮曞鏁版嵁浼氬緢棰戠箒锛屾墍浠ヤ笉鍦ㄩ拤閽夊彂娑堟伅锛屽啀璇撮拤閽変笉鍙绻佸彂娑堟伅
+ this.saveOrUpdateLast(sv, stPo, msg, stVo, null, false);
+ }
+
+
+}
\ No newline at end of file
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/pSdV1/TkDealManureSdV1.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/pSdV1/TkDealManureSdV1.java
index 2c23cd4..6e22db7 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/pSdV1/TkDealManureSdV1.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/pSdV1/TkDealManureSdV1.java
@@ -55,7 +55,7 @@
MqttSubMsg msg,
ManureVo stVo) throws Exception {
RmManureHistory hpo = this.saveHistory(sv, stPo, msg, stVo);
- this.saveOrUpdateLast(sv, stPo, msg, stVo, hpo);
+ this.saveOrUpdateLast(sv, stPo, msg, stVo, hpo, true);
}
/**
* 澶勭悊涓婅娑堟伅鏁版嵁
@@ -65,11 +65,12 @@
* @param stVo 涓婅鐨勮澶囨暟鎹�
* @param hpo 鍘嗗彶璁板綍鏈�鏂版暟鎹�
*/
- private void saveOrUpdateLast(DbSv sv,
+ protected void saveOrUpdateLast(DbSv sv,
PrStManure stPo,
MqttSubMsg msg,
ManureVo stVo,
- RmManureHistory hpo) throws Exception {
+ RmManureHistory hpo,
+ boolean sendDingMs) throws Exception {
RmManureLast po = sv.getRmManureLast(stPo.id) ;
if(po == null){
po = new RmManureLast();
@@ -77,7 +78,7 @@
po.manureId = stPo.id ;
po.lastHistoryId = hpo==null?null:hpo.id ;
sv.saveRmManureLast(po) ;
- if(stVo.alarm != null && stVo.alarm == 1){
+ if(stVo.alarm != null && stVo.alarm == 1 && sendDingMs){
this.sendMessage(stPo, msg, stVo);
}
}else{
@@ -85,7 +86,7 @@
po.manureId = stPo.id ;
po.lastHistoryId = hpo==null?null:hpo.id ;
sv.updateRmManureLast(po);
- if(stVo.alarm != null && stVo.alarm == 1){
+ if(stVo.alarm != null && stVo.alarm == 1 && sendDingMs){
this.sendMessage(stPo, msg, stVo);
}
}
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/pSdV1/TkPreGenObjs4ManureSdV1.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/pSdV1/TkPreGenObjs4ManureSdV1.java
index 28ddede..d7c097c 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/pSdV1/TkPreGenObjs4ManureSdV1.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/pSdV1/TkPreGenObjs4ManureSdV1.java
@@ -43,27 +43,34 @@
ManureVo lastVo = lastUpManureData.get(msg.deviceId + nowVo.no) ;
Long now = System.currentTimeMillis() ;
if(lastVo != null && nowVo.stateIsChanged(lastVo)){
- //鐘舵�佹湁鍙樺寲
- this.deal(data, msg, nowVo, now);
+ //鐘舵�佹湁鍙樺寲锛屾渶鏂版暟鎹強鍘嗗彶鏁版嵁閮借瀛樺偍
+ this.deal(data, msg, nowVo, now, false);
}else{
//鐘舵�佹棤鍙樺寲
Long lastAt = dealDataAtDateTime.get(msg.deviceId + nowVo.no);
- if(lastAt == null || ((now - lastAt) >= ServerProperties.mqttAcceptDataMinInterval)){
- //瓒呰繃鏈�灏忔椂闂撮棿闅�
- this.deal(data, msg, nowVo, now);
+ if(lastAt == null || ((now - lastAt) >= ServerProperties.acceptManureDataMinInterval)){
+ //瓒呰繃鏈�灏忔椂闂撮棿闅旓紝 鏈�鏂版暟鎹強鍘嗗彶鏁版嵁閮借瀛樺偍
+ this.deal(data, msg, nowVo, now, false);
+ }else{
+ //鏈秴杩囨渶灏忔椂闂撮棿闅旓紝 鏈�鏂版暟鎹瀛樺偍锛屼互澶囧鎴风鍙婃椂鏌ョ湅鐘舵��
+ this.deal(data, msg, nowVo, now, true);
}
}
lastUpManureData.put(msg.deviceId + nowVo.no, nowVo);
}
}
- private void deal(Object data, MqttSubMsg msg, ManureVo stVo, Long now){
+ private void deal(Object data, MqttSubMsg msg, ManureVo stVo, Long now, boolean dealOnlyLast){
dealDataAtDateTime.put(msg.deviceId + stVo.no, now);
DbSv sv = SpringContextUtil.getBean(DbSv.class) ;
if(sv != null){
PrStManure stPo = sv.getStManureByFBoxIdAndNo(msg.deviceId, stVo.no) ;
if(stPo != null){
this.taskResult = new Object[]{sv, stPo} ;
- this.toNextTasks(data);
+ if(dealOnlyLast){
+ this.toNextOneTask(data, TkDealManureLastSdV1.taskId);
+ }else{
+ this.toNextOneTask(data, TkDealManureSdV1.taskId);
+ }
}
}else{
log.error("涓ラ噸閿欒锛屾湭鑳藉緱鍒癉bSv瀵硅薄");
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/pSdV1/TkPreGenObjs4SoilSdV1.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/pSdV1/TkPreGenObjs4SoilSdV1.java
index b978c59..216eee3 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/pSdV1/TkPreGenObjs4SoilSdV1.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/pSdV1/TkPreGenObjs4SoilSdV1.java
@@ -41,7 +41,7 @@
SoilVo stVo = (SoilVo)msg.vo4Up ;
Long lastAt = dealDataAtDateTime.get(msg.deviceId + stVo.no);
Long now = System.currentTimeMillis() ;
- if(lastAt == null || ((now - lastAt) >= ServerProperties.mqttAcceptDataMinInterval)) {
+ if(lastAt == null || ((now - lastAt) >= ServerProperties.acceptSoilDataMinInterval)) {
dealDataAtDateTime.put(msg.deviceId + stVo.no, now);
DbSv sv = SpringContextUtil.getBean(DbSv.class);
if (sv != null) {
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/pSdV1/TkPreGenObjs4WeatherSdV1.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/pSdV1/TkPreGenObjs4WeatherSdV1.java
index e799ea1..94dd5d0 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/pSdV1/TkPreGenObjs4WeatherSdV1.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/pSdV1/TkPreGenObjs4WeatherSdV1.java
@@ -41,7 +41,7 @@
WeatherVo stVo = (WeatherVo)msg.vo4Up ;
Long lastAt = dealDataAtDateTime.get(msg.deviceId + stVo.no);
Long now = System.currentTimeMillis() ;
- if(lastAt == null || ((now - lastAt) >= ServerProperties.mqttAcceptDataMinInterval)) {
+ if(lastAt == null || ((now - lastAt) >= ServerProperties.acceptWeatherDataMinInterval)) {
dealDataAtDateTime.put(msg.deviceId + stVo.no, now);
DbSv sv = SpringContextUtil.getBean(DbSv.class);
if (sv != null) {
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/RtuDataDealTree.xml b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/RtuDataDealTree.xml
index b9bd804..cb0891c 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/RtuDataDealTree.xml
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/RtuDataDealTree.xml
@@ -96,7 +96,9 @@
<task id="TkMqttData" name="鎺ユ敹Mqtt娑堟伅" enable="true" class="com.dy.rtuMw.server.rtuData.TkMqttData">
<task id="TkFindPSdV1" name="璇嗗埆灞变笢V1鏁版嵁" enable="true" class="com.dy.rtuMw.server.rtuData.pSdV1.TkFindPSdV1">
<task id="TkPreGenObjs4ManureSdV1" name="涓哄鐞嗘按鑲ユ満鏁版嵁棰勫厛鍑嗗鍚勫璞�" enable="true" class="com.dy.rtuMw.server.rtuData.pSdV1.TkPreGenObjs4ManureSdV1">
- <task id="TkDealManureSdV1" name="澶勭悊姘磋偉鏁版嵁" enable="true" class="com.dy.rtuMw.server.rtuData.pSdV1.TkDealManureSdV1">
+ <task id="TkDealManureLastSdV1" name="澶勭悊姘磋偉鏈�鏂版暟鎹�" enable="true" class="com.dy.rtuMw.server.rtuData.pSdV1.TkDealManureLastSdV1">
+ </task>
+ <task id="TkDealManureSdV1" name="澶勭悊姘磋偉鏈�鏂板拰鍘嗗彶鏁版嵁" enable="true" class="com.dy.rtuMw.server.rtuData.pSdV1.TkDealManureSdV1">
</task>
<!-- 鍙湁姘磋偉鏈烘湁杩滅▼鍛戒护 -->
<task id="TkFindComResponseSdV1" name="璇嗗埆鍝嶅簲鍛戒护鏁版嵁" enable="true" class="com.dy.rtuMw.server.rtuData.pSdV1.TkFindComResponseSdV1">
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.xml b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.xml
index 74792d9..b5a7f13 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.xml
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.xml
@@ -174,7 +174,9 @@
sendInterval 鍛戒护鍙戦�侀棿闅旓紙鍗曚綅绉掞級锛屽彇鍊艰寖鍥存槸1~3600
reSendTimesByNoResult 鏈敹鍒板懡浠ょ粨鏋滄椂锛屾渶澶ч噸澶嶅彂閫佹鏁帮紝鍙栧�艰寖鍥存槸0~100, 0鏃惰〃绀轰笉閲嶅鍙戦��
comCacheTimeout: 鍛戒护鏈�澶х紦瀛樻椂闀�(绉�)锛屽彇鍊艰寖鍥存槸1~3600
- acceptDataMinInterval:鍥犱负涓婅鏁版嵁瀵嗗害杈冨ぇ锛堝10绉掍笂琛屼竴鏉℃暟鎹級锛屾暟鎹鐞嗕笌瀛樺偍鍘嬪姏澶э紝鎵�浠ユ渶灏忛棿闅斾竴瀹氭椂闂达紙鍒嗛挓锛夌湡姝f帴鏀跺鐞嗕竴鏉′笂琛屾暟鎹�
+ acceptManureDataMinInterval:鍥犱负涓婅姘磋偉鏈烘暟鎹瘑搴﹁緝澶э紙濡�10绉掍笂琛屼竴鏉℃暟鎹級锛屾暟鎹鐞嗕笌瀛樺偍鍘嬪姏澶э紝鎵�浠ユ渶灏忛棿闅斾竴瀹氭椂闂达紙鍒嗛挓锛夌湡姝f帴鏀跺鐞嗕竴鏉′笂琛屾暟鎹�
+ acceptSoilDataMinInterval:鍥犱负涓婅澧掓儏鏁版嵁瀵嗗害杈冨ぇ锛堝10绉掍笂琛屼竴鏉℃暟鎹級锛屾暟鎹鐞嗕笌瀛樺偍鍘嬪姏澶э紝鎵�浠ユ渶灏忛棿闅斾竴瀹氭椂闂达紙鍒嗛挓锛夌湡姝f帴鏀跺鐞嗕竴鏉′笂琛屾暟鎹�
+ acceptWeatherDataMinInterval:鍥犱负涓婅姘旇薄鏁版嵁瀵嗗害杈冨ぇ锛堝10绉掍笂琛屼竴鏉℃暟鎹級锛屾暟鎹鐞嗕笌瀛樺偍鍘嬪姏澶э紝鎵�浠ユ渶灏忛棿闅斾竴瀹氭椂闂达紙鍒嗛挓锛夌湡姝f帴鏀跺鐞嗕竴鏉′笂琛屾暟鎹�
useMemoryPersistence 浣跨敤鍐呭瓨鎸佷箙鍖栬�岄潪榛樿鐨勬枃浠舵寔涔呭寲(true鏄� false鍚�)
protocolAndDeviceIds 鍦ㄥ瓙绯荤粺锛坥rgTag锛変腑鎺ュ叆鐨勮澶�(FBox)鎵�鐢ㄥ崗璁強璁惧id闆嗗悎,澶氫釜鐢ㄩ�楀彿闅斿紑锛屽崗璁笌ID鐢ㄦ鏂滄潬闅斿紑锛屼緥濡傦細sd1/338220031439,sd1/338220031440
subTopicAndQos: 璁㈤槄涓婚涓嶲os锛屼富棰樺悕涓庡叾Qos鐢ㄩ�楀彿闅斿紑锛屽涓富棰樺強Qos鐢ㄥ垎鍙烽殧寮�锛屼緥濡傦細ym/topic1,1;ym/topic2,1;ym/topic3,1锛屽鏋滄湁澶氫釜OrgTag锛屼富棰樺墠缂�鐢ㄥ叾OrgTag
@@ -193,7 +195,9 @@
sendInterval="60"
reSendTimesByNoResult="0"
comCacheTimeout="30"
- acceptDataMinInterval="60"
+ acceptManureDataMinInterval="30"
+ acceptSoilDataMinInterval="60"
+ acceptWeatherDataMinInterval="60"
useMemoryPersistence="true"
protocolAndDeviceIds="${mqtt.protocolAndDeviceIds}"
subTopicAndQos="${mqtt.subTopicAndQos}"
--
Gitblit v1.8.0