中间件,对于水肥机数据,每条上报数据都进行最新上报数据处理,以备前端界面及时显示水肥机状态。
8个文件已修改
1个文件已添加
114 ■■■■ 已修改文件
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/ServerProperties.java 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/pSdV1/TkDealManureLastSdV1.java 58 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/pSdV1/TkDealManureSdV1.java 11 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/pSdV1/TkPreGenObjs4ManureSdV1.java 21 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/pSdV1/TkPreGenObjs4SoilSdV1.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/pSdV1/TkPreGenObjs4WeatherSdV1.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/RtuDataDealTree.xml 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.xml 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java
@@ -478,7 +478,9 @@
                mqVo.comCacheTimeout = conf.getSetAttrPlusInt(doc, "config.mqtt", "comCacheTimeout", null, 1, 3600, null) * 1000L ;
                ServerProperties.mqttAcceptDataMinInterval = conf.getSetAttrPlusInt(doc, "config.mqtt", "acceptDataMinInterval", null, 1, 720, null) * 60 * 1000L ;
                ServerProperties.acceptManureDataMinInterval = conf.getSetAttrPlusInt(doc, "config.mqtt", "acceptManureDataMinInterval", null, 1, 720, null) * 60 * 1000L ;
                ServerProperties.acceptSoilDataMinInterval = conf.getSetAttrPlusInt(doc, "config.mqtt", "acceptSoilDataMinInterval", null, 1, 720, null) * 60 * 1000L ;
                ServerProperties.acceptWeatherDataMinInterval = conf.getSetAttrPlusInt(doc, "config.mqtt", "acceptWeatherDataMinInterval", null, 1, 720, null) * 60 * 1000L ;
                mqVo.useMemoryPersistence = conf.getSetAttrBoolean(doc, "config.mqtt", "useMemoryPersistence", null, null) ;
                String proAndDevIds = conf.getSetAttrTxt(doc, "config.mqtt", "protocolAndDeviceIds", null, true, null) ;
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/ServerProperties.java
@@ -56,7 +56,9 @@
    //Mqtt模块是否启动
    public static Boolean mqttUnitEnable = false ;
    public static Long mqttAcceptDataMinInterval = 60 * 60 * 1000L ;//默认60分钟
    public static Long acceptManureDataMinInterval = 60 * 60 * 1000L ;//默认60分钟
    public static Long acceptSoilDataMinInterval = 60 * 60 * 1000L ;//默认60分钟
    public static Long acceptWeatherDataMinInterval = 60 * 60 * 1000L ;//默认60分钟
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/pSdV1/TkDealManureLastSdV1.java
New file
@@ -0,0 +1,58 @@
package com.dy.rtuMw.server.rtuData.pSdV1;
import com.dy.common.mw.protocol4Mqtt.MqttSubMsg;
import com.dy.common.mw.protocol4Mqtt.pSdV1.upVos.ManureVo;
import com.dy.pipIrrGlobal.pojoPr.PrStManure;
import com.dy.rtuMw.server.rtuData.dbSv.DbSv;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
 * @Author: liurunyu
 * @Date: 2025/7/31 16:42
 * @Description
 */
public class TkDealManureLastSdV1 extends TkDealManureSdV1 {
    private static Logger log = LogManager.getLogger(TkDealManureLastSdV1.class.getName());
    //类ID,一定与Tree.xml配置文件中配置一致
    public static final String taskId = "TkDealManureLastSdV1";
    /**
     * 执行节点任务
     *
     * @param data 需要处理的数据
     */
    @Override
    public void execute(Object data) {
        //前面的任务已经判断了data不为空且为水肥数据
        MqttSubMsg msg = (MqttSubMsg) data;
        ManureVo stVo = (ManureVo) msg.vo4Up;
        Object[] objs = this.getTaskResults(TkPreGenObjs4ManureSdV1.taskId);
        DbSv sv = (DbSv) objs[0];
        PrStManure stPo = (PrStManure) objs[1];
        try{
            this.doDeal(sv, stPo, msg, stVo);
        }catch (Exception e){
            log.error("保存水肥数据时发生异常", e);
        }
    }
    /**
     * 处理上行消息数据
     * @param sv 服务
     * @param stPo 实体对象
     * @param msg 上行的订阅消息
     * @param stVo 上行的设备数据
     */
    private void doDeal(DbSv sv,
                        PrStManure stPo,
                        MqttSubMsg msg,
                        ManureVo stVo) throws Exception {
        //2025-07-31 引处数据会很频繁,所以不在钉钉发消息,再说钉钉不叫频繁发消息
        this.saveOrUpdateLast(sv, stPo, msg, stVo, null, false);
    }
}
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/pSdV1/TkDealManureSdV1.java
@@ -55,7 +55,7 @@
                        MqttSubMsg msg,
                        ManureVo stVo) throws Exception {
        RmManureHistory hpo = this.saveHistory(sv, stPo, msg, stVo);
        this.saveOrUpdateLast(sv, stPo, msg, stVo, hpo);
        this.saveOrUpdateLast(sv, stPo, msg, stVo, hpo, true);
    }
    /**
     * 处理上行消息数据
@@ -65,11 +65,12 @@
     * @param stVo 上行的设备数据
     * @param hpo 历史记录最新数据
     */
    private void saveOrUpdateLast(DbSv sv,
    protected void saveOrUpdateLast(DbSv sv,
                                  PrStManure stPo,
                                  MqttSubMsg msg,
                                  ManureVo stVo,
                                  RmManureHistory hpo) throws Exception {
                                  RmManureHistory hpo,
                                  boolean sendDingMs) throws Exception {
        RmManureLast po = sv.getRmManureLast(stPo.id) ;
        if(po == null){
            po = new RmManureLast();
@@ -77,7 +78,7 @@
            po.manureId = stPo.id ;
            po.lastHistoryId = hpo==null?null:hpo.id ;
            sv.saveRmManureLast(po) ;
            if(stVo.alarm != null && stVo.alarm == 1){
            if(stVo.alarm != null && stVo.alarm == 1 && sendDingMs){
                this.sendMessage(stPo, msg, stVo);
            }
        }else{
@@ -85,7 +86,7 @@
            po.manureId = stPo.id ;
            po.lastHistoryId = hpo==null?null:hpo.id ;
            sv.updateRmManureLast(po);
            if(stVo.alarm != null && stVo.alarm == 1){
            if(stVo.alarm != null && stVo.alarm == 1 && sendDingMs){
                this.sendMessage(stPo, msg, stVo);
            }
        }
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/pSdV1/TkPreGenObjs4ManureSdV1.java
@@ -43,27 +43,34 @@
            ManureVo lastVo = lastUpManureData.get(msg.deviceId + nowVo.no) ;
            Long now = System.currentTimeMillis() ;
            if(lastVo != null && nowVo.stateIsChanged(lastVo)){
                //状态有变化
                this.deal(data, msg, nowVo, now);
                //状态有变化,最新数据及历史数据都要存储
                this.deal(data, msg, nowVo, now, false);
            }else{
                //状态无变化
                Long lastAt = dealDataAtDateTime.get(msg.deviceId + nowVo.no);
                if(lastAt == null || ((now - lastAt) >= ServerProperties.mqttAcceptDataMinInterval)){
                    //超过最小时间间隔
                    this.deal(data, msg, nowVo, now);
                if(lastAt == null || ((now - lastAt) >= ServerProperties.acceptManureDataMinInterval)){
                    //超过最小时间间隔, 最新数据及历史数据都要存储
                    this.deal(data, msg, nowVo, now, false);
                }else{
                    //未超过最小时间间隔, 最新数据要存储,以备客户端及时查看状态
                    this.deal(data, msg, nowVo, now, true);
                }
            }
            lastUpManureData.put(msg.deviceId + nowVo.no, nowVo);
        }
    }
    private void deal(Object data, MqttSubMsg msg, ManureVo stVo, Long now){
    private void deal(Object data, MqttSubMsg msg, ManureVo stVo, Long now, boolean dealOnlyLast){
        dealDataAtDateTime.put(msg.deviceId + stVo.no, now);
        DbSv sv = SpringContextUtil.getBean(DbSv.class) ;
        if(sv != null){
            PrStManure stPo = sv.getStManureByFBoxIdAndNo(msg.deviceId, stVo.no) ;
            if(stPo != null){
                this.taskResult = new Object[]{sv, stPo} ;
                this.toNextTasks(data);
                if(dealOnlyLast){
                    this.toNextOneTask(data, TkDealManureLastSdV1.taskId);
                }else{
                    this.toNextOneTask(data, TkDealManureSdV1.taskId);
                }
            }
        }else{
            log.error("严重错误,未能得到DbSv对象");
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/pSdV1/TkPreGenObjs4SoilSdV1.java
@@ -41,7 +41,7 @@
            SoilVo stVo = (SoilVo)msg.vo4Up ;
            Long lastAt = dealDataAtDateTime.get(msg.deviceId + stVo.no);
            Long now = System.currentTimeMillis() ;
            if(lastAt == null || ((now - lastAt) >= ServerProperties.mqttAcceptDataMinInterval)) {
            if(lastAt == null || ((now - lastAt) >= ServerProperties.acceptSoilDataMinInterval)) {
                dealDataAtDateTime.put(msg.deviceId + stVo.no, now);
                DbSv sv = SpringContextUtil.getBean(DbSv.class);
                if (sv != null) {
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/pSdV1/TkPreGenObjs4WeatherSdV1.java
@@ -41,7 +41,7 @@
            WeatherVo stVo = (WeatherVo)msg.vo4Up ;
            Long lastAt = dealDataAtDateTime.get(msg.deviceId + stVo.no);
            Long now = System.currentTimeMillis() ;
            if(lastAt == null || ((now - lastAt) >= ServerProperties.mqttAcceptDataMinInterval)) {
            if(lastAt == null || ((now - lastAt) >= ServerProperties.acceptWeatherDataMinInterval)) {
                dealDataAtDateTime.put(msg.deviceId + stVo.no, now);
                DbSv sv = SpringContextUtil.getBean(DbSv.class);
                if (sv != null) {
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/RtuDataDealTree.xml
@@ -96,7 +96,9 @@
        <task id="TkMqttData" name="接收Mqtt消息" enable="true" class="com.dy.rtuMw.server.rtuData.TkMqttData">
            <task id="TkFindPSdV1" name="识别山东V1数据" enable="true" class="com.dy.rtuMw.server.rtuData.pSdV1.TkFindPSdV1">
                <task id="TkPreGenObjs4ManureSdV1" name="为处理水肥机数据预先准备各对象" enable="true" class="com.dy.rtuMw.server.rtuData.pSdV1.TkPreGenObjs4ManureSdV1">
                    <task id="TkDealManureSdV1" name="处理水肥数据" enable="true" class="com.dy.rtuMw.server.rtuData.pSdV1.TkDealManureSdV1">
                    <task id="TkDealManureLastSdV1" name="处理水肥最新数据" enable="true" class="com.dy.rtuMw.server.rtuData.pSdV1.TkDealManureLastSdV1">
                    </task>
                    <task id="TkDealManureSdV1" name="处理水肥最新和历史数据" enable="true" class="com.dy.rtuMw.server.rtuData.pSdV1.TkDealManureSdV1">
                    </task>
                    <!-- 只有水肥机有远程命令 -->
                    <task id="TkFindComResponseSdV1" name="识别响应命令数据" enable="true" class="com.dy.rtuMw.server.rtuData.pSdV1.TkFindComResponseSdV1">
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.xml
@@ -174,7 +174,9 @@
    sendInterval 命令发送间隔(单位秒),取值范围是1~3600
    reSendTimesByNoResult 未收到命令结果时,最大重复发送次数,取值范围是0~100, 0时表示不重复发送
    comCacheTimeout: 命令最大缓存时长(秒),取值范围是1~3600
    acceptDataMinInterval:因为上行数据密度较大(如10秒上行一条数据),数据处理与存储压力大,所以最小间隔一定时间(分钟)真正接收处理一条上行数据
    acceptManureDataMinInterval:因为上行水肥机数据密度较大(如10秒上行一条数据),数据处理与存储压力大,所以最小间隔一定时间(分钟)真正接收处理一条上行数据
    acceptSoilDataMinInterval:因为上行墒情数据密度较大(如10秒上行一条数据),数据处理与存储压力大,所以最小间隔一定时间(分钟)真正接收处理一条上行数据
    acceptWeatherDataMinInterval:因为上行气象数据密度较大(如10秒上行一条数据),数据处理与存储压力大,所以最小间隔一定时间(分钟)真正接收处理一条上行数据
    useMemoryPersistence 使用内存持久化而非默认的文件持久化(true是 false否)
    protocolAndDeviceIds 在子系统(orgTag)中接入的设备(FBox)所用协议及设备id集合,多个用逗号隔开,协议与ID用正斜杠隔开,例如:sd1/338220031439,sd1/338220031440
    subTopicAndQos: 订阅主题与Qos,主题名与其Qos用逗号隔开,多个主题及Qos用分号隔开,例如:ym/topic1,1;ym/topic2,1;ym/topic3,1,如果有多个OrgTag,主题前缀用其OrgTag
@@ -193,7 +195,9 @@
          sendInterval="60"
          reSendTimesByNoResult="0"
          comCacheTimeout="30"
          acceptDataMinInterval="60"
          acceptManureDataMinInterval="30"
          acceptSoilDataMinInterval="60"
          acceptWeatherDataMinInterval="60"
          useMemoryPersistence="true"
          protocolAndDeviceIds="${mqtt.protocolAndDeviceIds}"
          subTopicAndQos="${mqtt.subTopicAndQos}"