liurunyu
3 天以前 6b828ba1310db528aa8172bd14a0253ebca5a844
基于mqtt的水肥机、气象站、墒情站协议、功能模块继续开发
2 文件已重命名
18个文件已添加
19个文件已修改
1315 ■■■■ 已修改文件
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttCallback.java 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttCom.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttMsgParser.java 31 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttNotify.java 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttNotifyInfo.java 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttSubMsg.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttTopic.java 21 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/Vo4Down.java 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/Vo4Up.java 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttPubMsgSdV1.java 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java 20 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolConstantSdV1.java 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java 131 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/comParam/ComCtrlVo.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/FaultClearVo.java 25 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/InjectStartVo.java 24 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/IrrStartVo.java 24 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/StirStartVo.java 24 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/ManureVo.java 36 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/SoilVo.java 36 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/StateVo.java 36 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/WeatherVo.java 70 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/说明.txt 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevOnLineSt.java 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevRunSt.java 18 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/softUpgrade/state/UpgradeRtu.java 37 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java 65 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/forTcp/RtuLogDealer.java 18 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/CommandInnerDeaLer.java 197 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/localProtocol/CodeLocal.java 31 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatus.java 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatusDealer.java 186 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java 44 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java 35 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgNode.java 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitConfigVo.java 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/MqttSubMessageConstantTask.java 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.properties 32 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.xml 17 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttCallback.java
New file
@@ -0,0 +1,19 @@
package com.dy.common.mw.protocol4Mqtt;
/**
 * @Author: liurunyu
 * @Date: 2025/6/10 15:39
 * @Description
 */
public interface MqttCallback {
    /**
     * @param subMsg è®¢é˜…的消息
     */
    void callback(MqttSubMsg subMsg) ;
    /**
     * åªæœ‰åè®®è§£æžå™¨æ‰çŸ¥é“RTU真实的状态,所认提供此接口,向外通知设备的一些状态
     * @param infos
     */
    void notify(String devId, MqttNotifyInfo...infos) ;
}
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttCom.java
File was renamed from pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/Com4Mqtt.java
@@ -8,7 +8,7 @@
 * @Description å‘½ä»¤å€¼å¯¹è±¡
 */
@Data
public class Com4Mqtt {
public class MqttCom {
    public String commandId ;//命令ID
    public String deviceId ;//设备ID
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttMsgParser.java
@@ -11,25 +11,38 @@
 * @Description
 */
public class MqttMsgParser {
    public MqttSubMsg parseSubMsg(String topic, MqttMessage mqttMsg) throws Exception {
    public static MqttTopic parseSubTopic(String topic) throws Exception {
        if(topic != null && topic.trim().length() != 0){
            String[] topicGrp = topic.split("/") ;
            if(topicGrp.length != 4){
                throw new Exception("接收的mqtt消息主题不可识别") ;
            }else{
                if(topicGrp[1].equals("sd1")){
                    //山东设备(协议),且版本号为1
                    return new ProtocolParserSdV1().parseSubMsg(topicGrp[2], topic, mqttMsg);
                }else{
                    throw new Exception("接收的mqtt消息主题中协议(厂家及版本)不可识别") ;
                }
                MqttTopic vo = new MqttTopic() ;
                vo.orgTag = topicGrp[0] ;
                vo.protocol = topicGrp[1] ;
                vo.devId = topicGrp[2] ;
                vo.topic = topicGrp[3] ;
                return vo ;
            }
        }else{
            throw new Exception("接收的mqtt消息主题为空") ;
            throw new Exception("接收的mqtt消息主题不合法") ;
        }
    }
    public MqttPubMsg createPubMsg(String orgTag, Command com) throws Exception {
    public static String createPubTopic(MqttTopic tp) throws Exception {
        return tp.orgTag + "/" + tp.protocol + "/" + tp.devId + "/" + tp.topic ;
    }
    public static MqttSubMsg parseSubMsg(MqttTopic subTopic, MqttMessage mqttMsg, MqttCallback callback) throws Exception {
        if(subTopic.protocol.equals(ProtocolConstantSdV1.protocolName + ProtocolConstantSdV1.protocolVer)){
            //山东设备(协议),且版本号为1
            return new ProtocolParserSdV1().parseSubMsg(subTopic, mqttMsg, callback);
        }else{
            throw new Exception("接收的mqtt消息主题中协议(厂家及版本)不可识别") ;
        }
    }
    public static MqttPubMsg createPubMsg(String orgTag, Command com) throws Exception {
        if(com.protocol == null && com.protocol.trim().length() != 0){
            throw new Exception("接收到MQTT命令,但未提供协议") ;
        }
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttNotify.java
New file
@@ -0,0 +1,16 @@
package com.dy.common.mw.protocol4Mqtt;
/**
 * @Author: liurunyu
 * @Date: 2025/6/10 15:56
 * @Description
 */
public interface MqttNotify {
    /**
     * MQTT DEV ä¿¡æ¯é€šçŸ¥
     * @param devId
     * @param info
     */
    void notify(String devId,
                MqttNotifyInfo...info) ;
}
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttNotifyInfo.java
New file
@@ -0,0 +1,9 @@
package com.dy.common.mw.protocol4Mqtt;
/**
 * @Author: liurunyu
 * @Date: 2025/6/10 15:42
 * @Description
 */
public interface MqttNotifyInfo {
}
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttSubMsg.java
@@ -10,10 +10,10 @@
public abstract class MqttSubMsg {
    public String commandId ;//命令ID
    public String mqttResultSendWebUrl ;//Mtt返回命令结果 å‘向目的地web URL
    public String deviceId ;//设备ID
    public String mqttResultSendWebUrl ;//Mtt返回命令结果 å‘向目的地web URL
    public String protocol;//协议
    public String topic ;//消息主题
    public String msg ;//消息
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttTopic.java
New file
@@ -0,0 +1,21 @@
package com.dy.common.mw.protocol4Mqtt;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
 * @Author: liurunyu
 * @Date: 2025/6/10 9:47
 * @Description
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class MqttTopic {
    public String orgTag ;//组织标识
    public String protocol ;//协议名称
    public String devId ;//设备(FBox)ID
    public String topic ;//消息主题
}
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/Vo4Down.java
New file
@@ -0,0 +1,10 @@
package com.dy.common.mw.protocol4Mqtt;
/**
 * @Author: liurunyu
 * @Date: 2025/6/10 14:11
 * @Description
 */
public interface Vo4Down {
    String toString() ;
}
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/Vo4Up.java
New file
@@ -0,0 +1,10 @@
package com.dy.common.mw.protocol4Mqtt;
/**
 * @Author: liurunyu
 * @Date: 2025/6/10 10:06
 * @Description
 */
public interface Vo4Up {
    String toString() ;
}
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttPubMsgSdV1.java
@@ -13,8 +13,7 @@
@EqualsAndHashCode(callSuper=false)
public class MqttPubMsgSdV1 extends MqttPubMsg {
    public Integer address ;//寄存器地址
    public String value ;//寄存器值
    public String cd ;//功能码
    @Override
    public boolean valid() {
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java
@@ -2,6 +2,9 @@
import com.dy.common.mw.protocol4Mqtt.MqttPubMsg;
import com.dy.common.mw.protocol4Mqtt.MqttSubMsg;
import com.dy.common.mw.protocol4Mqtt.MqttTopic;
import com.dy.common.mw.protocol4Mqtt.pSdV1.upVos.StateVo;
import com.dy.common.mw.protocol4Mqtt.Vo4Up;
import com.dy.common.util.Callback;
import lombok.Data;
import lombok.EqualsAndHashCode;
@@ -14,14 +17,14 @@
@Data
@EqualsAndHashCode(callSuper=false)
public class MqttSubMsgSdV1 extends MqttSubMsg {
    public Integer address ;//寄存器地址
    public String value ;//寄存器值
    public Vo4Up vo4Up;//订阅的消息数据值对象
    public MqttSubMsgSdV1(){}
    public MqttSubMsgSdV1(String deviceId, String topic, String msg) {
        this.deviceId = deviceId ;
        this.topic = topic ;
    public MqttSubMsgSdV1(MqttTopic subTopic, String msg) {
        this.deviceId = subTopic.devId ;
        this.protocol = subTopic.protocol ;
        this.topic = subTopic.topic ;
        this.msg = msg ;
    }
    public String toString(){
@@ -37,6 +40,11 @@
        sb.append("消息:")
                .append(msg)
                .append("\n") ;
        if(vo4Up != null){
            sb.append("数据:")
                    .append(vo4Up.toString())
                    .append("\n") ;
        }
        return sb.toString() ;
    }
@@ -44,7 +52,7 @@
    public boolean subMsgMatchPubMsg(MqttPubMsg pubMsg){
        if (pubMsg instanceof MqttPubMsgSdV1) {
            MqttPubMsgSdV1 pubMsgSdV1 = (MqttPubMsgSdV1) pubMsg;
            if(this.address.intValue() == pubMsgSdV1.getAddress().intValue()){
            if(this.vo4Up != null && this.vo4Up instanceof StateVo){
                return true ;
            }
        }
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolConstantSdV1.java
@@ -8,4 +8,17 @@
public class ProtocolConstantSdV1 {
    public static final String protocolName = "sd" ;
    public static final short protocolVer = 1 ;
    //订阅的主题
    public static final String SubTopicWeather = "weather" ;//气象
    public static final String SubTopicSoil = "soil" ;//土壤墒情
    public static final String SubTopicManure = "manure" ;//水肥
    public static final String SubTopicState = "state" ;//状态
    //发布的主题
    public static final String PubTopicFault = "ctrlFault" ;//故障解除
    public static final String PubTopicStir = "ctrlStir" ;//搅拌启停
    public static final String PubTopicInject = "ctrlInject" ;//注肥启停
    public static final String PubTopicIrr = "ctrlIrr" ;//灌溉启停
}
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java
@@ -3,9 +3,16 @@
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.dy.common.mw.protocol.Command;
import com.dy.common.mw.protocol4Mqtt.pSdV1.downVos.ComCtrlVo;
import com.dy.common.mw.protocol4Mqtt.MqttCallback;
import com.dy.common.mw.protocol4Mqtt.MqttMsgParser;
import com.dy.common.mw.protocol4Mqtt.MqttTopic;
import com.dy.common.mw.protocol4Mqtt.Vo4Up;
import com.dy.common.mw.protocol4Mqtt.pSdV1.comParam.ComCtrlVo;
import com.dy.common.mw.protocol4Mqtt.pSdV1.downVos.FaultClearVo;
import com.dy.common.mw.protocol4Mqtt.pSdV1.downVos.InjectStartVo;
import com.dy.common.mw.protocol4Mqtt.pSdV1.downVos.StirStartVo;
import com.dy.common.mw.protocol4Mqtt.pSdV1.upVos.*;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
 * @Author: liurunyu
@@ -13,44 +20,74 @@
 * @Description
 */
public class ProtocolParserSdV1 {
    public MqttSubMsgSdV1 parseSubMsg(String deviceId, String topic, MqttMessage mqttMsg) throws Exception {
        MqttSubMsgSdV1 ms = new MqttSubMsgSdV1(deviceId, topic, new String(mqttMsg.getPayload(), "UTF-8"));
    public MqttSubMsgSdV1 parseSubMsg(MqttTopic subTopic, MqttMessage mqttMsg, MqttCallback callback) throws Exception {
        String msg = new String(mqttMsg.getPayload(), "UTF-8");
        if(JSON.isValid(msg)){
            throw new Exception("接收到MQTT消息,协议" + subTopic.protocol + ",设备ID" + subTopic.devId + ",主题" + subTopic.topic + "消息格式非json数据(" + msg + ")") ;
        }
        MqttSubMsgSdV1 ms = new MqttSubMsgSdV1(subTopic, msg);
        Vo4Up vo ;
        switch (subTopic.topic) {
            case ProtocolConstantSdV1.SubTopicWeather -> {
                vo = JSON.parseObject(msg, WeatherVo.class);
                break;
            }
            case ProtocolConstantSdV1.SubTopicSoil -> {
                vo = JSON.parseObject(msg, SoilVo.class);
                break;
            }
            case ProtocolConstantSdV1.SubTopicManure -> {
                vo = JSON.parseObject(msg, ManureVo.class);
                break;
            }
            case ProtocolConstantSdV1.SubTopicState -> {
                //此处未完成,应该产生一些通信的info,供下面callback.notify(objs)通知出去
                vo = JSON.parseObject(msg, StateVo.class);
                break;
            }
            default -> {
                throw new Exception("接收到MQTT消息,协议" + subTopic.protocol + ",设备ID" + subTopic.devId + ",主题" + subTopic.topic + "消息解析逻辑未实现");
            }
        }
        ms.vo4Up = vo ;
        callback.callback(ms);
        callback.notify(null);//此处未完成
        return ms;
    }
    public MqttPubMsgSdV1 createPubMsg(String orgTag, Command com) throws Exception {
        MqttPubMsgSdV1 msg = null ;
        switch (com.code){
            case CodeSdV1.cd_Fault:{
        MqttPubMsgSdV1 msg ;
        switch (com.code) {
            case CodeSdV1.cd_Fault -> {
                //故障解除命令
                this.checkParam(com);
                this.checkRtnWebUrl(com);
                msg = this.createPubMsgOfFault(orgTag, com) ;
                break ;
                msg = this.createPubMsgOfFault(orgTag, com);
                break;
            }
            case CodeSdV1.cd_Stir:{
            case CodeSdV1.cd_Stir -> {
                //搅拌启停命令
                this.checkParam(com);
                this.checkRtnWebUrl(com);
                msg = this.createPubMsgOfStir(orgTag, com) ;
                break ;
                msg = this.createPubMsgOfStir(orgTag, com);
                break;
            }
            case CodeSdV1.cd_Inject:{
            case CodeSdV1.cd_Inject -> {
                //注肥启停命令
                this.checkParam(com);
                this.checkRtnWebUrl(com);
                msg = this.createPubMsgOfInject(orgTag, com) ;
                break ;
                msg = this.createPubMsgOfInject(orgTag, com);
                break;
            }
            case CodeSdV1.cd_Irr:{
            case CodeSdV1.cd_Irr -> {
                //灌溉启停命令
                this.checkParam(com);
                this.checkRtnWebUrl(com);
                msg = this.createPubMsgOfIrr(orgTag, com) ;
                break ;
                msg = this.createPubMsgOfIrr(orgTag, com);
                break;
            }
            default:{
                throw new Exception("接收到MQTT命令,协议" + com.protocol + "版本" + com.protocolVersion + "功能码" + com.code + "构造器未实现") ;
            default -> {
                throw new Exception("接收到MQTT命令,协议" + com.protocol + "版本" + com.protocolVersion + "功能码" + com.code + "构造器未实现");
            }
        }
        return msg ;
@@ -76,10 +113,9 @@
        this.setPubMsgBase(com, msg);
        msg.isCacheForOffLine = false ;
        msg.hasResponse = true ;
        msg.address = 123 ;
        msg.value = "" + (cvo.isDo?1:0);
        msg.topic = createTopic(orgTag, com) ;
        msg.msg = "" ;
        msg.cd = CodeSdV1.cd_Fault ;
        msg.topic = MqttMsgParser.createPubTopic(new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicFault)) ;
        msg.msg = JSON.toJSONString(new FaultClearVo(cvo.isDo)) ;
        return msg ;
    }
    private MqttPubMsgSdV1 createPubMsgOfStir(String orgTag, Command com) throws Exception {
@@ -93,10 +129,9 @@
        this.setPubMsgBase(com, msg);
        msg.isCacheForOffLine = false ;
        msg.hasResponse = true ;
        msg.address = 123 ;
        msg.value = "" + (cvo.isDo?1:0);
        msg.topic = createTopic(orgTag, com) ;
        msg.msg = "" ;
        msg.cd = CodeSdV1.cd_Fault ;
        msg.topic = MqttMsgParser.createPubTopic(new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicFault)) ;
        msg.msg = JSON.toJSONString(new StirStartVo(cvo.isDo)) ;
        return msg ;
    }
    private MqttPubMsgSdV1 createPubMsgOfInject(String orgTag, Command com) throws Exception {
@@ -110,10 +145,9 @@
        this.setPubMsgBase(com, msg);
        msg.isCacheForOffLine = false ;
        msg.hasResponse = true ;
        msg.address = 123 ;
        msg.value = "" + (cvo.isDo?1:0);
        msg.topic = createTopic(orgTag, com) ;
        msg.msg = "" ;
        msg.cd = CodeSdV1.cd_Fault ;
        msg.topic = MqttMsgParser.createPubTopic(new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicFault)) ;
        msg.msg = JSON.toJSONString(new InjectStartVo(cvo.isDo)) ;
        return msg ;
    }
    private MqttPubMsgSdV1 createPubMsgOfIrr(String orgTag, Command com) throws Exception {
@@ -127,10 +161,9 @@
        this.setPubMsgBase(com, msg);
        msg.isCacheForOffLine = false ;
        msg.hasResponse = true ;
        msg.address = 123 ;
        msg.value = "" + (cvo.isDo?1:0);
        msg.topic = createTopic(orgTag, com) ;
        msg.msg = "" ;
        msg.cd = CodeSdV1.cd_Fault ;
        msg.topic = MqttMsgParser.createPubTopic(new MqttTopic(orgTag, com.protocol, com.rtuAddr, ProtocolConstantSdV1.PubTopicFault)) ;
        msg.msg = JSON.toJSONString(new StirStartVo(cvo.isDo)) ;
        return msg ;
    }
@@ -140,31 +173,5 @@
        msg.mqttResultSendWebUrl = com.rtuResultSendWebUrl ;
    }
    private String createTopic(String orgTag, Command com){
        String topic = null ;
        switch (com.code){
            case CodeSdV1.cd_Fault:{
                topic = orgTag + "/" + com.protocol + com.protocolVersion + "/" + com.rtuAddr + "/control/m4" ;
                break ;
            }
            case CodeSdV1.cd_Stir:{
                topic = orgTag + "/" + com.protocol + com.protocolVersion + "/" + com.rtuAddr + "/control/m80" ;
                break ;
            }
            case CodeSdV1.cd_Inject:{
                topic = orgTag + "/" + com.protocol + com.protocolVersion + "/" + com.rtuAddr + "/control/m1" ;
                break ;
            }
            case CodeSdV1.cd_Irr:{
                topic = orgTag + "/" + com.protocol + com.protocolVersion + "/" + com.rtuAddr + "/control/m2" ;
                break ;
            }
            default:{
                topic = null ;
                break;
            }
        }
        return topic ;
    }
}
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/comParam/ComCtrlVo.java
File was renamed from pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/ComCtrlVo.java
@@ -1,4 +1,4 @@
package com.dy.common.mw.protocol4Mqtt.pSdV1.downVos;
package com.dy.common.mw.protocol4Mqtt.pSdV1.comParam;
/**
 * @Author: liurunyu
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/FaultClearVo.java
New file
@@ -0,0 +1,25 @@
package com.dy.common.mw.protocol4Mqtt.pSdV1.downVos;
import com.alibaba.fastjson2.annotation.JSONField;
import com.dy.common.mw.protocol4Mqtt.Vo4Down;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
 * @Author: liurunyu
 * @Date: 2025/6/10 14:07
 * @Description
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class FaultClearVo implements Vo4Down {
    @JSONField(name = "故障解除")
    public boolean isDo ;
    @Override
    public String toString(){
        return "故障解除:" + (isDo?"是":"否") ;
    }
}
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/InjectStartVo.java
New file
@@ -0,0 +1,24 @@
package com.dy.common.mw.protocol4Mqtt.pSdV1.downVos;
import com.alibaba.fastjson2.annotation.JSONField;
import com.dy.common.mw.protocol4Mqtt.Vo4Down;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
 * @Author: liurunyu
 * @Date: 2025/6/10 14:13
 * @Description
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class InjectStartVo implements Vo4Down {
    @JSONField(name = "注肥启停")
    public boolean isDo ;//true为启,false为停
    @Override
    public String toString(){
        return "注肥启停:" + (isDo?"启":"停") ;
    }
}
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/IrrStartVo.java
New file
@@ -0,0 +1,24 @@
package com.dy.common.mw.protocol4Mqtt.pSdV1.downVos;
import com.alibaba.fastjson2.annotation.JSONField;
import com.dy.common.mw.protocol4Mqtt.Vo4Down;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
 * @Author: liurunyu
 * @Date: 2025/6/10 14:13
 * @Description
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class IrrStartVo implements Vo4Down {
    @JSONField(name = "灌溉启停")
    public boolean isDo ;//true为启,false为停
    @Override
    public String toString(){
        return "灌溉启停:" + (isDo?"启":"停") ;
    }
}
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/StirStartVo.java
New file
@@ -0,0 +1,24 @@
package com.dy.common.mw.protocol4Mqtt.pSdV1.downVos;
import com.alibaba.fastjson2.annotation.JSONField;
import com.dy.common.mw.protocol4Mqtt.Vo4Down;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
 * @Author: liurunyu
 * @Date: 2025/6/10 14:13
 * @Description
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class StirStartVo implements Vo4Down {
    @JSONField(name = "搅拌启停")
    public boolean isDo ;//true为启,false为停
    @Override
    public String toString(){
        return "搅拌启停:" + (isDo?"启":"停") ;
    }
}
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/ManureVo.java
New file
@@ -0,0 +1,36 @@
package com.dy.common.mw.protocol4Mqtt.pSdV1.upVos;
import com.alibaba.fastjson2.annotation.JSONField;
import com.dy.common.mw.protocol4Mqtt.Vo4Up;
import com.dy.common.util.DateTime;
import lombok.Data;
/**
 * @Author: liurunyu
 * @Date: 2025/6/10 10:05
 * @Description
 */
@Data
public class ManureVo implements Vo4Up {
    @JSONField(name = "flexem_timestamp")
    public Long devDt ;//设备时间
    public String devDtStr ;//设备时间
    public String getDevDtStr() {
        if(devDt == null){
            return DateTime.yyyy_MM_dd_HH_mm_ss(DateTime.getDate(devDt)) ;
        }else{
            return "" ;
        }
    }
    @Override
    public String toString(){
        StringBuilder sb = new StringBuilder();
        sb.append("水肥数据:") ;
        sb.append(" è®¾å¤‡æ—¶é—´ï¼š"+devDt) ;
        sb.append(" è®¾å¤‡æ—¶é—´ï¼š"+ this.getDevDtStr()) ;
        sb.append("\n") ;
        return sb.toString() ;
    }
}
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/SoilVo.java
New file
@@ -0,0 +1,36 @@
package com.dy.common.mw.protocol4Mqtt.pSdV1.upVos;
import com.alibaba.fastjson2.annotation.JSONField;
import com.dy.common.mw.protocol4Mqtt.Vo4Up;
import com.dy.common.util.DateTime;
import lombok.Data;
/**
 * @Author: liurunyu
 * @Date: 2025/6/10 10:05
 * @Description
 */
@Data
public class SoilVo implements Vo4Up {
    @JSONField(name = "flexem_timestamp")
    public Long devDt ;//设备时间
    public String devDtStr ;//设备时间
    public String getDevDtStr() {
        if(devDt == null){
            return DateTime.yyyy_MM_dd_HH_mm_ss(DateTime.getDate(devDt)) ;
        }else{
            return "" ;
        }
    }
    @Override
    public String toString(){
        StringBuilder sb = new StringBuilder();
        sb.append("墒情数据:") ;
        sb.append(" è®¾å¤‡æ—¶é—´ï¼š"+devDt) ;
        sb.append(" è®¾å¤‡æ—¶é—´ï¼š"+ this.getDevDtStr()) ;
        sb.append("\n") ;
        return sb.toString() ;
    }
}
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/StateVo.java
New file
@@ -0,0 +1,36 @@
package com.dy.common.mw.protocol4Mqtt.pSdV1.upVos;
import com.alibaba.fastjson2.annotation.JSONField;
import com.dy.common.mw.protocol4Mqtt.Vo4Up;
import com.dy.common.util.DateTime;
import lombok.Data;
/**
 * @Author: liurunyu
 * @Date: 2025/6/10 10:05
 * @Description
 */
@Data
public class StateVo implements Vo4Up {
    @JSONField(name = "flexem_timestamp")
    public Long devDt ;//设备时间
    public String devDtStr ;//设备时间
    public String getDevDtStr() {
        if(devDt == null){
            return DateTime.yyyy_MM_dd_HH_mm_ss(DateTime.getDate(devDt)) ;
        }else{
            return "" ;
        }
    }
    @Override
    public String toString(){
        StringBuilder sb = new StringBuilder();
        sb.append("状态数据:") ;
        sb.append(" è®¾å¤‡æ—¶é—´ï¼š"+devDt) ;
        sb.append(" è®¾å¤‡æ—¶é—´ï¼š"+ this.getDevDtStr()) ;
        sb.append("\n") ;
        return sb.toString() ;
    }
}
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/upVos/WeatherVo.java
New file
@@ -0,0 +1,70 @@
package com.dy.common.mw.protocol4Mqtt.pSdV1.upVos;
import com.alibaba.fastjson2.annotation.JSONField;
import com.dy.common.mw.protocol4Mqtt.Vo4Up;
import com.dy.common.util.DateTime;
import lombok.Data;
/**
 * @Author: liurunyu
 * @Date: 2025/6/10 10:04
 * @Description
 */
@Data
public class WeatherVo implements Vo4Up {
    //{"PM10":10,"PM2.5":0,"flexem_message_id":1311,"flexem_timestamp":1749522958,"二氧化碳":10,"光照强度":0,"大气压力":20,"空气温度":0,"空气湿度":65}
    @JSONField(name = "flexem_message_id")
    public Integer messageId ;//消息ID
    @JSONField(name = "二氧化碳")
    public Integer carbonDioxide ;//二氧化碳
    @JSONField(name = "光照强度")
    public Integer lightIntensity ;//光照强度
    @JSONField(name = "大气压力")
    public Integer atmosphericPressure ;//大气压力
    @JSONField(name = "空气温度")
    public Integer airTemperature ;//空气温度
    @JSONField(name = "空气湿度")
    public Integer airHumidity ;//空气湿度
    @JSONField(name = "PM2.5")
    public Integer pm25 ;//PM2.5
    @JSONField(name = "PM10")
    public Integer pm10 ;//PM10
    @JSONField(name = "flexem_timestamp")
    public Long devDt ;//设备时间
    public String devDtStr ;//设备时间
    public String getDevDtStr() {
        if(devDt == null){
            return DateTime.yyyy_MM_dd_HH_mm_ss(DateTime.getDate(devDt)) ;
        }else{
            return "" ;
        }
    }
    @Override
    public String toString(){
        StringBuilder sb = new StringBuilder();
        sb.append("气象数据:") ;
        sb.append(" æ¶ˆæ¯ID:"+messageId) ;
        sb.append(" äºŒæ°§åŒ–碳:"+carbonDioxide) ;
        sb.append(" å…‰ç…§å¼ºåº¦ï¼š"+lightIntensity) ;
        sb.append(" å¤§æ°”压力:"+atmosphericPressure) ;
        sb.append(" ç©ºæ°”温度:"+airTemperature) ;
        sb.append(" ç©ºæ°”湿度:"+airHumidity) ;
        sb.append(" PM2.5:"+pm25) ;
        sb.append(" PM10:"+pm10) ;
        sb.append(" è®¾å¤‡æ—¶é—´ï¼š"+devDt) ;
        sb.append(" è®¾å¤‡æ—¶é—´ï¼š"+ this.getDevDtStr()) ;
        sb.append("\n") ;
        return sb.toString() ;
    }
}
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/˵Ã÷.txt
@@ -1,6 +1,6 @@
山东泰安公司提供水肥机、土壤墒情站、气象站、FBox系统协议
建议消息主题规则:
子系统(机构)/协议名称(厂家)+版本号/设备编号/功能组/地址
子系统(机构)/协议名称(厂家)+版本号/设备编号/功能组
例如:
ym/sd1/10000/control/m4  (元谋/山东+版本1/设备编号/设备控制/地址)
ym/sd1/10000/weather  (元谋/山东+版本1/设备编号/气象)
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevOnLineSt.java
New file
@@ -0,0 +1,16 @@
package com.dy.common.mw.protocol4Mqtt.status;
import com.dy.common.mw.protocol4Mqtt.MqttNotifyInfo;
import lombok.Data;
/**
 * @Author: liurunyu
 * @Date: 2025/6/10 15:50
 * @Description
 */
@Data
public class DevOnLineSt implements MqttNotifyInfo {
    public String id ;
    public String protocol ;
    public Boolean onLine ;
}
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/status/DevRunSt.java
New file
@@ -0,0 +1,18 @@
package com.dy.common.mw.protocol4Mqtt.status;
import com.dy.common.mw.protocol4Mqtt.MqttNotifyInfo;
import lombok.Data;
/**
 * @Author: liurunyu
 * @Date: 2025/6/10 15:52
 * @Description
 */
@Data
public class DevRunSt implements MqttNotifyInfo {
    public String id ;
    public Boolean stirRunning ;//搅拌运行 true是 false否
    public Boolean injectRunning ;//注肥运行 true是 false否
    public Boolean irrRunning ;//灌溉运行 true是 false否
    public Boolean alarm ;//报警 true是 false否
}
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/softUpgrade/state/UpgradeRtu.java
@@ -60,30 +60,19 @@
    }
    public static String getStateName(int state){
        switch (state) {
            case STATE_OPEN:
                return "阀开";
            case STATE_OFFLINE:
                return "离线";
            case STATE_UNSTART:
                return "未开始";
            case STATE_RUNNING:
                return "升级中";
            case STATE_SUCCESS:
                return "升级成功";
            case STATE_FAILONE:
                return "一包死";
            case STATE_FAIL:
                return "多包死";
            case STATE_FAILOFFLINE:
                return "离线失败";
            case STATE_FAILOPEN:
                return "阀开失败";
            case STATE_FAILRTU:
                return "RTU失败";
            default:
                return "未知";
        }
        return switch (state) {
            case STATE_OPEN -> "阀开";
            case STATE_OFFLINE -> "离线";
            case STATE_UNSTART -> "未开始";
            case STATE_RUNNING -> "升级中";
            case STATE_SUCCESS -> "升级成功";
            case STATE_FAILONE -> "一包死";
            case STATE_FAIL -> "多包死";
            case STATE_FAILOFFLINE -> "离线失败";
            case STATE_FAILOPEN -> "阀开失败";
            case STATE_FAILRTU -> "RTU失败";
            default -> "未知";
        };
    }
    /**
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java
@@ -111,12 +111,12 @@
            }catch(Exception e){
                company = "" ;
            }
            System.out.println("OOOOOOOOOO           OOOOOOOO       OOOOOOOO") ;
            System.out.println("@@@@@@@@@@@@@@@@#O    $@@@@@@@@&    @@@@@@@@#") ;
            System.out.println("@@@@@@@@@@@@@@@@@@@#    @@@@@@@@# $@@@@@@@@&") ;
            System.out.println("@@@@@@@@@@@@@@@@@@@@@#   #@@@@@@@@@@@@@@@@O") ;
            System.out.println("@@@@@@@@@@@@@@@@@@@@@@@   &@@@@@@@@@@@@@@") ;
            System.out.println("@@@@@@$      $@@@@@@@@@&   O@@@@@@@@@@@#") ;
            System.out.println("$$$$$$$$$$$$         $$$$$$$$       $$$$$$$$") ;
            System.out.println("@@@@@@@@@@@@@@@@#$    $@@@@@@@@&    @@@@@@@@#") ;
            System.out.println("@@@@@@@@@@@@@@@@@@@#    @@@@@@@@# $@@@@@@@@&") ;
            System.out.println("@@@@@@@@@@@@@@@@@@@@@#   #@@@@@@@@@@@@@@@@$") ;
            System.out.println("@@@@@@@@@@@@@@@@@@@@@@@   &@@@@@@@@@@@@@@") ;
            System.out.println("@@@@@@$      $@@@@@@@@@&   $@@@@@@@@@@@#") ;
            System.out.println("@@@@@@$        @@@@@@@@@     @@@@@@@@@&      " + this.orgTag + svName + "RtuMw 1.0.00" ) ;
            if(this.HttpSvPath != null && this.HttpSvPort != null){
                System.out.println("@@@@@@$       O@@@@@@@@@     &@@@@@@@@       HttpSv [ip]:" + this.HttpSvPort + this.HttpSvPath) ;
@@ -450,7 +450,7 @@
            mqVo.enable = conf.getSetAttrBoolean(doc, "config.mqtt", "enable", null, null) ;
            ServerProperties.mqttUnitEnable = mqVo.enable ;
            if(mqVo.enable){
                mqVo.svIp = conf.getSetAttrTxt(doc, "config.mqtt", "svIp", null, true, null) ;
                mqVo.svIp = conf.getSetAttrTxt(doc, "config.mqtt", "svIp", null, false, null) ;
                if(!IPUtils.ipValid(mqVo.svIp)){
                    throw new Exception("config.mqtt.svIp配置的IP不合法") ;
                }
@@ -458,13 +458,13 @@
                if(mqVo.svPort < 0 || mqVo.svPort > 65535){
                    throw new Exception("config.mqtt.svPort配置的端口不合法") ;
                }
                mqVo.svUserName = conf.getSetAttrTxt(doc, "config.mqtt", "svUserName", null, true, null) ;
                mqVo.svUserName = conf.getSetAttrTxt(doc, "config.mqtt", "svUserName", null, false, null) ;
                if(mqVo.svUserName == null || mqVo.svUserName.trim().equals("")){
                    throw new Exception("config.mqtt.svUserName配置的用户名不合法") ;
                }else{
                    mqVo.svUserName = mqVo.svUserName.trim() ;
                }
                mqVo.svUserPassword = conf.getSetAttrTxt(doc, "config.mqtt", "svUserPassword", null, true, null) ;
                mqVo.svUserPassword = conf.getSetAttrTxt(doc, "config.mqtt", "svUserPassword", null, false, null) ;
                if(mqVo.svUserPassword == null || mqVo.svUserPassword.trim().equals("")){
                    throw new Exception("config.mqtt.svUserName配置的用户密码不合法") ;
                }else{
@@ -474,28 +474,51 @@
                if(mqVo.poolMaxSize <= 1 || mqVo.poolMaxSize > 1000){
                    throw new Exception("config.mqtt.poolMaxSize配置的连接池连接最大数量不合法") ;
                }
                String topicAndQos = conf.getSetAttrTxt(doc, "config.mqtt", "topicAndQos", null, true, null) ;
                if(topicAndQos == null || topicAndQos.trim().equals("")){
                    throw new Exception("config.mqtt.topicAndQos配置的主题及Qos不合法") ;
                String proAndDevIds = conf.getSetAttrTxt(doc, "config.mqtt", "protocolAndDeviceIds", null, false, null) ;
                if(proAndDevIds == null || proAndDevIds.trim().equals("")){
                    throw new Exception("config.mqtt.protocolAndDeviceIds配置不合法") ;
                }else{
                    topicAndQos = topicAndQos.trim() ;
                    topicAndQos = topicAndQos.replaceAll(",", ",");
                    topicAndQos = topicAndQos.replaceAll(";", ";");
                    String[] topicAndQosArr = topicAndQos.split(";") ;
                    proAndDevIds = proAndDevIds.trim() ;
                    proAndDevIds = proAndDevIds.replaceAll(",", ",");
                    proAndDevIds = proAndDevIds.replaceAll(";", ";");
                    proAndDevIds = proAndDevIds.replaceAll("\\\\", "/");
                    mqVo.protocolAndDeviceIds = proAndDevIds.split(",") ;
                    mqVo.deviceIds = new String[mqVo.protocolAndDeviceIds.length] ;
                    int index = 0 ;
                    for(String topicAndQosStr : mqVo.protocolAndDeviceIds){
                        String[] pd = topicAndQosStr.split("/") ;
                        mqVo.deviceIds[index] = pd[1].trim() ;
                        index++ ;
                    }
                }
                String subTopicAndQos = conf.getSetAttrTxt(doc, "config.mqtt", "subTopicAndQos", null, false, null) ;
                if(subTopicAndQos == null || subTopicAndQos.trim().equals("")){
                    throw new Exception("config.mqtt.subTopicAndQos配置的主题及Qos不合法") ;
                }else{
                    subTopicAndQos = subTopicAndQos.trim() ;
                    subTopicAndQos = subTopicAndQos.replaceAll(",", ",");
                    subTopicAndQos = subTopicAndQos.replaceAll(";", ";");
                    String[] topicAndQosArr = subTopicAndQos.split(";") ;
                    mqVo.subTopics = new String[topicAndQosArr.length] ;
                    mqVo.topicsQos = new int[topicAndQosArr.length] ;
                    mqVo.subTopicsQos = new int[topicAndQosArr.length] ;
                    int index = 0 ;
                    for(String topicAndQosStr : topicAndQosArr){
                        String[] tq = topicAndQosStr.split(",") ;
                        mqVo.subTopics[index] = tq[0].trim() ;
                        mqVo.topicsQos[index] = Integer.parseInt(tq[1].trim()) ;
                        mqVo.subTopicsQos[index] = Integer.parseInt(tq[1].trim()) ;
                        index++ ;
                    }
                }
                mqVo.publishQos = conf.getSetAttrPlusInt(doc, "config.mqtt", "publishQos", null, 0, 3, null);
                if(mqVo.publishQos < 0 || mqVo.publishQos > 3){
                    throw new Exception("config.mqtt.publishQos配置不合法") ;
                mqVo.pubTopicQos = conf.getSetAttrPlusInt(doc, "config.mqtt", "pubTopicQos", null, 0, 3, null);
                if(mqVo.pubTopicQos < 0 || mqVo.pubTopicQos > 3){
                    throw new Exception("config.mqtt.pubTopicQos配置不合法") ;
                }
                Integer intNoSubThenOff = conf.getSetAttrPlusInt(doc, "config.mqtt", "noSubThenOff", null, 1, 1440, null);
                mqVo.noSubThenOff = intNoSubThenOff * 60 * 1000L ;
                mqVo.showStartInfo = showStartInfo ;
                AdapterImp_MqttUnit mqAdapt = new AdapterImp_MqttUnit();
                mqAdapt.setConfig(mqVo);
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/forTcp/RtuLogDealer.java
@@ -26,5 +26,23 @@
        
        ResourceUnit.getInstance().rtuLog(logNode);
    }
    /**
     * è®°å½•Rtu日志
     * @param devId
     * @param content
     */
    public static void log4Mqtt(String devId, String content){
        if(devId == null || devId.trim().equals("")){
            log.error("严重错误,记录Mqtt设备日志时,设备地址未提供!") ;
            return ;
        }
        if(content == null || content.equals("")){
            log.error("严重错误,记录Mqtt设备日志时,日志内容未提供!") ;
            return ;
        }
        RtuLogNode logNode = new RtuLogNode(devId, content) ;
        ResourceUnit.getInstance().rtuLog(logNode);
    }
}
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/CommandInnerDeaLer.java
@@ -5,6 +5,8 @@
import com.dy.common.mw.protocol.Command;
import com.dy.common.mw.protocol.rtuState.RtuStatus;
import com.dy.rtuMw.server.local.localProtocol.*;
import com.dy.rtuMw.server.mqtt.DevStatus;
import com.dy.rtuMw.server.mqtt.DevStatusDealer;
import com.dy.rtuMw.server.mqtt.MqttUnit;
import java.util.HashMap;
@@ -23,37 +25,105 @@
     * @return
     */
    public Command deal(Command com) throws Exception{
        Command rCom ;
        String code = com.getCode() ;
        if(code.equals(CodeLocal.clock)){
            return this.clock(com) ;
        }else if(code.equals(CodeLocal.onAllLine)){
            return this.onAllLine(com) ;
        }else if(code.equals(CodeLocal.onPartLine)){
            return this.onPartLine(com) ;
        }else if(code.equals(CodeLocal.onLineStatistics)){
            return this.onLineStateStatistics(com) ;
        }else if(code.equals(CodeLocal.allRtuStates)){
            return this.allRtuStates(com) ;
        }else if(code.equals(CodeLocal.partRtuStates)){
            return this.someRtuStates(com) ;
        }else if(code.equals(CodeLocal.oneRtuStates)){
            return this.oneRtuStates(com) ;
        }else if(code.equals(CodeLocal.allProtocols)){
            return this.allProtocols(com) ;
        }else if(code.equals(CodeLocal.stopTcpSv)){
            return this.stopTcpSv(com) ;
        }else if(code.equals(CodeLocal.recoverTcpSv)){
            return this.recoverTcpSv(com) ;
        }else if(code.equals(CodeLocal.recoverMqttSv)){
            return this.stopMqttSv(com) ;
        }else if(code.equals(CodeLocal.mwState)){
            return this.mwInfo(com) ;
        switch (code) {
            case CodeLocal.clock -> {
                rCom = this.clock(com);
                break;
            }
            case CodeLocal.mwState -> {
                rCom = this.mwInfo(com);
                break;
            }
            ////////////////////////////////////////////
            //
            // ä»¥ä¸‹æ˜¯ç›¸å…³åŸºäºŽTCP连接的RTU设备的内部命令
            //
            ////////////////////////////////////////////
            case CodeLocal.onAllLine -> {
                rCom = this.onAllLine(com);
                break;
            }
            case CodeLocal.onPartLine -> {
                rCom = this.onPartLine(com);
                break;
            }
            case CodeLocal.onLineStatistics -> {
                rCom = this.onLineStateStatistics(com);
                break;
            }
            case CodeLocal.allRtuStates -> {
                rCom = this.allRtuStates(com);
                break;
            }
            case CodeLocal.partRtuStates -> {
                rCom = this.someRtuStates(com);
                break;
            }
            case CodeLocal.oneRtuStates -> {
                rCom = this.oneRtuStates(com);
                break;
            }
            case CodeLocal.allProtocols -> {
                rCom = this.allProtocols(com);
                break;
            }
            case CodeLocal.stopTcpSv -> {
                rCom = this.stopTcpSv(com);
                break;
            }
            case CodeLocal.recoverTcpSv -> {
                rCom = this.recoverTcpSv(com);
                break;
            }
            ////////////////////////////////////////////
            //
            // ä»¥ä¸‹æ˜¯ç›¸å…³åŸºäºŽMQTT连接的设备的内部命令
            //
            ////////////////////////////////////////////
            case CodeLocal.onAllLineMqtt -> {
                rCom = this.onAllLineMqtt(com);
                break;
            }
            case CodeLocal.onPartLineMqtt -> {
                rCom = this.onPartLineMqtt(com);
                break;
            }
            case CodeLocal.onLineStatisticsMqtt -> {
                rCom = this.onLineStateStatisticsMqtt(com);
                break;
            }
            case CodeLocal.allRtuStatesMqtt -> {
                rCom = this.allRtuStatesMqtt(com);
                break;
            }
            case CodeLocal.partRtuStatesMqtt -> {
                rCom = this.someRtuStatesMqtt(com);
                break;
            }
            case CodeLocal.oneRtuStatesMqtt -> {
                rCom = this.oneRtuStatesMqtt(com);
                break;
            }
            case CodeLocal.stopMqttSv -> {
                rCom = this.stopMqttSv(com);
                break;
            }
            default -> {
                rCom = ReturnCommand.errored("出错,收到内部命令的功能码不能识别!", com.getId(), com.getCode());
                break;
            }
        }
        return ReturnCommand.errored("出错,收到内部命令的功能码不能识别!", com.getId(), com.getCode()) ;
        return rCom ;
    }
    /**
     * æŸ¥è¯¢é€šä¿¡ä¸­é—´ä»¶æ—¶é’Ÿ
     * @param  command
     * @throws Exception
     */
    private Command clock(Command command) throws Exception{
@@ -112,7 +182,7 @@
            Map<String, RtuStatus> map = new RtuStatusDeal().dealSome(rtuAddrGrp) ;
            return ReturnCommand.successed("查询部分RTU状态结果", command.getId(), command.getCode(), map) ;
        }else{
            return ReturnCommand.errored("出错,命令参数应该是所查询RTU的地址串",  command.getId(), command.getCode()) ;
            return ReturnCommand.errored("出错,命令参数应该有所查询RTU的地址串",  command.getId(), command.getCode()) ;
        }
    }
@@ -126,7 +196,7 @@
            RtuStatus rtuStatus = new RtuStatusDeal().dealOne(rtuAddr) ;
            return ReturnCommand.successed("查询一个RTU状态结果", command.getId(), command.getCode(), rtuStatus) ;
        }else{
            return ReturnCommand.errored("出错,命令参数应该是所查询RTU的地址",  command.getId(), command.getCode()) ;
            return ReturnCommand.errored("出错,命令参数应该有所查询RTU的地址",  command.getId(), command.getCode()) ;
        }
    }
@@ -160,8 +230,77 @@
        return ReturnCommand.successed("已经启动恢复TCP服务", command.getId(), command.getCode(), null) ;
    }
    /**
     * åœæ­¢TCP服务,不再接入新的TCP连接,已经TCP连接的全部断连接
     * æŸ¥è¯¢æ‰€æœ‰MQTT设备在线情况
     * @throws Exception
     */
    private Command onAllLineMqtt(Command command) throws Exception{
        HashMap<String, Boolean> map = DevStatusDealer.allOnLine() ;
        return ReturnCommand.successed("查询所有Mqtt设备在线情况结果", command.getId(), command.getCode(), map) ;
    }
    /**
     * æŸ¥è¯¢éƒ¨åˆ†MQTT设备在线情况
     * @throws Exception
     */
    private Command onPartLineMqtt(Command command) throws Exception{
        if(command.param != null && command.param instanceof String && !command.param.equals("")){
            String[] devIds = ((String)command.param).split(",");
            HashMap<String, Boolean> map = DevStatusDealer.partOnLine(devIds) ;
            return ReturnCommand.successed("查询部分Mqtt设备在线情况结果", command.getId(), command.getCode(), map) ;
        }else{
            return ReturnCommand.errored("出错,命令参数应该有所查询Mqtt设备的地址串",  command.getId(), command.getCode()) ;
        }
    }
    /**
     * ç»Ÿè®¡MQTT设备在线与不在线情况
     * @throws Exception
     */
    private Command onLineStateStatisticsMqtt(Command command) throws Exception{
        RtuOnLineStateStatisticsVo vo = DevStatusDealer.statisticsOnLine() ;
        return ReturnCommand.successed("查询所有Mqtt设备在线情况结果", command.getId(), command.getCode(), vo) ;
    }
    /**
     * æŸ¥è¯¢æ‰€æœ‰MQTT设备状态
     * @throws Exception
     */
    private Command allRtuStatesMqtt(Command command) throws Exception{
        Map<String, DevStatus> map =  DevStatusDealer.allStatus() ;
        return ReturnCommand.successed("查询所有Mqtt设备状态结果", command.getId(), command.getCode(), map) ;
    }
    /**
     * æŸ¥è¯¢éƒ¨åˆ†MQTT设备状态
     * @throws Exception
     */
    private Command someRtuStatesMqtt(Command command) throws Exception{
        if(command.param != null && command.param instanceof String && !command.param.equals("")){
            String[] devIds = ((String)command.param).split(",");
            Map<String, DevStatus> map = DevStatusDealer.someStatus(devIds) ;
            return ReturnCommand.successed("查询部分Mqtt设备状态结果", command.getId(), command.getCode(), map) ;
        }else{
            return ReturnCommand.errored("出错,命令参数应该有所查询Mqtt设备的地址串",  command.getId(), command.getCode()) ;
        }
    }
    /**
     * æŸ¥è¯¢éƒ¨åˆ†MQTT设备状态
     * @throws Exception
     */
    private Command oneRtuStatesMqtt(Command command) throws Exception{
        if(command.param != null && command.param instanceof String && !command.param.equals("")){
            String devId = (String)command.param;
            DevStatus devStatus = DevStatusDealer.oneStatus(devId) ;
            return ReturnCommand.successed("查询一个Mqtt设备状态结果", command.getId(), command.getCode(), devStatus) ;
        }else{
            return ReturnCommand.errored("出错,命令参数应该有所查询Mqtt设备的地址",  command.getId(), command.getCode()) ;
        }
    }
    /**
     * åœæ­¢MQTT服务
     * @throws Exception
     */
    private Command stopMqttSv(Command command) throws Exception{
@@ -174,7 +313,7 @@
    /**
     * æ¢å¤TCP服务,接入新的TCP连接
     * æ¢å¤MQTT服务
     * @throws Exception
     */
    private Command recoverMqttSv(Command command) throws Exception{
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/localProtocol/CodeLocal.java
@@ -1,12 +1,17 @@
package com.dy.rtuMw.server.local.localProtocol;
public class CodeLocal {
    public static final String clock = "LCD0000" ;//查询监控中间件时钟
    public static final String mwState = "LCD0200" ;//得到通信中间件运行信息
    ////////////////////////////////////////////
    //
    // ä»¥ä¸‹æ˜¯ç›¸å…³åŸºäºŽTCP连接的RTU设备的内部命令
    //
    ////////////////////////////////////////////
    public static final String onAllLine = "LCD0001" ;//查询所有RTU在线情况
    public static final String onPartLine = "LCD0002" ;//查询所有RTU在线情况
    public static final String onPartLine = "LCD0002" ;//查询部分RTU在线情况
    public static final String onLineStatistics = "LCD0003" ;//查询所有RTU状态统计情况
@@ -22,10 +27,26 @@
    public static final String recoverTcpSv = "LCD0112" ;//重启TCP服务,接入新的TCP连接
    public static final String stopMqttSv = "LCD0114" ;//停止Mqtt服务
    public static final String recoverMqttSv = "LCD0116" ;//重启Mqtt服务
    public static final String mwState = "LCD0200" ;//得到通信中间件运行信息
    ////////////////////////////////////////////
    //
    // ä»¥ä¸‹æ˜¯ç›¸å…³åŸºäºŽMQTT连接的设备的内部命令
    //
    ////////////////////////////////////////////
    public static final String onAllLineMqtt = "LMCD0001" ;//查询所有MQTT设备在线情况
    public static final String onPartLineMqtt = "LMCD0002" ;//查询部分MQTT设备在线情况
    public static final String onLineStatisticsMqtt = "LMCD0003" ;//查询所有MQTT设备状态统计情况
    public static final String allRtuStatesMqtt = "LMCD0010" ;//查询所有MQTT设备状态
    public static final String partRtuStatesMqtt = "LMCD0011" ;//查询部分MQTT设备状态
    public static final String oneRtuStatesMqtt = "LMCD0012" ;//查询一个MQTT设备状态
    public static final String stopMqttSv = "LMCD0110" ;//停止Mqtt服务
}
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatus.java
New file
@@ -0,0 +1,23 @@
package com.dy.rtuMw.server.mqtt;
import lombok.Data;
/**
 * @Author: liurunyu
 * @Date: 2025/6/10 14:54
 * @Description
 */
@Data
public class DevStatus {
    public String id ;//设备ID
    public String protocol;//协议
    public Boolean onLine ;//是否在线 true在线 false离线
    public Boolean stirRunning ;//搅拌运行 true是 false否
    public Boolean injectRunning ;//注肥运行 true是 false否
    public Boolean irrRunning ;//灌溉运行 true是 false否
    public Boolean alarm ;//报警 true是 false否
    public Long lastDownComTime ;//上次下发命令时刻(毫秒时刻 System.currentTimeMillis())
    public Long lastUpDataTime ;//上次收到上行数据时刻(毫秒时刻 System.currentTimeMillis())
}
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/DevStatusDealer.java
New file
@@ -0,0 +1,186 @@
package com.dy.rtuMw.server.mqtt;
import com.dy.common.mw.protocol4Mqtt.status.DevRunSt;
import com.dy.rtuMw.server.forTcp.RtuLogDealer;
import com.dy.rtuMw.server.local.localProtocol.RtuOnLineStateStatisticsVo;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
/**
 * @Author: liurunyu
 * @Date: 2025/6/10 15:00
 * @Description
 */
public class DevStatusDealer {
    private static Map<String, DevStatus> map = new HashMap<String, DevStatus>() ;
    public static HashMap<String, Boolean> allOnLine(){
        synchronized (map){
            HashMap<String, Boolean> rsMap = new HashMap<>();
            Iterator<Map.Entry<String, DevStatus>> it = map.entrySet().iterator() ;
            Map.Entry<String, DevStatus> entry = null ;
            while(it.hasNext()){
                entry = it.next() ;
                rsMap.put(entry.getKey(), entry.getValue().onLine) ;
            }
            return rsMap ;
        }
    }
    public static HashMap<String, Boolean> partOnLine(String[] devIds){
        synchronized (map){
            HashMap<String, Boolean> rsMap = new HashMap<String, Boolean>();
            for(String devid : devIds){
                DevStatus st = map.get(devid) ;
                if(st != null){
                    rsMap.put(devid, st.onLine) ;
                }
            }
            return rsMap ;
        }
    }
    /**
     * ç»Ÿè®¡åœ¨çº¿ä¸Žä¸åœ¨çº¿æƒ…况
     */
    public static RtuOnLineStateStatisticsVo statisticsOnLine(){
        RtuOnLineStateStatisticsVo vo = new RtuOnLineStateStatisticsVo() ;
        vo.onLineNum = 0 ;
        vo.offLineNum = 0 ;
        synchronized (map){
            Iterator<Map.Entry<String, DevStatus>> it = map.entrySet().iterator() ;
            Map.Entry<String, DevStatus> entry = null ;
            while(it.hasNext()){
                entry = it.next() ;
                if(((DevStatus)entry).onLine != null && ((DevStatus)entry).onLine.booleanValue()){
                    vo.onLineNum++ ;
                }else{
                    vo.offLineNum++ ;
                }
            }
        }
        return vo ;
    }
    /**
     * å¾—到全部状态
     * @return
     */
    public static Map<String, DevStatus> allStatus(){
        return map ;
    }
    /**
     * å¾—到部分状态
     * @return
     */
    public static Map<String, DevStatus> someStatus(String[] devIdArrGrp){
        synchronized (map){
            Map<String, DevStatus> rsMap = new HashMap<>();
            for(String devId : devIdArrGrp){
                DevStatus status = map.get(devId) ;
                if(status != null){
                    rsMap.put(devId, status) ;
                }
            }
            return rsMap ;
        }
    }
    /**
     * å¾—到一个RTU的状态
     * @return
     */
    public static DevStatus oneStatus(String devId){
        return map.get(devId) ;
    }
    public static void updateOnLineState() {
        if (MqttUnit.confVo != null
                && MqttUnit.confVo.noSubThenOff != null
                && MqttUnit.confVo.noSubThenOff.longValue() > 0) {
            Long now = System.currentTimeMillis() ;
            synchronized (map){
                Set<Map.Entry<String, DevStatus>> entrySet = map.entrySet() ;
                Iterator<Map.Entry<String, DevStatus>> it = entrySet.iterator() ;
                Map.Entry<String, DevStatus> entry ;
                DevStatus st;
                while(it.hasNext()){
                    entry = it.next() ;
                    st = entry.getValue();
                    if(st.onLine != null && st.onLine.booleanValue() && st.lastUpDataTime != null){
                        if(now - st.lastUpDataTime > MqttUnit.confVo.noSubThenOff.longValue()){
                            st.onLine = false ;
                            RtuLogDealer.log4Mqtt(entry.getKey(), "因较长时间未收上行数据,认为设备离线");
                        }
                    }
                }
            }
        }
    }
    /**
     * å‘送消息后
     * @param devId
     */
    public static void afterSendPubMessage(String devId){
        DevStatus st = map.get(devId);
        if(st != null){
            st.lastDownComTime = System.currentTimeMillis() ;
        }
    }
    /**
     * æŽ¥æ”¶æ¶ˆæ¯åŽ
     * @param devId
     */
    public static void afterReceiveSubMessage(String devId){
        DevStatus st = map.get(devId);
        if(st != null){
            st.lastUpDataTime = System.currentTimeMillis() ;
        }
    }
    public static void onLine(String devId, String protocol){
        DevStatus vo = map.get(devId) ;
        if(vo == null) {
            vo = new DevStatus();
            vo.id = devId ;
            vo.protocol = protocol ;
            vo.onLine = true ;
            map.put(devId, vo);
        }else {
            vo.onLine = true ;
        }
    }
    public static void offLine(String devId){
        DevStatus vo = map.get(devId) ;
        if(vo == null) {
            vo = new DevStatus();
            vo.onLine = false ;
            map.put(devId, vo);
        }else {
            vo.onLine = false ;
        }
    }
    public static void setStatus(String devId, DevRunSt st){
        DevStatus vo = map.get(devId) ;
        if(vo != null) {
            if(st.stirRunning != null){
                vo.stirRunning = st.stirRunning ;
            }
            if(st.injectRunning != null){
                vo.injectRunning = st.injectRunning ;
            }
            if(st.irrRunning != null){
                vo.irrRunning = st.irrRunning ;
            }
            if(st.alarm != null){
                vo.alarm = st.alarm ;
            }
        }
    }
}
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java
@@ -1,6 +1,11 @@
package com.dy.rtuMw.server.mqtt;
import com.dy.common.mw.channel.mqtt.MqttClientPool;
import com.dy.common.mw.protocol4Mqtt.MqttNotify;
import com.dy.common.mw.protocol4Mqtt.MqttNotifyInfo;
import com.dy.common.mw.protocol4Mqtt.status.DevOnLineSt;
import com.dy.common.mw.protocol4Mqtt.status.DevRunSt;
import com.dy.rtuMw.server.ServerProperties;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.eclipse.paho.client.mqttv3.MqttClient;
@@ -33,13 +38,17 @@
        this.configVo = configVo;
    }
    /**
     * åˆ›å»ºè¿žæŽ¥æ±  + è®¢é˜…主题
     * @throws Exception
     */
    public void start()throws Exception{
        String URL = "tcp://" + this.configVo.svIp + ":" + this.configVo.svPort;
        this.pool = new MqttClientPool(URL, this.configVo.svUserName, this.configVo.svUserPassword, this.configVo.poolMaxSize);
        if(this.pool.isClose()){
            throw new Exception("Mqtt连接池初始化失败");
        }
        MqttClient clientSub = null ;
        MqttClient clientSub ;
        try {
            clientSub = pool.popClient();//新创建一个Client时,此Client实际去连接MQTT服务器,如果连接不上,就会抛出异常
        }catch (Exception e){
@@ -48,8 +57,35 @@
        if(clientSub == null || !clientSub.isConnected()){
            throw new Exception("Mqtt连接池获得订阅连接不可用");
        }
        // è®¢é˜…主题
        for(int i = 0; i < this.configVo.subTopics.length; i++){
            clientSub.subscribe(this.configVo.subTopics[i], this.configVo.topicsQos[i], new MqttMessageListener());
            for(int j = 0 ; j < this.configVo.protocolAndDeviceIds.length; j++){
                clientSub.subscribe(ServerProperties.orgTag + "/"
                        + this.configVo.protocolAndDeviceIds[j] + "/"
                        + this.configVo.subTopics[i],
                        this.configVo.subTopicsQos[i],
                        //每一个订阅主题都有一个MqttMessageListener实例
                        new MqttMessageListener(new MqttNotify(){
                            @Override
                            public void notify(String devId, MqttNotifyInfo... infos) {
                                if(devId != null && infos != null && infos.length > 0){
                                    for(MqttNotifyInfo info : infos){
                                        if(info instanceof DevOnLineSt){
                                            DevOnLineSt onLineSt = (DevOnLineSt)info;
                                            if(onLineSt.onLine != null && onLineSt.onLine.booleanValue()){
                                                DevStatusDealer.onLine(devId, ((DevOnLineSt)info).protocol);
                                            }else{
                                                DevStatusDealer.offLine(devId);
                                            }
                                        } else if(info instanceof DevRunSt){
                                            DevStatusDealer.setStatus(devId, (DevRunSt)info);
                                        }
                                    }
                                }
                            }
                        })
                );
            }
        }
    }
@@ -69,12 +105,12 @@
    }
    public void publishMsg(MqttClient client, String topic, byte[] msg) throws Exception{
        client.publish(topic, msg, this.configVo.publishQos, false);
        client.publish(topic, msg, this.configVo.pubTopicQos, false);
    }
    public void publishMsg(MqttClient client, String topic, String msg) throws Exception{
        byte[] bs = msg.getBytes("UTF-8") ;
        client.publish(topic, bs, this.configVo.publishQos, false);
        client.publish(topic, bs, this.configVo.pubTopicQos, false);
    }
    public boolean poolIsClose(){
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java
@@ -1,9 +1,11 @@
package com.dy.rtuMw.server.mqtt;
import com.dy.common.mw.protocol4Mqtt.MqttMsgParser;
import com.dy.common.mw.protocol4Mqtt.MqttPubMsg;
import com.dy.common.mw.protocol4Mqtt.MqttSubMsg;
import com.dy.common.mw.protocol4Mqtt.*;
import com.dy.common.mw.protocol4Mqtt.MqttCallback;
import com.dy.common.mw.protocol4Mqtt.MqttTopic;
import com.dy.common.util.Callback;
import com.dy.rtuMw.server.forTcp.RtuLogDealer;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.* ;
/**
@@ -11,11 +13,30 @@
 * @Date: 2025/6/4 15:52
 * @Description
 */
@Slf4j
public class MqttMessageListener implements IMqttMessageListener{
    private MqttNotify notify ;
    public MqttMessageListener(MqttNotify notify){
        this.notify = notify ;
    }
    @Override
    public void messageArrived(String topic, MqttMessage msg) throws Exception {
        MqttMsgParser parser = new MqttMsgParser() ;
        MqttSubMsg subMsg = parser.parseSubMsg(topic, msg) ;
        MqttTopic subTopic = MqttMsgParser.parseSubTopic(topic) ;
        MqttSubMsg subMsg = MqttMsgParser.parseSubMsg(subTopic, msg, new MqttCallback(){
            @Override
            public void callback(MqttSubMsg subMsg) {
                DevStatusDealer.onLine(subMsg.deviceId, subMsg.protocol);
                DevStatusDealer.afterReceiveSubMessage(subMsg.deviceId);
                RtuLogDealer.log4Mqtt(subMsg.deviceId, "订阅消息    ä¸»é¢˜ï¼š" + subMsg.topic + "   æ¶ˆæ¯ï¼š" + subMsg.msg);
            }
            @Override
            public void notify(String devId, MqttNotifyInfo... infos) {
                if(notify != null){
                    notify.notify(devId, infos) ;
                }
            }
        }) ;
        this.nextDeal(subMsg);
    }
    private void nextDeal(MqttSubMsg subMsg)throws Exception {
@@ -25,17 +46,19 @@
                MqttSubMsg subMs = (MqttSubMsg) obj ;
                MqttPubMsg pubMs = MqttPubMsgCache.matchFromTail(subMs) ;
                if(pubMs != null){
                    //匹配到下行消息(命令)
                    subMs.mqttResultSendWebUrl = pubMs.mqttResultSendWebUrl ;
                    subMs.commandId = pubMs.commandId ;
                    try {
                        MqttComResultCache.getInstance().cacheMqttComResult(new MqttComResultNode(subMs));
                    } catch (Exception e) {
                        e.printStackTrace();
                        log.error("缓存发布消息(命令)结果发生异常", e);
                    }
                }
                try{
                    MqttSubMsgCache.getInstance().cacheMsg(new MqttSubMsgNode(subMsg));
                }catch (Exception e){
                    log.error("缓存订阅消息数据发生异常", e);
                }
            }
            @Override
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgNode.java
@@ -3,6 +3,7 @@
import com.dy.common.mw.protocol4Mqtt.MqttPubMsg;
import com.dy.common.queue.NodeObj;
import com.dy.rtuMw.server.ServerProperties;
import com.dy.rtuMw.server.forTcp.RtuLogDealer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.eclipse.paho.client.mqttv3.MqttClient;
@@ -65,6 +66,8 @@
            if(mqttClient != null && mqttClient.isConnected()){
                try {
                    mqttManager.publishMsg(mqttClient, this.result.topic, this.result.msg);
                    DevStatusDealer.afterSendPubMessage(this.result.deviceId);
                    RtuLogDealer.log4Mqtt(this.result.deviceId, "发布消息    ä¸»é¢˜ï¼š" + this.result.topic + "   æ¶ˆæ¯ï¼š" + this.result.msg);
                    log.info("发布MQTT消息(主题=" + this.result.topic + ")" + this.result.msg);
                }catch (Exception e){
                    log.error("MQTT发布消息失败(主题=" + this.result.topic + ")" , e);
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitConfigVo.java
@@ -13,9 +13,12 @@
    public String svUserName ;//
    public String svUserPassword ;//
    public Integer poolMaxSize ;//
    public String[] protocolAndDeviceIds ;//设备协议与ID(FBox)id
    public String[] deviceIds ;//设备(FBox)id
    public String[] subTopics ;//订阅的主题
    public int[] topicsQos ;////订阅主题的Qos
    public int publishQos ;////发布消息的Qos
    public int[] subTopicsQos;//订阅主题的Qos
    public int pubTopicQos;//发布消息的Qos
    public Long noSubThenOff; //MQtt设备在一定时间后未发布消息,认为设备离线
    public MqttUnitConfigVo(){
        this.enable = false ;
@@ -24,6 +27,7 @@
        this.svUserName = "dyyjy" ;
        this.svUserPassword = "Dyyjy2025,;.abc!@#" ;
        this.poolMaxSize = 10 ;
        this.publishQos = 1 ;
        this.pubTopicQos = 1 ;
        this.noSubThenOff = 10 * 60 * 10000L ;
    }
}
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/MqttSubMessageConstantTask.java
@@ -2,6 +2,7 @@
import com.dy.common.mw.core.CoreTask;
import com.dy.common.queue.Node;
import com.dy.rtuMw.server.mqtt.DevStatusDealer;
import com.dy.rtuMw.server.mqtt.MqttSubMsgCache;
import com.dy.rtuMw.server.mqtt.MqttSubMsgNode;
import org.apache.logging.log4j.LogManager;
@@ -21,16 +22,26 @@
    @Override
    public Integer execute() {
        try{
            dealOneline() ;
        }catch(Exception e){
            log.error("更新RTU会话上报数据时刻时发生集合操作异常,此异常并不影响系统正常运行", e);
        }
        try{
            dealMqMsg() ;
        }catch(Exception e){
            log.error(e);
        }
        return MqttSubMsgCache.size()>0?0:1 ;
    }
    private void dealOneline(){
        DevStatusDealer.updateOnLineState();
    }
    /**
     * å¤„理MQTT订阅的消息
     */
    public void dealMqMsg() {
    private void dealMqMsg() {
        Node first = MqttSubMsgCache.getFirstQueueNode() ;
        if(first != null){
            Node last = MqttSubMsgCache.getLastQueueNode() ;
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.properties
@@ -10,7 +10,7 @@
#   ç”˜å·žï¼š gz
#   å‡‰å·žï¼š lz
#   é‡‘川: jc
base.orgTag=ym
base.orgTag=mq
# 233服务器:
#   å…ƒè°‹ï¼š 60000
@@ -30,19 +30,23 @@
base.upData.min.interval=6
# MQTT服务配置
#   mqtt.enable æ˜¯å¦å¯åЍ
#   mqtt.protocolAndDeviceIds åœ¨å­ç³»ç»Ÿï¼ˆorgTag)中接入的设备(FBox)所用协议及设备id集合,多个用逗号隔开,协议与ID用正斜杠隔开,例如:sd1/338220031439,sd1/338220031440
#   mqtt.subTopicAndQos è®¢é˜…主题与Qos,主题名与其Qos用逗号隔开,多个主题及Qos用分号隔开,例如:topic1,1;topic2,1;topic3,1
# 233服务器:
#   å…ƒè°‹ï¼š mqtt.enable=false
#   æ²™ç›˜ï¼š mqtt.enable=false
#   æµ‹è¯•: mqtt.enable=false
#   æ¢…江: mqtt.enable=false
#   å…ƒè°‹ï¼š mqtt.enable=false  mqtt.protocolAndDeviceIds= mqtt.topicAndQos=
#   æ²™ç›˜ï¼š mqtt.enable=false  mqtt.protocolAndDeviceIds= mqtt.topicAndQos=
#   æµ‹è¯•: mqtt.enable=false  mqtt.protocolAndDeviceIds= mqtt.topicAndQos=
#   æ¢…江: mqtt.enable=false  mqtt.protocolAndDeviceIds= mqtt.topicAndQos=
# 121服务器:
#   æ°‘勤: mqtt.enable=true
#   å»¶åº†ï¼š mqtt.enable=false
#   é»‘龙江: mqtt.enable=false
#   ç”˜å·žï¼š mqtt.enable=false
#   å‡‰å·žï¼š mqtt.enable=false
#   é‡‘川: mqtt.enable=true
# mq/sd1/338220031439/weather
#   æ°‘勤: mqtt.enable=true  mqtt.protocolAndDeviceIds=? mqtt.topicAndQos=weather,1;soil,1;manure,1;state,1
#   å»¶åº†ï¼š mqtt.enable=false  mqtt.protocolAndDeviceIds= mqtt.topicAndQos=
#   é»‘龙江: mqtt.enable=false  mqtt.protocolAndDeviceIds= mqtt.topicAndQos=
#   ç”˜å·žï¼š mqtt.enable=false  mqtt.protocolAndDeviceIds= mqtt.topicAndQos=
#   å‡‰å·žï¼š mqtt.enable=false  mqtt.protocolAndDeviceIds= mqtt.topicAndQos=
#   é‡‘川: mqtt.enable=true  mqtt.protocolAndDeviceIds=? mqtt.topicAndQos=weather,1;soil,1;manure,1;state,1
mqtt.enable=true
mqtt.topicAndQos=ym/sd1/10000/control/m1,1;ym/sd1/10000/control/m2,1;ym/sd1/control/m4,1;ym/sd1/10000/control/m80,1
mqtt.protocolAndDeviceIds=sd1/338220031439,sd1/338220031440
mqtt.subTopicAndQos=weather,1;soil,1;manure,1;state,1
#MQtt设备在一定时间(分钟)后未发布消息,认为设备离线
mqtt.noSubThenOff=10
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.xml
@@ -166,11 +166,18 @@
    <!--
    topicAndQos: ä¸»é¢˜ä¸ŽQos,主题名与其Qos用逗号隔开,多个主题及Qos用分号隔开,例如:ym/topic1,1;ym/topic2,1;ym/topic3,1,如果有多个OrgTag,主题前缀用其OrgTag
    publishQos: å‘布消息的Qos,取值范围:
    enable æ˜¯å¦å¯åЍ
    svIp MQTT服务器IP
    svUserName MQTT服务器用户名
    svUserPassword MQTT服务器用户密码
    poolMaxSize è¿žæŽ¥æ± æœ€å¤§è¿žæŽ¥æ•°
    protocolAndDeviceIds åœ¨å­ç³»ç»Ÿï¼ˆorgTag)中接入的设备(FBox)所用协议及设备id集合,多个用逗号隔开,协议与ID用正斜杠隔开,例如:sd1/338220031439,sd1/338220031440
    subTopicAndQos: è®¢é˜…主题与Qos,主题名与其Qos用逗号隔开,多个主题及Qos用分号隔开,例如:ym/topic1,1;ym/topic2,1;ym/topic3,1,如果有多个OrgTag,主题前缀用其OrgTag
    pubTopicQos: å‘布主题的Qos,取值范围:
        0    è‡³å¤šä¸€æ¬¡ï¼ˆAt most once)    æ¶ˆæ¯å‘送后不保证到达,可能丢失或重复,开销最小,可靠性最低。
        1    è‡³å°‘一次(At least once)    æ¶ˆæ¯è‡³å°‘会到达一次,可能重复,但不会丢失,可靠性中等,适用于多数场景。
        2    æ°å¥½ä¸€æ¬¡ï¼ˆExactly once)    æ¶ˆæ¯ä»…会到达一次,不重复且不丢失,可靠性最高,但开销最大,实现最复杂。
    noSubThenOff: MQtt设备在一定时间(分钟)后未发布消息,认为设备离线
     -->
    <mqtt enable="${mqtt.enable}"
          svIp="121.199.41.121"
@@ -178,8 +185,10 @@
          svUserName="dyyjy"
          svUserPassword="Dyyjy2025,;.abc!@#"
          poolMaxSize="10"
          topicAndQos="${mqtt.topicAndQos}"
          publishQos="1"
          protocolAndDeviceIds="${mqtt.protocolAndDeviceIds}"
          subTopicAndQos="${mqtt.subTopicAndQos}"
          pubTopicQos="1"
          noSubThenOff="${mqtt.noSubThenOff}"
    />
</config>