1、MQTT协议,增加设备参数命令及相关数据;
2、完善上行数据值对象;
3、完善其他代码。
2 文件已重命名
21个文件已修改
3个文件已添加
380 ■■■■ 已修改文件
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttMsgParser.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttPubMsg.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttTopic.java 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/CodeSdV1.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolConstantSdV1.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java 66 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/comParam/ComCtrlVo.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/comParam/ComSetParamVo.java 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/FaultClearVo.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/InjectStartVo.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/IrrStartVo.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/ParamSetVo.java 31 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/StirStartVo.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/ManureVo.java 22 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/SoilVo.java 43 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/StateVo.java 26 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/WeatherVo.java 20 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevOnLineInfo.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevRunInfo.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatusDealer.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java 14 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java 29 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgCache.java 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgNode.java 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttSubMsgDealer.java 41 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttMsgParser.java
@@ -21,7 +21,7 @@
                vo.orgTag = topicGrp[0] ;
                vo.protocol = topicGrp[1] ;
                vo.devId = topicGrp[2] ;
                vo.topic = topicGrp[3] ;
                vo.name = topicGrp[3] ;
                return vo ;
            }
        }else{
@@ -30,7 +30,7 @@
    }
    public static String createPubTopic(MqttTopic tp) throws Exception {
        return tp.orgTag + "/" + tp.protocol + "/" + tp.devId + "/" + tp.topic ;
        return tp.orgTag + "/" + tp.protocol + "/" + tp.devId + "/" + tp.name;
    }
    public static MqttSubMsg parseSubMsg(MqttTopic subTopic, MqttMessage mqttMsg, MqttCallback callback) throws Exception {
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttPubMsg.java
@@ -12,7 +12,7 @@
    public String mqttResultSendWebUrl ;//Mqtt返回命令结果 发向目的地web URL
    public String topic ;//消息主题
    public MqttTopic topic ;//消息主题
    public String msg ;//消息
    public boolean isCacheForOffLine ;//下行命令控制,消息中间件不在线是否缓存命令
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttTopic.java
@@ -16,18 +16,18 @@
    public String orgTag ;//组织标识
    public String protocol ;//协议名称
    public String devId ;//设备(FBox)ID
    public String topic ;//消息主题
    public String name;//消息主题末端名称
    public boolean isEmpty(){
        return orgTag == null || protocol == null || devId == null || topic == null
                || orgTag.trim().length() == 0 || protocol.trim().length() == 0 || devId.trim().length() == 0 || topic.trim().length() == 0 ;
        return orgTag == null || protocol == null || devId == null || name == null
                || orgTag.trim().length() == 0 || protocol.trim().length() == 0 || devId.trim().length() == 0 || name.trim().length() == 0 ;
    }
    public String shortName(){
        return topic ;
        return name;
    }
    public String longName(){
        return orgTag + "/" + protocol + "/" + devId + "/" + topic ;
        return orgTag + "/" + protocol + "/" + devId + "/" + name;
    }
}
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/CodeSdV1.java
@@ -10,4 +10,5 @@
    public static final String cd_Stir = "01" ;//搅拌启停命令
    public static final String cd_Inject = "02" ;//注肥启停命令
    public static final String cd_Irr = "03" ;//灌溉启停命令
    public static final String cd_Param = "10" ;//设定参数
}
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java
@@ -51,8 +51,9 @@
    public boolean subMsgMatchPubMsg(MqttPubMsg pubMsg){
        if (pubMsg instanceof MqttPubMsgSdV1) {
            MqttPubMsgSdV1 pubMsgSdV1 = (MqttPubMsgSdV1) pubMsg;
            //MqttPubMsgSdV1 pubMsgSdV1 = (MqttPubMsgSdV1) pubMsg;
            if(this.vo4Up != null && this.vo4Up instanceof StateVo){
                //只要上报的是状态数据,说明设备响应了命令
                return true ;
            }
        }
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolConstantSdV1.java
@@ -20,5 +20,6 @@
    public static final String PubTopicStir = "ctrlStir" ;//搅拌启停
    public static final String PubTopicInject = "ctrlInject" ;//注肥启停
    public static final String PubTopicIrr = "ctrlIrr" ;//灌溉启停
    public static final String PubTopicParam = "setParam" ;//设置参数
}
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java
@@ -4,15 +4,16 @@
import com.alibaba.fastjson2.JSONObject;
import com.dy.common.mw.protocol.Command;
import com.dy.common.mw.protocol4Mqtt.MqttCallback;
import com.dy.common.mw.protocol4Mqtt.MqttMsgParser;
import com.dy.common.mw.protocol4Mqtt.MqttTopic;
import com.dy.common.mw.protocol4Mqtt.Vo4Up;
import com.dy.common.mw.protocol4Mqtt.pSdV1.comParam.ComCtrlVo;
import com.dy.common.mw.protocol4Mqtt.pSdV1.comParam.ComSetParamVo;
import com.dy.common.mw.protocol4Mqtt.pSdV1.downVos.FaultClearVo;
import com.dy.common.mw.protocol4Mqtt.pSdV1.downVos.InjectStartVo;
import com.dy.common.mw.protocol4Mqtt.pSdV1.downVos.ParamSetVo;
import com.dy.common.mw.protocol4Mqtt.pSdV1.downVos.StirStartVo;
import com.dy.common.mw.protocol4Mqtt.pSdV1.upVos.*;
import com.dy.common.mw.protocol4Mqtt.status.DevRunSt;
import com.dy.common.mw.protocol4Mqtt.status.DevRunInfo;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
@@ -28,8 +29,8 @@
        }
        MqttSubMsgSdV1 msg = new MqttSubMsgSdV1(subTopic, strTxt);
        Vo4Up vo ;
        DevRunSt stVo ;
        switch (subTopic.topic) {
        DevRunInfo stInfo = null ;
        switch (subTopic.name) {
            case ProtocolConstantSdV1.SubTopicWeather -> {
                vo = JSON.parseObject(strTxt, WeatherVo.class);
                break;
@@ -45,21 +46,22 @@
            case ProtocolConstantSdV1.SubTopicState -> {
                //此处未完成,应该产生一些通信的info,供下面callback.notify(objs)通知出去
                vo = JSON.parseObject(strTxt, StateVo.class);
                stVo = new DevRunSt() ;
                stVo.id = msg.deviceId ;
                //stVo.stirRunning = true ; //搅拌运行 true是 false否
                //stVo.injectRunning = true ; //注肥运行 true是 false否
                //stVo.irrRunning = true ; //灌溉运行 true是 false否
                //stVo.alarm = true ; //报警 true是 false否
                StateVo stVo = (StateVo)vo ;
                stInfo = new DevRunInfo() ;
                stInfo.devId = msg.deviceId ;
                stInfo.stirRunning = (stVo.stirRunning==null?false:(stVo.stirRunning.byteValue()==1?true:false)) ; //搅拌运行 true是 false否
                stInfo.injectRunning = (stVo.injectRunning==null?false:(stVo.injectRunning.byteValue()==1?true:false)) ; //注肥运行 true是 false否
                stInfo.irrRunning = (stVo.irrRunning==null?false:(stVo.irrRunning.byteValue()==1?true:false)) ; //灌溉运行 true是 false否
                stInfo.alarm = (stVo.alarm==null?false:(stVo.alarm.byteValue()==1?true:false)) ; //报警 true是 false否
                break;
            }
            default -> {
                throw new Exception("接收到MQTT消息,协议" + subTopic.protocol + ",设备ID" + subTopic.devId + ",主题" + subTopic.topic + "消息解析逻辑未实现");
                throw new Exception("接收到MQTT消息,协议" + subTopic.protocol + ",设备ID" + subTopic.devId + ",主题" + subTopic.name + "消息解析逻辑未实现");
            }
        }
        msg.vo4Up = vo ;
        callback.callback(msg);
        callback.notify(null);//此处未完成
        callback.notify(msg.deviceId, stInfo);
        return msg;
    }
@@ -94,6 +96,13 @@
                msg = this.createPubMsgOfIrr(orgTag, com);
                break;
            }
            case CodeSdV1.cd_Param -> {
                //设置参数
                this.checkParam(com);
                this.checkRtnWebUrl(com);
                msg = this.createPubMsgOfParam(orgTag, com);
                break;
            }
            default -> {
                throw new Exception("接收到MQTT命令,协议" + com.protocol + "版本" + com.protocolVersion + "功能码" + com.code + "构造器未实现");
            }
@@ -122,8 +131,8 @@
        msg.isCacheForOffLine = false ;
        msg.hasResponse = true ;
        msg.cd = CodeSdV1.cd_Fault ;
        msg.topic = MqttMsgParser.createPubTopic(new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicFault)) ;
        msg.msg = JSON.toJSONString(new FaultClearVo(cvo.isDo)) ;
        msg.topic = new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicFault) ;
        msg.msg = JSON.toJSONString(new FaultClearVo(cvo.startTrueStopFalse ?(byte)1:0)) ;
        return msg ;
    }
    private MqttPubMsgSdV1 createPubMsgOfStir(String orgTag, Command com) throws Exception {
@@ -138,8 +147,8 @@
        msg.isCacheForOffLine = false ;
        msg.hasResponse = true ;
        msg.cd = CodeSdV1.cd_Fault ;
        msg.topic = MqttMsgParser.createPubTopic(new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicFault)) ;
        msg.msg = JSON.toJSONString(new StirStartVo(cvo.isDo)) ;
        msg.topic = new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicStir) ;
        msg.msg = JSON.toJSONString(new StirStartVo(cvo.startTrueStopFalse ?(byte)1:0)) ;
        return msg ;
    }
    private MqttPubMsgSdV1 createPubMsgOfInject(String orgTag, Command com) throws Exception {
@@ -154,8 +163,8 @@
        msg.isCacheForOffLine = false ;
        msg.hasResponse = true ;
        msg.cd = CodeSdV1.cd_Fault ;
        msg.topic = MqttMsgParser.createPubTopic(new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicFault)) ;
        msg.msg = JSON.toJSONString(new InjectStartVo(cvo.isDo)) ;
        msg.topic = new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicInject) ;
        msg.msg = JSON.toJSONString(new InjectStartVo(cvo.startTrueStopFalse ?(byte)1:0)) ;
        return msg ;
    }
    private MqttPubMsgSdV1 createPubMsgOfIrr(String orgTag, Command com) throws Exception {
@@ -170,8 +179,25 @@
        msg.isCacheForOffLine = false ;
        msg.hasResponse = true ;
        msg.cd = CodeSdV1.cd_Fault ;
        msg.topic = MqttMsgParser.createPubTopic(new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicFault)) ;
        msg.msg = JSON.toJSONString(new StirStartVo(cvo.isDo)) ;
        msg.topic = new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicIrr) ;
        msg.msg = JSON.toJSONString(new StirStartVo(cvo.startTrueStopFalse ?(byte)1:0)) ;
        return msg ;
    }
    private MqttPubMsgSdV1 createPubMsgOfParam(String orgTag, Command com) throws Exception {
        JSONObject obj = (JSONObject) com.param;
        String json = obj.toJSONString();
        ComSetParamVo cvo = JSON.parseObject(json, ComSetParamVo.class);
        if(cvo == null){
            throw new Exception("json转ComSetParamVo为null") ;
        }
        MqttPubMsgSdV1 msg = new MqttPubMsgSdV1() ;
        this.setPubMsgBase(com, msg);
        msg.isCacheForOffLine = false ;
        msg.hasResponse = false ;
        msg.cd = CodeSdV1.cd_Param ;
        msg.topic = new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicParam) ;
        msg.msg = JSON.toJSONString(new ParamSetVo(cvo.stirDuration, cvo.injectDuration)) ;
        return msg ;
    }
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/comParam/ComCtrlVo.java
@@ -6,7 +6,7 @@
 * @Description
 */
public class ComCtrlVo {
    //是否控制动作,true是,false否
    //启停动作,true是,false否
    //可以执行功能码 00,01,02,03的动作
    public boolean isDo;//
    public boolean startTrueStopFalse;//
}
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/comParam/ComSetParamVo.java
New file
@@ -0,0 +1,23 @@
package com.dy.common.mw.protocol4Mqtt.pSdV1.comParam;
import com.alibaba.fastjson2.annotation.JSONField;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
 * @Author: liurunyu
 * @Date: 2025/6/11 17:03
 * @Description
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ComSetParamVo {
    // 搅拌设定时间
    public Integer stirDuration ;
    // 注肥设定时间
    public Integer injectDuration ;
}
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/FaultClearVo.java
@@ -16,10 +16,10 @@
@AllArgsConstructor
public class FaultClearVo implements Vo4Down {
    @JSONField(name = "故障解除")
    public boolean isDo ;
    public Byte isDo ;//1是,0否
    @Override
    public String toString(){
        return "故障解除:" + (isDo?"是":"否") ;
        return "故障解除:" + (isDo==1?"是":"否") ;
    }
}
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/InjectStartVo.java
@@ -16,9 +16,10 @@
@AllArgsConstructor
public class InjectStartVo implements Vo4Down {
    @JSONField(name = "注肥启停")
    public boolean isDo ;//true为启,false为停
    public Byte isDo ;//1是,0否
    @Override
    public String toString(){
        return "注肥启停:" + (isDo?"启":"停") ;
        return "注肥启停:" + (isDo==1?"启":"停") ;
    }
}
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/IrrStartVo.java
@@ -16,9 +16,10 @@
@AllArgsConstructor
public class IrrStartVo implements Vo4Down {
    @JSONField(name = "灌溉启停")
    public boolean isDo ;//true为启,false为停
    public Byte isDo ;//1是,0否
    @Override
    public String toString(){
        return "灌溉启停:" + (isDo?"启":"停") ;
        return "灌溉启停:" + (isDo==1?"是":"否") ;
    }
}
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/ParamSetVo.java
New file
@@ -0,0 +1,31 @@
package com.dy.common.mw.protocol4Mqtt.pSdV1.downVos;
import com.alibaba.fastjson2.annotation.JSONField;
import com.dy.common.mw.protocol4Mqtt.Vo4Down;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
 * @Author: liurunyu
 * @Date: 2025/6/11 16:55
 * @Description
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ParamSetVo implements Vo4Down {
    @JSONField(name = "搅拌设定时间")
    public Integer stirDuration ;
    @JSONField(name = "注肥设定时间")
    public Integer injectDuration ;
    @Override
    public String toString(){
        StringBuilder sb = new StringBuilder();
        sb.append("搅拌设定时间:" + stirDuration + "\n" );
        sb.append("注肥设定时间:" + injectDuration + "\n" );
        return sb.toString();
    }
}
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/StirStartVo.java
@@ -16,9 +16,10 @@
@AllArgsConstructor
public class StirStartVo implements Vo4Down {
    @JSONField(name = "搅拌启停")
    public boolean isDo ;//true为启,false为停
    public Byte isDo ;//1是,0否
    @Override
    public String toString(){
        return "搅拌启停:" + (isDo?"启":"停") ;
        return "搅拌启停:" + (isDo==1?"是":"否") ;
    }
}
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/ManureVo.java
@@ -12,6 +12,18 @@
 */
@Data
public class ManureVo implements Vo4Up {
    @JSONField(name = "flexem_message_id")
    public Integer messageId ;//消息ID
    @JSONField(name = "肥料流量")
    public Float manureFlow ;//肥料流量
    @JSONField(name = "注肥时长")
    public Integer manureTime ;//注肥时长
    @JSONField(name = "搅拌时长")
    public Integer stirTime ;//搅拌时长
    @JSONField(name = "flexem_timestamp")
    public Long devDt ;//设备时间
@@ -27,9 +39,13 @@
    @Override
    public String toString(){
        StringBuilder sb = new StringBuilder();
        sb.append("水肥数据:") ;
        sb.append(" 设备时间:"+devDt) ;
        sb.append(" 设备时间:"+ this.getDevDtStr()) ;
        sb.append("水肥数据=>") ;
        sb.append(" 消息ID:" + messageId + ", ") ;
        sb.append(" 肥料流量:" + manureFlow + ", ") ;
        sb.append(" 注肥时长:" + manureTime + ", ") ;
        sb.append(" 搅拌时长:" + stirTime + ", ") ;
        sb.append(" 设备时间:" + devDt + ", ") ;
        sb.append(" 设备时间:" +  this.getDevDtStr() + ", ") ;
        sb.append("\n") ;
        return sb.toString() ;
    }
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/SoilVo.java
@@ -12,6 +12,33 @@
 */
@Data
public class SoilVo implements Vo4Up {
    @JSONField(name = "flexem_message_id")
    public Integer messageId ;//消息ID
    @JSONField(name = "土壤湿度1")
    public Float soilHumidity1 ;//土壤湿度1
    @JSONField(name = "土壤湿度2")
    public Float soilHumidity2 ;//土壤湿度2
    @JSONField(name = "土壤湿度3")
    public Float soilHumidity3 ;//土壤湿度3
    @JSONField(name = "土壤湿度4")
    public Float soilHumidity4 ;//土壤湿度4
    @JSONField(name = "土壤湿度1")
    public Float soilTemperature1 ;//土壤温度1
    @JSONField(name = "土壤温度2")
    public Float soilTemperature2 ;//土壤温度2
    @JSONField(name = "土壤温度3")
    public Float soilTemperature3 ;//土壤温度3
    @JSONField(name = "土壤温度4")
    public Float soilTemperature4 ;//土壤温度4
    @JSONField(name = "flexem_timestamp")
    public Long devDt ;//设备时间
@@ -27,10 +54,20 @@
    @Override
    public String toString(){
        StringBuilder sb = new StringBuilder();
        sb.append("墒情数据:") ;
        sb.append(" 设备时间:"+devDt) ;
        sb.append(" 设备时间:"+ this.getDevDtStr()) ;
        sb.append("墒情数据=>") ;
        sb.append(" 消息ID:" + messageId + ", ") ;
        sb.append(" 土壤湿度1:" + soilHumidity1 + ", ") ;
        sb.append(" 土壤湿度2:" + soilHumidity2 + ", ") ;
        sb.append(" 土壤湿度3:" + soilHumidity3 + ", ") ;
        sb.append(" 土壤湿度4:" + soilHumidity4 + ", ") ;
        sb.append(" 土壤温度1:" + soilTemperature1 + ", ") ;
        sb.append(" 土壤温度2:" + soilTemperature2 + ", ") ;
        sb.append(" 土壤温度3:" + soilTemperature3 + ", ") ;
        sb.append(" 土壤温度4:" + soilTemperature4 + ", ") ;
        sb.append(" 设备时间:" + devDt + ", ") ;
        sb.append(" 设备时间:" +  this.getDevDtStr() + ", ") ;
        sb.append("\n") ;
        return sb.toString() ;
    }
}
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/StateVo.java
@@ -12,6 +12,21 @@
 */
@Data
public class StateVo implements Vo4Up {
    @JSONField(name = "flexem_message_id")
    public Integer messageId ;//消息ID
    @JSONField(name = "搅拌运行")
    public Byte stirRunning ;//搅拌运行
    @JSONField(name = "注肥运行")
    public Byte injectRunning ;//注肥运行
    @JSONField(name = "灌溉运行")
    public Byte irrRunning ;//灌溉运行
    @JSONField(name = "报警")
    public Byte alarm ;//报警
    @JSONField(name = "flexem_timestamp")
    public Long devDt ;//设备时间
@@ -27,9 +42,14 @@
    @Override
    public String toString(){
        StringBuilder sb = new StringBuilder();
        sb.append("状态数据:") ;
        sb.append(" 设备时间:"+devDt) ;
        sb.append(" 设备时间:"+ this.getDevDtStr()) ;
        sb.append("状态数据=>") ;
        sb.append(" 消息ID:" + messageId + ", ") ;
        sb.append(" 搅拌运行:" + stirRunning + ", ") ;
        sb.append(" 注肥运行:" + injectRunning + ", ") ;
        sb.append(" 灌溉运行:" + irrRunning + ", ") ;
        sb.append(" 报警:" + alarm + ", ") ;
        sb.append(" 设备时间:" + devDt + ", ") ;
        sb.append(" 设备时间:" +  this.getDevDtStr() + ", ") ;
        sb.append("\n") ;
        return sb.toString() ;
    }
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/WeatherVo.java
@@ -54,16 +54,16 @@
    public String toString(){
        StringBuilder sb = new StringBuilder();
        sb.append("气象数据=>") ;
        sb.append(" 消息ID:"+messageId + ", ") ;
        sb.append(" 二氧化碳:"+carbonDioxide + ", ") ;
        sb.append(" 光照强度:"+lightIntensity + ", ") ;
        sb.append(" 大气压力:"+atmosphericPressure + ", ") ;
        sb.append(" 空气温度:"+airTemperature + ", ") ;
        sb.append(" 空气湿度:"+airHumidity + ", ") ;
        sb.append(" PM2.5:"+pm25 + ", ") ;
        sb.append(" PM10:"+pm10 + ", ") ;
        sb.append(" 设备时间:"+devDt + ", ") ;
        sb.append(" 设备时间:"+ this.getDevDtStr() + ", ") ;
        sb.append(" 消息ID:" + messageId + ", ") ;
        sb.append(" 二氧化碳:" + carbonDioxide + ", ") ;
        sb.append(" 光照强度:" + lightIntensity + ", ") ;
        sb.append(" 大气压力:" + atmosphericPressure + ", ") ;
        sb.append(" 空气温度:" + airTemperature + ", ") ;
        sb.append(" 空气湿度:" + airHumidity + ", ") ;
        sb.append(" PM2.5:" + pm25 + ", ") ;
        sb.append(" PM10:" + pm10 + ", ") ;
        sb.append(" 设备时间:" + devDt + ", ") ;
        sb.append(" 设备时间:" +  this.getDevDtStr() + ", ") ;
        sb.append("\n") ;
        return sb.toString() ;
    }
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevOnLineInfo.java
File was renamed from pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevOnLineSt.java
@@ -9,7 +9,7 @@
 * @Description
 */
@Data
public class DevOnLineSt implements MqttNotifyInfo {
public class DevOnLineInfo implements MqttNotifyInfo {
    public String id ;
    public String protocol ;
    public Boolean onLine ;
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevRunInfo.java
File was renamed from pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevRunSt.java
@@ -9,8 +9,8 @@
 * @Description
 */
@Data
public class DevRunSt implements MqttNotifyInfo {
    public String id ;
public class DevRunInfo implements MqttNotifyInfo {
    public String devId ;//MQTT设置ID
    public Boolean stirRunning ;//搅拌运行 true是 false否
    public Boolean injectRunning ;//注肥运行 true是 false否
    public Boolean irrRunning ;//灌溉运行 true是 false否
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatusDealer.java
@@ -1,6 +1,6 @@
package com.dy.rtuMw.server.mqtt;
import com.dy.common.mw.protocol4Mqtt.status.DevRunSt;
import com.dy.common.mw.protocol4Mqtt.status.DevRunInfo;
import com.dy.rtuMw.server.forTcp.RtuLogDealer;
import com.dy.rtuMw.server.local.localProtocol.RtuOnLineStateStatisticsVo;
@@ -166,7 +166,7 @@
        }
    }
    public static void setStatus(String devId, DevRunSt st){
    public static void setStatus(String devId, DevRunInfo st){
        DevStatus vo = map.get(devId) ;
        if(vo != null) {
            if(st.stirRunning != null){
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java
@@ -3,8 +3,8 @@
import com.dy.common.mw.channel.mqtt.MqttClientPool;
import com.dy.common.mw.protocol4Mqtt.MqttNotify;
import com.dy.common.mw.protocol4Mqtt.MqttNotifyInfo;
import com.dy.common.mw.protocol4Mqtt.status.DevOnLineSt;
import com.dy.common.mw.protocol4Mqtt.status.DevRunSt;
import com.dy.common.mw.protocol4Mqtt.status.DevOnLineInfo;
import com.dy.common.mw.protocol4Mqtt.status.DevRunInfo;
import com.dy.rtuMw.server.ServerProperties;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -77,15 +77,15 @@
                            public void notify(String devId, MqttNotifyInfo... infos) {
                                if(devId != null && infos != null && infos.length > 0){
                                    for(MqttNotifyInfo info : infos){
                                        if(info instanceof DevOnLineSt){
                                            DevOnLineSt onLineSt = (DevOnLineSt)info;
                                        if(info instanceof DevOnLineInfo){
                                            DevOnLineInfo onLineSt = (DevOnLineInfo)info;
                                            if(onLineSt.onLine != null && onLineSt.onLine.booleanValue()){
                                                DevStatusDealer.onLine(devId, ((DevOnLineSt)info).protocol);
                                                DevStatusDealer.onLine(devId, ((DevOnLineInfo)info).protocol);
                                            }else{
                                                DevStatusDealer.offLine(devId);
                                            }
                                        } else if(info instanceof DevRunSt){
                                            DevStatusDealer.setStatus(devId, (DevRunSt)info);
                                        } else if(info instanceof DevRunInfo){
                                            DevStatusDealer.setStatus(devId, (DevRunInfo)info);
                                        }
                                    }
                                }
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java
@@ -45,33 +45,6 @@
        }
    }
    private void nextDeal(MqttSubMsg subMsg)throws Exception {
        subMsg.action(new Callback() {
            @Override
            public void call(Object obj) {
                MqttSubMsg subMs = (MqttSubMsg) obj ;
                MqttPubMsg pubMs = MqttPubMsgCache.matchFromTail(subMs) ;
                if(pubMs != null){
                    //匹配到下行消息(命令)
                    subMs.mqttResultSendWebUrl = pubMs.mqttResultSendWebUrl ;
                    subMs.commandId = pubMs.commandId ;
                    try {
                        MqttComResultCache.getInstance().cacheMqttComResult(new MqttComResultNode(subMs));
                    } catch (Exception e) {
                        log.error("缓存发布消息(命令)结果发生异常", e);
                    }
                }
                try{
                    MqttSubMsgCache.getInstance().cacheMsg(new MqttSubMsgNode(subMsg));
                }catch (Exception e){
                    log.error("缓存订阅消息数据发生异常", e);
                }
            }
            @Override
            public void call(Object... objs) {
            }
            @Override
            public void exception(Exception e) {
            }
        });
        subMsg.action(new MqttSubMsgDealer());
    }
}
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgCache.java
@@ -86,8 +86,7 @@
        while(node != null && node.obj != null){
            obj = (MqttPubMsgNode)node.obj;
            pubMsg = obj.result ;
            if(pubMsg != null
                    && subMsg.subMsgMatchPubMsg(pubMsg)){
            if(pubMsg != null && subMsg.subMsgMatchPubMsg(pubMsg)){
                obj.onceReceivedResult = true ;//标识已经收到命令结果
                return pubMsg;
            }else{
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgNode.java
@@ -65,7 +65,7 @@
        }else{
            if(mqttClient != null && mqttClient.isConnected()){
                try {
                    mqttManager.publishMsg(mqttClient, this.result.topic, this.result.msg);
                    mqttManager.publishMsg(mqttClient, this.result.topic.longName(), this.result.msg);
                    DevStatusDealer.afterSendPubMessage(this.result.deviceId);
                    RtuLogDealer.log4Mqtt(this.result.deviceId, "发布消息    主题:" + this.result.topic + "   消息:" + this.result.msg);
                    log.info("发布MQTT消息(主题=" + this.result.topic + ")" + this.result.msg);
@@ -74,7 +74,11 @@
                }finally {
                    mqttManager.pushMqttClient(mqttClient);
                }
                return false ;
                if(this.result.hasResponse){
                    return false ;
                }else{
                    return true ;
                }
            }else{
                //未曾连接MQTT服务器
                return this.decideRemoveNodeFromCach(now) ;
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttSubMsgDealer.java
New file
@@ -0,0 +1,41 @@
package com.dy.rtuMw.server.mqtt;
import com.dy.common.mw.protocol4Mqtt.MqttPubMsg;
import com.dy.common.mw.protocol4Mqtt.MqttSubMsg;
import com.dy.common.util.Callback;
import lombok.extern.slf4j.Slf4j;
/**
 * @Author: liurunyu
 * @Date: 2025/6/11 17:33
 * @Description
 */
@Slf4j
public class MqttSubMsgDealer implements Callback {
    @Override
    public void call(Object obj) {
        MqttSubMsg subMs = (MqttSubMsg) obj ;
        MqttPubMsg pubMs = MqttPubMsgCache.matchFromTail(subMs) ;
        if(pubMs != null){
            //匹配到下行消息(命令)
            subMs.mqttResultSendWebUrl = pubMs.mqttResultSendWebUrl ;
            subMs.commandId = pubMs.commandId ;
            try {
                MqttComResultCache.getInstance().cacheMqttComResult(new MqttComResultNode(subMs));
            } catch (Exception e) {
                log.error("缓存发布消息(命令)结果发生异常", e);
            }
        }
        try{
            MqttSubMsgCache.getInstance().cacheMsg(new MqttSubMsgNode(subMs));
        }catch (Exception e){
            log.error("缓存订阅消息数据发生异常", e);
        }
    }
    @Override
    public void call(Object... objs) {
    }
    @Override
    public void exception(Exception e) {
    }
}