中间件,对于水肥机数据,每条上报数据都进行最新上报数据处理,以备前端界面及时显示水肥机状态。
| | |
| | | |
| | | mqVo.comCacheTimeout = conf.getSetAttrPlusInt(doc, "config.mqtt", "comCacheTimeout", null, 1, 3600, null) * 1000L ; |
| | | |
| | | ServerProperties.mqttAcceptDataMinInterval = conf.getSetAttrPlusInt(doc, "config.mqtt", "acceptDataMinInterval", null, 1, 720, null) * 60 * 1000L ; |
| | | ServerProperties.acceptManureDataMinInterval = conf.getSetAttrPlusInt(doc, "config.mqtt", "acceptManureDataMinInterval", null, 1, 720, null) * 60 * 1000L ; |
| | | ServerProperties.acceptSoilDataMinInterval = conf.getSetAttrPlusInt(doc, "config.mqtt", "acceptSoilDataMinInterval", null, 1, 720, null) * 60 * 1000L ; |
| | | ServerProperties.acceptWeatherDataMinInterval = conf.getSetAttrPlusInt(doc, "config.mqtt", "acceptWeatherDataMinInterval", null, 1, 720, null) * 60 * 1000L ; |
| | | |
| | | mqVo.useMemoryPersistence = conf.getSetAttrBoolean(doc, "config.mqtt", "useMemoryPersistence", null, null) ; |
| | | String proAndDevIds = conf.getSetAttrTxt(doc, "config.mqtt", "protocolAndDeviceIds", null, true, null) ; |
| | |
| | | |
| | | //Mqtt模块是否启动 |
| | | public static Boolean mqttUnitEnable = false ; |
| | | public static Long mqttAcceptDataMinInterval = 60 * 60 * 1000L ;//默认60分钟 |
| | | public static Long acceptManureDataMinInterval = 60 * 60 * 1000L ;//默认60分钟 |
| | | public static Long acceptSoilDataMinInterval = 60 * 60 * 1000L ;//默认60分钟 |
| | | public static Long acceptWeatherDataMinInterval = 60 * 60 * 1000L ;//默认60分钟 |
| | | |
| | | |
| | | |
New file |
| | |
| | | package com.dy.rtuMw.server.rtuData.pSdV1; |
| | | |
| | | import com.dy.common.mw.protocol4Mqtt.MqttSubMsg; |
| | | import com.dy.common.mw.protocol4Mqtt.pSdV1.upVos.ManureVo; |
| | | import com.dy.pipIrrGlobal.pojoPr.PrStManure; |
| | | import com.dy.rtuMw.server.rtuData.dbSv.DbSv; |
| | | import org.apache.logging.log4j.LogManager; |
| | | import org.apache.logging.log4j.Logger; |
| | | |
| | | /** |
| | | * @Author: liurunyu |
| | | * @Date: 2025/7/31 16:42 |
| | | * @Description |
| | | */ |
| | | public class TkDealManureLastSdV1 extends TkDealManureSdV1 { |
| | | |
| | | private static Logger log = LogManager.getLogger(TkDealManureLastSdV1.class.getName()); |
| | | |
| | | //类ID,一定与Tree.xml配置文件中配置一致 |
| | | public static final String taskId = "TkDealManureLastSdV1"; |
| | | |
| | | /** |
| | | * 执行节点任务 |
| | | * |
| | | * @param data 需要处理的数据 |
| | | */ |
| | | @Override |
| | | public void execute(Object data) { |
| | | //前面的任务已经判断了data不为空且为水肥数据 |
| | | MqttSubMsg msg = (MqttSubMsg) data; |
| | | ManureVo stVo = (ManureVo) msg.vo4Up; |
| | | Object[] objs = this.getTaskResults(TkPreGenObjs4ManureSdV1.taskId); |
| | | DbSv sv = (DbSv) objs[0]; |
| | | PrStManure stPo = (PrStManure) objs[1]; |
| | | try{ |
| | | this.doDeal(sv, stPo, msg, stVo); |
| | | }catch (Exception e){ |
| | | log.error("保存水肥数据时发生异常", e); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 处理上行消息数据 |
| | | * @param sv 服务 |
| | | * @param stPo 实体对象 |
| | | * @param msg 上行的订阅消息 |
| | | * @param stVo 上行的设备数据 |
| | | */ |
| | | private void doDeal(DbSv sv, |
| | | PrStManure stPo, |
| | | MqttSubMsg msg, |
| | | ManureVo stVo) throws Exception { |
| | | //2025-07-31 引处数据会很频繁,所以不在钉钉发消息,再说钉钉不叫频繁发消息 |
| | | this.saveOrUpdateLast(sv, stPo, msg, stVo, null, false); |
| | | } |
| | | |
| | | |
| | | } |
| | |
| | | MqttSubMsg msg, |
| | | ManureVo stVo) throws Exception { |
| | | RmManureHistory hpo = this.saveHistory(sv, stPo, msg, stVo); |
| | | this.saveOrUpdateLast(sv, stPo, msg, stVo, hpo); |
| | | this.saveOrUpdateLast(sv, stPo, msg, stVo, hpo, true); |
| | | } |
| | | /** |
| | | * 处理上行消息数据 |
| | |
| | | * @param stVo 上行的设备数据 |
| | | * @param hpo 历史记录最新数据 |
| | | */ |
| | | private void saveOrUpdateLast(DbSv sv, |
| | | protected void saveOrUpdateLast(DbSv sv, |
| | | PrStManure stPo, |
| | | MqttSubMsg msg, |
| | | ManureVo stVo, |
| | | RmManureHistory hpo) throws Exception { |
| | | RmManureHistory hpo, |
| | | boolean sendDingMs) throws Exception { |
| | | RmManureLast po = sv.getRmManureLast(stPo.id) ; |
| | | if(po == null){ |
| | | po = new RmManureLast(); |
| | |
| | | po.manureId = stPo.id ; |
| | | po.lastHistoryId = hpo==null?null:hpo.id ; |
| | | sv.saveRmManureLast(po) ; |
| | | if(stVo.alarm != null && stVo.alarm == 1){ |
| | | if(stVo.alarm != null && stVo.alarm == 1 && sendDingMs){ |
| | | this.sendMessage(stPo, msg, stVo); |
| | | } |
| | | }else{ |
| | |
| | | po.manureId = stPo.id ; |
| | | po.lastHistoryId = hpo==null?null:hpo.id ; |
| | | sv.updateRmManureLast(po); |
| | | if(stVo.alarm != null && stVo.alarm == 1){ |
| | | if(stVo.alarm != null && stVo.alarm == 1 && sendDingMs){ |
| | | this.sendMessage(stPo, msg, stVo); |
| | | } |
| | | } |
| | |
| | | ManureVo lastVo = lastUpManureData.get(msg.deviceId + nowVo.no) ; |
| | | Long now = System.currentTimeMillis() ; |
| | | if(lastVo != null && nowVo.stateIsChanged(lastVo)){ |
| | | //状态有变化 |
| | | this.deal(data, msg, nowVo, now); |
| | | //状态有变化,最新数据及历史数据都要存储 |
| | | this.deal(data, msg, nowVo, now, false); |
| | | }else{ |
| | | //状态无变化 |
| | | Long lastAt = dealDataAtDateTime.get(msg.deviceId + nowVo.no); |
| | | if(lastAt == null || ((now - lastAt) >= ServerProperties.mqttAcceptDataMinInterval)){ |
| | | //超过最小时间间隔 |
| | | this.deal(data, msg, nowVo, now); |
| | | if(lastAt == null || ((now - lastAt) >= ServerProperties.acceptManureDataMinInterval)){ |
| | | //超过最小时间间隔, 最新数据及历史数据都要存储 |
| | | this.deal(data, msg, nowVo, now, false); |
| | | }else{ |
| | | //未超过最小时间间隔, 最新数据要存储,以备客户端及时查看状态 |
| | | this.deal(data, msg, nowVo, now, true); |
| | | } |
| | | } |
| | | lastUpManureData.put(msg.deviceId + nowVo.no, nowVo); |
| | | } |
| | | } |
| | | private void deal(Object data, MqttSubMsg msg, ManureVo stVo, Long now){ |
| | | private void deal(Object data, MqttSubMsg msg, ManureVo stVo, Long now, boolean dealOnlyLast){ |
| | | dealDataAtDateTime.put(msg.deviceId + stVo.no, now); |
| | | DbSv sv = SpringContextUtil.getBean(DbSv.class) ; |
| | | if(sv != null){ |
| | | PrStManure stPo = sv.getStManureByFBoxIdAndNo(msg.deviceId, stVo.no) ; |
| | | if(stPo != null){ |
| | | this.taskResult = new Object[]{sv, stPo} ; |
| | | this.toNextTasks(data); |
| | | if(dealOnlyLast){ |
| | | this.toNextOneTask(data, TkDealManureLastSdV1.taskId); |
| | | }else{ |
| | | this.toNextOneTask(data, TkDealManureSdV1.taskId); |
| | | } |
| | | } |
| | | }else{ |
| | | log.error("严重错误,未能得到DbSv对象"); |
| | |
| | | SoilVo stVo = (SoilVo)msg.vo4Up ; |
| | | Long lastAt = dealDataAtDateTime.get(msg.deviceId + stVo.no); |
| | | Long now = System.currentTimeMillis() ; |
| | | if(lastAt == null || ((now - lastAt) >= ServerProperties.mqttAcceptDataMinInterval)) { |
| | | if(lastAt == null || ((now - lastAt) >= ServerProperties.acceptSoilDataMinInterval)) { |
| | | dealDataAtDateTime.put(msg.deviceId + stVo.no, now); |
| | | DbSv sv = SpringContextUtil.getBean(DbSv.class); |
| | | if (sv != null) { |
| | |
| | | WeatherVo stVo = (WeatherVo)msg.vo4Up ; |
| | | Long lastAt = dealDataAtDateTime.get(msg.deviceId + stVo.no); |
| | | Long now = System.currentTimeMillis() ; |
| | | if(lastAt == null || ((now - lastAt) >= ServerProperties.mqttAcceptDataMinInterval)) { |
| | | if(lastAt == null || ((now - lastAt) >= ServerProperties.acceptWeatherDataMinInterval)) { |
| | | dealDataAtDateTime.put(msg.deviceId + stVo.no, now); |
| | | DbSv sv = SpringContextUtil.getBean(DbSv.class); |
| | | if (sv != null) { |
| | |
| | | <task id="TkMqttData" name="接收Mqtt消息" enable="true" class="com.dy.rtuMw.server.rtuData.TkMqttData"> |
| | | <task id="TkFindPSdV1" name="识别山东V1数据" enable="true" class="com.dy.rtuMw.server.rtuData.pSdV1.TkFindPSdV1"> |
| | | <task id="TkPreGenObjs4ManureSdV1" name="为处理水肥机数据预先准备各对象" enable="true" class="com.dy.rtuMw.server.rtuData.pSdV1.TkPreGenObjs4ManureSdV1"> |
| | | <task id="TkDealManureSdV1" name="处理水肥数据" enable="true" class="com.dy.rtuMw.server.rtuData.pSdV1.TkDealManureSdV1"> |
| | | <task id="TkDealManureLastSdV1" name="处理水肥最新数据" enable="true" class="com.dy.rtuMw.server.rtuData.pSdV1.TkDealManureLastSdV1"> |
| | | </task> |
| | | <task id="TkDealManureSdV1" name="处理水肥最新和历史数据" enable="true" class="com.dy.rtuMw.server.rtuData.pSdV1.TkDealManureSdV1"> |
| | | </task> |
| | | <!-- 只有水肥机有远程命令 --> |
| | | <task id="TkFindComResponseSdV1" name="识别响应命令数据" enable="true" class="com.dy.rtuMw.server.rtuData.pSdV1.TkFindComResponseSdV1"> |
| | |
| | | sendInterval 命令发送间隔(单位秒),取值范围是1~3600 |
| | | reSendTimesByNoResult 未收到命令结果时,最大重复发送次数,取值范围是0~100, 0时表示不重复发送 |
| | | comCacheTimeout: 命令最大缓存时长(秒),取值范围是1~3600 |
| | | acceptDataMinInterval:因为上行数据密度较大(如10秒上行一条数据),数据处理与存储压力大,所以最小间隔一定时间(分钟)真正接收处理一条上行数据 |
| | | acceptManureDataMinInterval:因为上行水肥机数据密度较大(如10秒上行一条数据),数据处理与存储压力大,所以最小间隔一定时间(分钟)真正接收处理一条上行数据 |
| | | acceptSoilDataMinInterval:因为上行墒情数据密度较大(如10秒上行一条数据),数据处理与存储压力大,所以最小间隔一定时间(分钟)真正接收处理一条上行数据 |
| | | acceptWeatherDataMinInterval:因为上行气象数据密度较大(如10秒上行一条数据),数据处理与存储压力大,所以最小间隔一定时间(分钟)真正接收处理一条上行数据 |
| | | useMemoryPersistence 使用内存持久化而非默认的文件持久化(true是 false否) |
| | | protocolAndDeviceIds 在子系统(orgTag)中接入的设备(FBox)所用协议及设备id集合,多个用逗号隔开,协议与ID用正斜杠隔开,例如:sd1/338220031439,sd1/338220031440 |
| | | subTopicAndQos: 订阅主题与Qos,主题名与其Qos用逗号隔开,多个主题及Qos用分号隔开,例如:ym/topic1,1;ym/topic2,1;ym/topic3,1,如果有多个OrgTag,主题前缀用其OrgTag |
| | |
| | | sendInterval="60" |
| | | reSendTimesByNoResult="0" |
| | | comCacheTimeout="30" |
| | | acceptDataMinInterval="60" |
| | | acceptManureDataMinInterval="30" |
| | | acceptSoilDataMinInterval="60" |
| | | acceptWeatherDataMinInterval="60" |
| | | useMemoryPersistence="true" |
| | | protocolAndDeviceIds="${mqtt.protocolAndDeviceIds}" |
| | | subTopicAndQos="${mqtt.subTopicAndQos}" |