liurunyu
2 天以前 f8b2e59a82702a790c383a8ecd90c708c76e2488
增量开发MQTT协议、功能模块,上下行命令(消息)等
14个文件已修改
32个文件已添加
2090 ■■■■■ 已修改文件
pipIrr-platform/pipIrr-common/pom.xml 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPool.java 46 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPooledObjectFactory.java 60 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/Test.java 109 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol/CommandType.java 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/Com4Mqtt.java 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttMsgParser.java 61 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttPubMsg.java 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttSubMsg.java 26 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/CodeSdV1.java 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttPubMsgSdV1.java 27 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java 68 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolConstantSdV1.java 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java 170 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/ComCtrlVo.java 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/说明.txt 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/pom.xml 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java 126 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/ServerShutDownHook.java 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/AdapterImp_MqttUnit.java 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/ServerProperties.java 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/CommandInnerDeaLer.java 27 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/localProtocol/CodeLocal.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttComResultCache.java 69 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttComResultNode.java 58 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java 86 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java 49 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgCache.java 132 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgNode.java 95 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttSubMsgCache.java 68 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttSubMsgNode.java 69 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnit.java 68 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitAdapter.java 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitConfigVo.java 29 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/TkMqttData.java 37 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/TkReceive.java 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/pSdV1/TkFindPSdV1.java 31 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/MqttComResultConstantTask.java 75 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/MqttPubMessageConstantTask.java 81 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/MqttSubMessageConstantTask.java 77 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/WebDownCom4MqttTask.java 51 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/web/com/CommandCtrl.java 40 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/web/comResult/CommandResultDeal.java 35 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/RtuDataDealTree.xml 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.properties 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.xml 18 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-common/pom.xml
@@ -132,6 +132,18 @@
            <version>2.2.2</version>
        </dependency>
        <!-- MQTT消息中间件,连接池及客户端  -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.5</version>
        </dependency>
        <!-- quartz -->
        <dependency>
            <groupId>org.quartz-scheduler</groupId>
@@ -173,6 +185,7 @@
            <version>2.17.2</version>
        </dependency>
    </dependencies>
    <build>
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPool.java
New file
@@ -0,0 +1,46 @@
package com.dy.common.mw.channel.mqtt;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.eclipse.paho.client.mqttv3.MqttClient;
/**
 * @Author: liurunyu
 * @Date: 2025/6/4 11:35
 * @Description
 */
public class MqttClientPool {
    private final GenericObjectPool<MqttClient> pool;
    public MqttClientPool(String broker, String username, String password, int maxConnections) {
        MqttClientPooledObjectFactory factory = new MqttClientPooledObjectFactory(broker, username, password);
        GenericObjectPoolConfig<MqttClient> config = new GenericObjectPoolConfig<>();
        config.setMaxTotal(maxConnections);
        config.setMaxIdle(maxConnections);
        config.setMinIdle(1);
        config.setTestOnBorrow(true);
        config.setTestOnReturn(true);
        config.setTestWhileIdle(true);
        this.pool = new GenericObjectPool<>(factory, config);
    }
    public MqttClient popClient() throws Exception {
        return pool.borrowObject();
    }
    public void pushClient(MqttClient client) {
        if (client != null) {
            pool.returnObject(client);
        }
    }
    public boolean isClose(){
        return pool.isClosed();
    }
    public void close() {
        pool.close();
    }
}
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/MqttClientPooledObjectFactory.java
New file
@@ -0,0 +1,60 @@
package com.dy.common.mw.channel.mqtt;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
/**
 * @Author: liurunyu
 * @Date: 2025/6/4 11:16
 * @Description
 */
public class MqttClientPooledObjectFactory extends BasePooledObjectFactory<MqttClient> {
    private final String broker;
    private final String username;
    private final String password;
    public MqttClientPooledObjectFactory(String broker, String username, String password) {
        this.broker = broker;
        this.username = username;
        this.password = password;
    }
    @Override
    public MqttClient create() throws Exception {
        String clientId = MqttClient.generateClientId();
        MqttClient client = new MqttClient(broker, clientId);
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        options.setAutomaticReconnect(true);
        options.setCleanSession(true);
        client.connect(options);
        return client;
    }
    @Override
    public PooledObject<MqttClient> wrap(MqttClient client) {
        return new DefaultPooledObject<>(client);
    }
    @Override
    public void destroyObject(PooledObject<MqttClient> p) throws Exception {
        MqttClient client = p.getObject();
        if (client.isConnected()) {
            client.disconnect();
        }
        client.close();
    }
    @Override
    public boolean validateObject(PooledObject<MqttClient> p) {
        MqttClient client = p.getObject();
        return client.isConnected();
    }
}
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/channel/mqtt/Test.java
New file
@@ -0,0 +1,109 @@
package com.dy.common.mw.channel.mqtt;
import com.dy.common.util.Callback;
import com.dy.common.util.ThreadJob;
import org.eclipse.paho.client.mqttv3.MqttClient;
/**
 * @Author: liurunyu
 * @Date: 2025/6/4 11:35
 * @Description
 */
public class Test {
    private static String mqSvIp = "121.199.41.121";
    private static Integer mqSvPort = 1883;
    private static String mqSvUserName = "dyyjy";
    private static String mqSvUserPassword = "Dyyjy2025,;.abc!@#";
    private static String topic1 = "test/topic1" ;
    private static String topic2 = "test/topic2" ;
    private static int maxConnections = 10 ;
    private static MqttClientPool pool;
    /**
     * QoS
     * ç­‰çº§    åç§°                        æ¶ˆæ¯ä¼ é€’特性
     * 0    è‡³å¤šä¸€æ¬¡ï¼ˆAt most once)    æ¶ˆæ¯å‘送后不保证到达,可能丢失或重复,开销最小,可靠性最低。
     * 1    è‡³å°‘一次(At least once)    æ¶ˆæ¯è‡³å°‘会到达一次,可能重复,但不会丢失,可靠性中等,适用于多数场景。
     * 2    æ°å¥½ä¸€æ¬¡ï¼ˆExactly once)    æ¶ˆæ¯ä»…会到达一次,不重复且不丢失,可靠性最高,但开销最大,实现最复杂。
     */
    private static int QoS = 1 ;
    public static void main(String[] args) {
        try{
            // åˆå§‹åŒ–连接池
            pool = new MqttClientPool("tcp://" + mqSvIp + ":" + mqSvPort, mqSvUserName, mqSvUserPassword, maxConnections);
            MqttClient clientSub = pool.popClient() ;
            testSubscribe(clientSub, topic1);
            testSubscribe(clientSub, topic2);
            MqttClient clientPub = pool.popClient() ;
            testPublish(clientPub, topic1, "hello world", 1000);
            testPublish(clientPub, topic2, "hello China", 1500);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // å…³é—­è¿žæŽ¥æ± 
            //pool.close();
        }
    }
    private static void testSubscribe(MqttClient clientSub, String topic) throws Exception {
        ThreadJob tjob = new ThreadJob() {
            @Override
            public Object execute() throws Exception {
                clientSub.subscribe(topic, QoS, (topic, msg) -> {
                    System.out.println("从主题" + topic + "收到消息: " + new String(msg.getPayload()));
                });
                while (true) {
                    Thread.sleep(1000L);
                }
            }
        };
        tjob.start(new Callback() {
            @Override
            public void call(Object obj) {
                System.out.println("执行成功");
            }
            @Override
            public void call(Object... objs) {
                System.out.println("执行成功");
            }
            @Override
            public void exception(Exception e) {
                e.printStackTrace();
            }
        });
    }
    private static void testPublish(MqttClient clientPub, String topic, String message, long sleep) throws Exception {
        ThreadJob tjob = new ThreadJob() {
            @Override
            public Object execute() throws Exception {
                int count = 0 ;
                while (true) {
                    // å‘布消息
                    byte[] bs = (message + "  " + count++).getBytes() ;
                    clientPub.publish(topic, bs, QoS, false);
                    Thread.sleep(sleep);
                }
            }
        };
        tjob.start(new Callback() {
            @Override
            public void call(Object obj) {
                System.out.println("执行成功");
            }
            @Override
            public void call(Object... objs) {
                System.out.println("执行成功");
            }
            @Override
            public void exception(Exception e) {
                e.printStackTrace();
            }
        });
    }
}
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol/CommandType.java
@@ -20,6 +20,14 @@
     */
    public static final String outerTransCommand = "outerTransCommand" ;
    /**
     * é’ˆå¯¹Mqtt外部命令
     * åªèƒ½æ˜¯å¼‚步,命令结果通过相关的信息发布通道发布出去
     */
    public static final String mqttCommand = "mqttCommand" ;
    /**
     * æœ¬å‘½ä»¤æ˜¯ä¸€ä¸ªåˆ«çš„命令的结果(结果以命令的方式表示)
     */
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/Com4Mqtt.java
New file
@@ -0,0 +1,23 @@
package com.dy.common.mw.protocol4Mqtt;
import lombok.Data;
/**
 * @Author: liurunyu
 * @Date: 2025/6/4 17:38
 * @Description å‘½ä»¤å€¼å¯¹è±¡
 */
@Data
public class Com4Mqtt {
    public String commandId ;//命令ID
    public String deviceId ;//设备ID
    public String protocol;//协议名称(实现厂家编码)
    public Short protocolVersion;//协议版本号
    public String code ;//功能码
    public String value ;//值(可能是整数、浮点数、布尔数(true或false))
}
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttMsgParser.java
New file
@@ -0,0 +1,61 @@
package com.dy.common.mw.protocol4Mqtt;
import com.dy.common.mw.protocol.Command;
import com.dy.common.mw.protocol4Mqtt.pSdV1.ProtocolConstantSdV1;
import com.dy.common.mw.protocol4Mqtt.pSdV1.ProtocolParserSdV1;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
 * @Author: liurunyu
 * @Date: 2025/6/5 10:00
 * @Description
 */
public class MqttMsgParser {
    public MqttSubMsg parseSubMsg(String topic, MqttMessage mqttMsg) throws Exception {
        if(topic != null && topic.trim().length() != 0){
            String[] topicGrp = topic.split("/") ;
            if(topicGrp.length != 4){
                throw new Exception("接收的mqtt消息主题不可识别") ;
            }else{
                if(topicGrp[1].equals("sd1")){
                    //山东设备(协议),且版本号为1
                    return new ProtocolParserSdV1().parseSubMsg(topicGrp[2], topic, mqttMsg);
                }else{
                    throw new Exception("接收的mqtt消息主题中协议(厂家及版本)不可识别") ;
                }
            }
        }else{
            throw new Exception("接收的mqtt消息主题为空") ;
        }
    }
    public MqttPubMsg createPubMsg(String orgTag, Command com) throws Exception {
        if(com.protocol == null && com.protocol.trim().length() != 0){
            throw new Exception("接收到MQTT命令,但未提供协议") ;
        }
        if(com.protocolVersion == null){
            throw new Exception("接收到MQTT命令,但未提供协议版本号") ;
        }
        if(com.code != null && com.code.trim().length() != 0){
            throw new Exception("接收到MQTT命令,但未提供功能码") ;
        }
        if(com.protocol.equals(ProtocolConstantSdV1.protocolName)){
            if(com.protocolVersion.shortValue() == ProtocolConstantSdV1.protocolVer){
                return new ProtocolParserSdV1().createPubMsg(orgTag, com) ;
            }else{
                throw new Exception("接收到MQTT命令,协议" + com.protocol + "版本" + com.protocolVersion + "构造器未实现") ;
            }
        }else{
            throw new Exception("接收到MQTT命令,协议" + com.protocol + "构造器未实现") ;
        }
    }
    public static void main(String[] args) {
        String s = "ym/sd1/10000/control/m1" ;
        String[] ss = s.split("/") ;
        for (String s1 : ss) {
            System.out.println(s1);
        }
    }
}
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttPubMsg.java
New file
@@ -0,0 +1,23 @@
package com.dy.common.mw.protocol4Mqtt;
/**
 * @Author: liurunyu
 * @Date: 2025/6/5 11:44
 * @Description
 */
public abstract class MqttPubMsg {
    public String commandId ;//命令ID
    public String deviceId ;//设备ID
    public String mqttResultSendWebUrl ;//Mqtt返回命令结果 å‘向目的地web URL
    public String topic ;//消息主题
    public String msg ;//消息
    public boolean isCacheForOffLine ;//下行命令控制,消息中间件不在线是否缓存命令
    public boolean hasResponse ;//下行命令控制,命令是否有应答
    public abstract boolean valid();
}
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/MqttSubMsg.java
New file
@@ -0,0 +1,26 @@
package com.dy.common.mw.protocol4Mqtt;
import com.dy.common.util.Callback;
/**
 * @Author: liurunyu
 * @Date: 2025/6/5 11:44
 * @Description
 */
public abstract class MqttSubMsg {
    public String commandId ;//命令ID
    public String deviceId ;//设备ID
    public String mqttResultSendWebUrl ;//Mtt返回命令结果 å‘向目的地web URL
    public String topic ;//消息主题
    public String msg ;//消息
    public abstract boolean valid();
    public abstract boolean subMsgMatchPubMsg(MqttPubMsg pubMsg);
    public abstract void action(Callback callback);
}
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/CodeSdV1.java
New file
@@ -0,0 +1,13 @@
package com.dy.common.mw.protocol4Mqtt.pSdV1;
/**
 * @Author: liurunyu
 * @Date: 2025/6/4 17:41
 * @Description
 */
public class CodeSdV1 {
    public static final String cd_Fault = "00" ;//故障解除命令
    public static final String cd_Stir = "01" ;//搅拌启停命令
    public static final String cd_Inject = "02" ;//注肥启停命令
    public static final String cd_Irr = "03" ;//灌溉启停命令
}
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttPubMsgSdV1.java
New file
@@ -0,0 +1,27 @@
package com.dy.common.mw.protocol4Mqtt.pSdV1;
import com.dy.common.mw.protocol4Mqtt.MqttPubMsg;
import lombok.Data;
/**
 * @Author: liurunyu
 * @Date: 2025/6/4 18:00
 * @Description ä¸‹å‘的发布消息(即下行命令)
 */
@Data
public class MqttPubMsgSdV1 extends MqttPubMsg {
    public Integer address ;//寄存器地址
    public String value ;//寄存器值
    @Override
    public boolean valid() {
        if (topic == null || topic.isEmpty()) {
            return false;
        }
        if (msg == null || msg.isEmpty()) {
            return false;
        }
        return true;
    }
}
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/MqttSubMsgSdV1.java
New file
@@ -0,0 +1,68 @@
package com.dy.common.mw.protocol4Mqtt.pSdV1;
import com.dy.common.mw.protocol4Mqtt.MqttPubMsg;
import com.dy.common.mw.protocol4Mqtt.MqttSubMsg;
import com.dy.common.util.Callback;
import lombok.Data;
/**
 * @Author: liurunyu
 * @Date: 2025/6/4 16:58
 * @Description æ”¶åˆ°çš„订阅消息
 */
@Data
public class MqttSubMsgSdV1 extends MqttSubMsg {
    public Integer address ;//寄存器地址
    public String value ;//寄存器值
    public MqttSubMsgSdV1(){}
    public MqttSubMsgSdV1(String deviceId, String topic, String msg) {
        this.deviceId = deviceId ;
        this.topic = topic ;
        this.msg = msg ;
    }
    public String toString(){
        StringBuilder sb = new StringBuilder();
        if(commandId != null){
            sb.append("commandId:")
                    .append(commandId)
                    .append("\n") ;
        }
        sb.append("主题:")
                .append(topic)
                .append("\n") ;
        sb.append("消息:")
                .append(msg)
                .append("\n") ;
        return sb.toString() ;
    }
    public boolean subMsgMatchPubMsg(MqttPubMsg pubMsg){
        if (pubMsg instanceof MqttPubMsgSdV1) {
            MqttPubMsgSdV1 pubMsgSdV1 = (MqttPubMsgSdV1) pubMsg;
            if(this.address.intValue() == pubMsgSdV1.getAddress().intValue()){
                return true ;
            }
        }
        return false ;
    }
    @Override
    public boolean valid() {
        if (topic == null || topic.isEmpty()) {
            return false;
        }
        if (msg == null || msg.isEmpty()) {
            return false;
        }
        return true;
    }
    @Override
    public void action(Callback callback){
        callback.call(this) ;
    }
}
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolConstantSdV1.java
New file
@@ -0,0 +1,11 @@
package com.dy.common.mw.protocol4Mqtt.pSdV1;
/**
 * @Author: liurunyu
 * @Date: 2025/6/5 10:52
 * @Description
 */
public class ProtocolConstantSdV1 {
    public static final String protocolName = "sd" ;
    public static final short protocolVer = 1 ;
}
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/ProtocolParserSdV1.java
New file
@@ -0,0 +1,170 @@
package com.dy.common.mw.protocol4Mqtt.pSdV1;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.dy.common.mw.protocol.Command;
import com.dy.common.mw.protocol4Mqtt.pSdV1.downVos.ComCtrlVo;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
 * @Author: liurunyu
 * @Date: 2025/6/5 11:41
 * @Description
 */
public class ProtocolParserSdV1 {
    public MqttSubMsgSdV1 parseSubMsg(String deviceId, String topic, MqttMessage mqttMsg) throws Exception {
        MqttSubMsgSdV1 ms = new MqttSubMsgSdV1(deviceId, topic, new String(mqttMsg.getPayload(), "UTF-8"));
        return ms;
    }
    public MqttPubMsgSdV1 createPubMsg(String orgTag, Command com) throws Exception {
        MqttPubMsgSdV1 msg = null ;
        switch (com.code){
            case CodeSdV1.cd_Fault:{
                //故障解除命令
                this.checkParam(com);
                this.checkRtnWebUrl(com);
                msg = this.createPubMsgOfFault(orgTag, com) ;
                break ;
            }
            case CodeSdV1.cd_Stir:{
                //搅拌启停命令
                this.checkParam(com);
                this.checkRtnWebUrl(com);
                msg = this.createPubMsgOfStir(orgTag, com) ;
                break ;
            }
            case CodeSdV1.cd_Inject:{
                //注肥启停命令
                this.checkParam(com);
                this.checkRtnWebUrl(com);
                msg = this.createPubMsgOfInject(orgTag, com) ;
                break ;
            }
            case CodeSdV1.cd_Irr:{
                //灌溉启停命令
                this.checkParam(com);
                this.checkRtnWebUrl(com);
                msg = this.createPubMsgOfIrr(orgTag, com) ;
                break ;
            }
            default:{
                throw new Exception("接收到MQTT命令,协议" + com.protocol + "版本" + com.protocolVersion + "功能码" + com.code + "构造器未实现") ;
            }
        }
        return msg ;
    }
    private void checkParam(Command com)throws Exception {
        if(com.param == null){
            throw new Exception("接收到MQTT命令,协议" + com.protocol + "版本" + com.protocolVersion + "功能码" + com.code + "命令数据为空") ;
        }
    }
    private void checkRtnWebUrl(Command com)throws Exception {
        if(com.rtuResultSendWebUrl == null || com.rtuResultSendWebUrl.trim().equals("")){
            throw new Exception("接收到MQTT命令,协议" + com.protocol + "版本" + com.protocolVersion + "功能码" + com.code + "命令结果回收URL为空") ;
        }
    }
    private MqttPubMsgSdV1 createPubMsgOfFault(String orgTag, Command com) throws Exception {
        JSONObject obj = (JSONObject) com.param;
        String json = obj.toJSONString();
        ComCtrlVo cvo = JSON.parseObject(json, ComCtrlVo.class);
        if(cvo == null){
            throw new Exception("json转ComCtrlVo为null") ;
        }
        MqttPubMsgSdV1 msg = new MqttPubMsgSdV1() ;
        this.setPubMsgBase(com, msg);
        msg.isCacheForOffLine = false ;
        msg.hasResponse = true ;
        msg.address = 123 ;
        msg.value = "" + (cvo.isDo?1:0);
        msg.topic = createTopic(orgTag, com) ;
        msg.msg = "" ;
        return msg ;
    }
    private MqttPubMsgSdV1 createPubMsgOfStir(String orgTag, Command com) throws Exception {
        JSONObject obj = (JSONObject) com.param;
        String json = obj.toJSONString();
        ComCtrlVo cvo = JSON.parseObject(json, ComCtrlVo.class);
        if(cvo == null){
            throw new Exception("json转ComCtrlVo为null") ;
        }
        MqttPubMsgSdV1 msg = new MqttPubMsgSdV1() ;
        this.setPubMsgBase(com, msg);
        msg.isCacheForOffLine = false ;
        msg.hasResponse = true ;
        msg.address = 123 ;
        msg.value = "" + (cvo.isDo?1:0);
        msg.topic = createTopic(orgTag, com) ;
        msg.msg = "" ;
        return msg ;
    }
    private MqttPubMsgSdV1 createPubMsgOfInject(String orgTag, Command com) throws Exception {
        JSONObject obj = (JSONObject) com.param;
        String json = obj.toJSONString();
        ComCtrlVo cvo = JSON.parseObject(json, ComCtrlVo.class);
        if(cvo == null){
            throw new Exception("json转ComCtrlVo为null") ;
        }
        MqttPubMsgSdV1 msg = new MqttPubMsgSdV1() ;
        this.setPubMsgBase(com, msg);
        msg.isCacheForOffLine = false ;
        msg.hasResponse = true ;
        msg.address = 123 ;
        msg.value = "" + (cvo.isDo?1:0);
        msg.topic = createTopic(orgTag, com) ;
        msg.msg = "" ;
        return msg ;
    }
    private MqttPubMsgSdV1 createPubMsgOfIrr(String orgTag, Command com) throws Exception {
        JSONObject obj = (JSONObject) com.param;
        String json = obj.toJSONString();
        ComCtrlVo cvo = JSON.parseObject(json, ComCtrlVo.class);
        if(cvo == null){
            throw new Exception("json转ComCtrlVo为null") ;
        }
        MqttPubMsgSdV1 msg = new MqttPubMsgSdV1() ;
        this.setPubMsgBase(com, msg);
        msg.isCacheForOffLine = false ;
        msg.hasResponse = true ;
        msg.address = 123 ;
        msg.value = "" + (cvo.isDo?1:0);
        msg.topic = createTopic(orgTag, com) ;
        msg.msg = "" ;
        return msg ;
    }
    private void setPubMsgBase(Command com, MqttPubMsgSdV1 msg){
        msg.commandId = com.id ;
        msg.deviceId = com.rtuAddr ;
        msg.mqttResultSendWebUrl = com.rtuResultSendWebUrl ;
    }
    private String createTopic(String orgTag, Command com){
        String topic = null ;
        switch (com.code){
            case CodeSdV1.cd_Fault:{
                topic = orgTag + "/" + com.protocol + com.protocolVersion + "/" + com.rtuAddr + "/control/m4" ;
                break ;
            }
            case CodeSdV1.cd_Stir:{
                topic = orgTag + "/" + com.protocol + com.protocolVersion + "/" + com.rtuAddr + "/control/m80" ;
                break ;
            }
            case CodeSdV1.cd_Inject:{
                topic = orgTag + "/" + com.protocol + com.protocolVersion + "/" + com.rtuAddr + "/control/m1" ;
                break ;
            }
            case CodeSdV1.cd_Irr:{
                topic = orgTag + "/" + com.protocol + com.protocolVersion + "/" + com.rtuAddr + "/control/m2" ;
                break ;
            }
            default:{
                topic = null ;
                break;
            }
        }
        return topic ;
    }
}
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/downVos/ComCtrlVo.java
New file
@@ -0,0 +1,12 @@
package com.dy.common.mw.protocol4Mqtt.pSdV1.downVos;
/**
 * @Author: liurunyu
 * @Date: 2025/6/5 15:57
 * @Description
 */
public class ComCtrlVo {
    //是否控制动作,true是,false否
    //可以执行功能码 00,01,02,03的动作
    public boolean isDo;//
}
pipIrr-platform/pipIrr-common/src/main/java/com/dy/common/mw/protocol4Mqtt/pSdV1/˵Ã÷.txt
New file
@@ -0,0 +1,6 @@
山东泰安公司提供水肥机、土壤墒情站、气象站、FBox系统协议
建议消息主题规则:
子系统(机构)/协议名称(厂家)+版本号/设备编号/功能组/地址
例如:
ym/sd1/10000/control/m4  (元谋/山东+版本1/设备编号/设备控制/地址)
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/pom.xml
@@ -125,6 +125,18 @@
            <version>2.0.7</version>
        </dependency>
        <!-- MQTT消息中间件,连接池及客户端  -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.5</version>
        </dependency>
        <!--钉钉消息推送-->
        <dependency>
            <groupId>com.aliyun</groupId>
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/Server.java
@@ -4,13 +4,15 @@
import java.util.List;
import com.dy.common.util.ConfigProperties;
import com.dy.common.util.IPUtils;
import com.dy.rtuMw.server.*;
import com.dy.rtuMw.server.mqtt.MqttUnit;
import com.dy.rtuMw.server.mqtt.MqttUnitConfigVo;
import com.dy.rtuMw.server.msCenter.MsCenterConfigVo;
import com.dy.rtuMw.server.msCenter.MsCenterUnit;
import com.dy.rtuMw.server.rtuData.RtuDataUnit;
import com.dy.rtuMw.server.rtuData.RtuDataUnitConfigVo;
import com.dy.rtuMw.server.tasks.FromRtuComResultConstantTask;
import com.dy.rtuMw.server.tasks.FromRtuDataConstantTask;
import com.dy.rtuMw.server.tasks.*;
import com.dy.common.mw.UnitInterface;
import com.dy.common.mw.channel.tcp.TcpConfigVo;
import com.dy.common.mw.channel.tcp.TcpUnit;
@@ -20,8 +22,6 @@
import com.dy.common.mw.protocol.ProtocolUnit;
import com.dy.common.mw.support.SupportUnit;
import com.dy.common.mw.support.SupportUnitConfigVo;
import com.dy.rtuMw.server.tasks.SendMsConstantTask;
import com.dy.rtuMw.server.tasks.RtuDownConstantTask;
import com.dy.rtuMw.resource.ResourceUnit;
import com.dy.rtuMw.resource.ResourceUnitConfigVo;
import com.dy.common.springUtil.SpringContextUtil;
@@ -359,28 +359,31 @@
            //RTU远程升级模块
            UpgradeUnitConfigVo ugVo = new UpgradeUnitConfigVo();
            ugVo.enable = conf.getSetAttrBoolean(doc, "config.upgrade", "enable", null, null) ;
            ugVo.openNoUpgrade = conf.getSetAttrBoolean(doc, "config.upgrade", "openNoUpgrade", null, null) ;
            ugVo.lastOpenMaxGoOn = conf.getSetAttrPlusInt(doc, "config.upgrade", "lastOpenMaxGoOn", null, 5, 360000, null);
            ugVo.lastOpenMaxGoOn = ugVo.lastOpenMaxGoOn * 1000 ;//变成毫秒
            ugVo.noOneRtuUpgradeMaxDuration = conf.getSetAttrPlusInt(doc, "config.upgrade", "noOneRtuUpgradeMaxDuration", null, 5, 360000, null);
            ugVo.noOneRtuUpgradeMaxDuration = ugVo.noOneRtuUpgradeMaxDuration * 1000 ;//变成毫秒
            ugVo.runningAndIdleDuration = conf.getSetAttrPlusInt(doc, "config.upgrade", "runningAndIdleDuration", null, 5, 360000, null);
            ugVo.runningAndIdleDuration = ugVo.runningAndIdleDuration * 1000 ;//变成毫秒
            ugVo.failTryTimes = conf.getSetAttrPlusInt(doc, "config.upgrade", "failTryTimes", null, 0, 100, null);
            ugVo.ugMaxRtuAtOnce = conf.getSetAttrPlusInt(doc, "config.upgrade", "ugMaxRtuAtOnce", null, 0, 1000000, null);
            ugVo.rtuOffLineWaitDuration = conf.getSetAttrPlusInt(doc, "config.upgrade", "rtuOffLineWaitDuration", null, 1, 3600000, null);
            ugVo.rtuOffLineWaitDuration = ugVo.rtuOffLineWaitDuration * 1000;//变成毫秒
            ugVo.notifyStateInterval = conf.getSetAttrPlusInt(doc, "config.upgrade", "notifyStateInterval", null, 1, 300, null);
            ugVo.notifyStateInterval = ugVo.notifyStateInterval * 1000;//变成毫秒
            ugVo.notifyTimesAfterOver = conf.getSetAttrPlusInt(doc, "config.upgrade", "notifyTimesAfterOver", null, 0, null, null);
            ugVo.showStartInfo = showStartInfo ;
            AdapterImp_UpgradeUnit ugAdap = new AdapterImp_UpgradeUnit();
            ugAdap.setConfig(ugVo);
            UpgradeUnit ugUnit = UpgradeUnit.getInstance();
            ugUnit.setAdapter(ugAdap);
            ugUnit.start(obj -> {
            });
            units.add(ugUnit) ;
            if(ugVo.enable){
                ugVo.openNoUpgrade = conf.getSetAttrBoolean(doc, "config.upgrade", "openNoUpgrade", null, null) ;
                ugVo.lastOpenMaxGoOn = conf.getSetAttrPlusInt(doc, "config.upgrade", "lastOpenMaxGoOn", null, 5, 360000, null);
                ugVo.lastOpenMaxGoOn = ugVo.lastOpenMaxGoOn * 1000 ;//变成毫秒
                ugVo.noOneRtuUpgradeMaxDuration = conf.getSetAttrPlusInt(doc, "config.upgrade", "noOneRtuUpgradeMaxDuration", null, 5, 360000, null);
                ugVo.noOneRtuUpgradeMaxDuration = ugVo.noOneRtuUpgradeMaxDuration * 1000 ;//变成毫秒
                ugVo.runningAndIdleDuration = conf.getSetAttrPlusInt(doc, "config.upgrade", "runningAndIdleDuration", null, 5, 360000, null);
                ugVo.runningAndIdleDuration = ugVo.runningAndIdleDuration * 1000 ;//变成毫秒
                ugVo.failTryTimes = conf.getSetAttrPlusInt(doc, "config.upgrade", "failTryTimes", null, 0, 100, null);
                ugVo.ugMaxRtuAtOnce = conf.getSetAttrPlusInt(doc, "config.upgrade", "ugMaxRtuAtOnce", null, 0, 1000000, null);
                ugVo.rtuOffLineWaitDuration = conf.getSetAttrPlusInt(doc, "config.upgrade", "rtuOffLineWaitDuration", null, 1, 3600000, null);
                ugVo.rtuOffLineWaitDuration = ugVo.rtuOffLineWaitDuration * 1000;//变成毫秒
                ugVo.notifyStateInterval = conf.getSetAttrPlusInt(doc, "config.upgrade", "notifyStateInterval", null, 1, 300, null);
                ugVo.notifyStateInterval = ugVo.notifyStateInterval * 1000;//变成毫秒
                ugVo.notifyTimesAfterOver = conf.getSetAttrPlusInt(doc, "config.upgrade", "notifyTimesAfterOver", null, 0, null, null);
                ugVo.showStartInfo = showStartInfo ;
                AdapterImp_UpgradeUnit ugAdap = new AdapterImp_UpgradeUnit();
                ugAdap.setConfig(ugVo);
                UpgradeUnit ugUnit = UpgradeUnit.getInstance();
                ugUnit.setAdapter(ugAdap);
                ugUnit.start(obj -> {
                });
                units.add(ugUnit) ;
            }
            /////////////////
            //RTU上行数据处理模块(任务树)
@@ -410,6 +413,12 @@
            CoreUnit.addConstantTask(new RtuDownConstantTask());
            CoreUnit.addConstantTask(new FromRtuDataConstantTask());
            CoreUnit.addConstantTask(new FromRtuComResultConstantTask());
            Boolean enableMq = conf.getSetAttrBoolean(doc, "config.mqtt", "enable", null, null) ;
            if(enableMq != null && enableMq.booleanValue()){
                CoreUnit.addConstantTask(new MqttSubMessageConstantTask());
                CoreUnit.addConstantTask(new MqttPubMessageConstantTask());
                CoreUnit.addConstantTask(new MqttComResultConstantTask());
            }
            CoreUnit.addConstantTask(new SendMsConstantTask());
            coreUnit.start(obj -> {
            });
@@ -433,7 +442,70 @@
                });
                TcpSvUrl = "[ip]:" + tcpVo.port ;
                units.add(tcpUnit) ;
            }
            }
            /////////////////
            //MQTT模块
            MqttUnitConfigVo mqVo = new MqttUnitConfigVo();
            mqVo.enable = conf.getSetAttrBoolean(doc, "config.mqtt", "enable", null, null) ;
            ServerProperties.mqttUnitEnable = mqVo.enable ;
            if(mqVo.enable){
                mqVo.svIp = conf.getSetAttrTxt(doc, "config.mqtt", "svIp", null, true, null) ;
                if(!IPUtils.ipValid(mqVo.svIp)){
                    throw new Exception("config.mqtt.svIp配置的IP不合法") ;
                }
                mqVo.svPort = conf.getSetAttrPlusInt(doc, "config.mqtt", "svPort", null, 5, 360000, null);
                if(mqVo.svPort < 0 || mqVo.svPort > 65535){
                    throw new Exception("config.mqtt.svPort配置的端口不合法") ;
                }
                mqVo.svUserName = conf.getSetAttrTxt(doc, "config.mqtt", "svUserName", null, true, null) ;
                if(mqVo.svUserName == null || mqVo.svUserName.trim().equals("")){
                    throw new Exception("config.mqtt.svUserName配置的用户名不合法") ;
                }else{
                    mqVo.svUserName = mqVo.svUserName.trim() ;
                }
                mqVo.svUserPassword = conf.getSetAttrTxt(doc, "config.mqtt", "svUserPassword", null, true, null) ;
                if(mqVo.svUserPassword == null || mqVo.svUserPassword.trim().equals("")){
                    throw new Exception("config.mqtt.svUserName配置的用户密码不合法") ;
                }else{
                    mqVo.svUserPassword = mqVo.svUserPassword.trim() ;
                }
                mqVo.poolMaxSize = conf.getSetAttrPlusInt(doc, "config.mqtt", "poolMaxSize", null, 5, 360000, null);
                if(mqVo.poolMaxSize <= 1 || mqVo.poolMaxSize > 1000){
                    throw new Exception("config.mqtt.poolMaxSize配置的连接池连接最大数量不合法") ;
                }
                String topicAndQos = conf.getSetAttrTxt(doc, "config.mqtt", "topicAndQos", null, true, null) ;
                if(topicAndQos == null || topicAndQos.trim().equals("")){
                    throw new Exception("config.mqtt.topicAndQos配置的主题及Qos不合法") ;
                }else{
                    topicAndQos = topicAndQos.trim() ;
                    topicAndQos = topicAndQos.replaceAll(",", ",");
                    topicAndQos = topicAndQos.replaceAll(";", ";");
                    String[] topicAndQosArr = topicAndQos.split(";") ;
                    mqVo.subTopics = new String[topicAndQosArr.length] ;
                    mqVo.topicsQos = new int[topicAndQosArr.length] ;
                    int index = 0 ;
                    for(String topicAndQosStr : topicAndQosArr){
                        String[] tq = topicAndQosStr.split(",") ;
                        mqVo.subTopics[index] = tq[0].trim() ;
                        mqVo.topicsQos[index] = Integer.parseInt(tq[1].trim()) ;
                        index++ ;
                    }
                }
                mqVo.publishQos = conf.getSetAttrPlusInt(doc, "config.mqtt", "publishQos", null, 0, 3, null);
                if(mqVo.publishQos < 0 || mqVo.publishQos > 3){
                    throw new Exception("config.mqtt.publishQos配置不合法") ;
                }
                mqVo.showStartInfo = showStartInfo ;
                AdapterImp_MqttUnit mqAdapt = new AdapterImp_MqttUnit();
                mqAdapt.setConfig(mqVo);
                MqttUnit mqUnit = MqttUnit.getInstance();
                mqUnit.setAdapter(mqAdapt);
                mqUnit.start(obj -> {
                });
                units.add(mqUnit) ;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }        
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/ServerShutDownHook.java
@@ -20,13 +20,19 @@
            try{
                // ç¡®ä¿è¿™æ®µä»£ç å°½å¯èƒ½å¿«é€Ÿæ‰§è¡Œï¼Œé¿å…å½±å“JVM的关闭
                log.info("程序(控制台)关闭钩子类执行");
                Command com = new Command() ;
                com.id = Command.defaultId ;
                com.code = CodeLocal.stopTcpSv ;
                com.type = CommandType.innerCommand ;
                new CommandInnerDeaLer().deal(com) ;
                Command com1 = new Command() ;
                com1.id = Command.defaultId ;
                com1.code = CodeLocal.stopTcpSv ;
                com1.type = CommandType.innerCommand ;
                new CommandInnerDeaLer().deal(com1) ;
                //Thread.sleep(100L);//实测不执行
                log.info("关闭程序前,关闭了TCP服务");
                Command com2 = new Command() ;
                com2.id = Command.defaultId ;
                com2.code = CodeLocal.stopMqttSv ;
                com2.type = CommandType.innerCommand ;
                new CommandInnerDeaLer().deal(com2) ;
            }catch (Exception e){
                log.error("程序(控制台)关闭钩子发生异常", e);
            }
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/AdapterImp_MqttUnit.java
New file
@@ -0,0 +1,19 @@
package com.dy.rtuMw.server;
import com.dy.rtuMw.server.mqtt.MqttUnitAdapter;
import com.dy.rtuMw.server.mqtt.MqttUnitConfigVo;
public class AdapterImp_MqttUnit implements MqttUnitAdapter {
    private MqttUnitConfigVo configVo ;
    public MqttUnitConfigVo getConfig() {
        return configVo;
    }
    public void setConfig(MqttUnitConfigVo configVo){
        this.configVo = configVo ;
    }
}
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/ServerProperties.java
@@ -54,4 +54,7 @@
    //有报警发生时,向钉钉发送消息的间隔时长(分钟)
    public static Integer sendDingDingAlarmMsInterval = 60 ;
    //Mqtt模块是否启动
    public static Boolean mqttUnitEnable = false ;
}
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/CommandInnerDeaLer.java
@@ -5,6 +5,7 @@
import com.dy.common.mw.protocol.Command;
import com.dy.common.mw.protocol.rtuState.RtuStatus;
import com.dy.rtuMw.server.local.localProtocol.*;
import com.dy.rtuMw.server.mqtt.MqttUnit;
import java.util.HashMap;
import java.util.Map;
@@ -43,6 +44,8 @@
            return this.stopTcpSv(com) ;
        }else if(code.equals(CodeLocal.recoverTcpSv)){
            return this.recoverTcpSv(com) ;
        }else if(code.equals(CodeLocal.recoverMqttSv)){
            return this.stopMqttSv(com) ;
        }else if(code.equals(CodeLocal.mwState)){
            return this.mwInfo(com) ;
        }
@@ -157,6 +160,30 @@
        return ReturnCommand.successed("已经启动恢复TCP服务", command.getId(), command.getCode(), null) ;
    }
    /**
     * åœæ­¢TCP服务,不再接入新的TCP连接,已经TCP连接的全部断连接
     * @throws Exception
     */
    private Command stopMqttSv(Command command) throws Exception{
        MqttUnit.getInstance().stop(new UnitCallbackInterface(){
            public void call(Object obj) throws Exception {
            }
        });
        return ReturnCommand.successed("已经启动停止Mqtt服务", command.getId(), command.getCode(), null) ;
    }
    /**
     * æ¢å¤TCP服务,接入新的TCP连接
     * @throws Exception
     */
    private Command recoverMqttSv(Command command) throws Exception{
        MqttUnit.getInstance().recover(new UnitCallbackInterface(){
            public void call(Object obj) throws Exception {
            }
        });
        return ReturnCommand.successed("已经启动恢复Mqtt服务", command.getId(), command.getCode(), null) ;
    }
    /**
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/local/localProtocol/CodeLocal.java
@@ -22,6 +22,10 @@
    public static final String recoverTcpSv = "LCD0112" ;//重启TCP服务,接入新的TCP连接
    public static final String stopMqttSv = "LCD0114" ;//停止Mqtt服务
    public static final String recoverMqttSv = "LCD0116" ;//重启Mqtt服务
    public static final String mwState = "LCD0200" ;//得到通信中间件运行信息
}
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttComResultCache.java
New file
@@ -0,0 +1,69 @@
package com.dy.rtuMw.server.mqtt;
import com.dy.common.queue.Node;
import com.dy.common.queue.Queue;
import com.dy.rtuMw.server.ServerProperties;
/**
 * @Author: liurunyu
 * @Date: 2025/6/5 15:05
 * @Description
 */
public class MqttComResultCache {
    //TCP下行命令缓存队列
    private static Queue cacheQueue = new Queue("MqttComResultCache") ;
    private static MqttComResultCache instance = new MqttComResultCache() ;
    private MqttComResultCache(){
        cacheQueue.setLimit(ServerProperties.cacheUpDownDataWarnCount, ServerProperties.cacheUpDownDataMaxCount);
    }
    public static MqttComResultCache getInstance(){
        return instance ;
    }
    /**
     * ç¼“存节点
     * @param node node
     * @throws Exception å¼‚常
     */
    public static void cacheMqttComResult(MqttComResultNode node) throws Exception{
        if(node != null && node.obj != null){
            cacheQueue.pushHead(node);
        }
    }
    /**
     * å¾—到第一个节点
     * @return Node
     */
    public static Node getFirstQueueNode(){
        return cacheQueue.getFirstNode() ;
    }
    /**
     * å¾—到最后一个节点
     * @return Node
     */
    public static Node getLastQueueNode(){
        return cacheQueue.getLastNode() ;
    }
    /**
     * ç§»é™¤èŠ‚ç‚¹
     * @param node
     */
    public static void removeNode(Node node){
        cacheQueue.remove(node);
    }
    /**
     * ç¼“存的节点数
     * @Return ç¼“存节点数
     */
    public static Integer size(){
        return cacheQueue.size() ;
    }
}
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttComResultNode.java
New file
@@ -0,0 +1,58 @@
package com.dy.rtuMw.server.mqtt;
import com.dy.common.mw.protocol4Mqtt.MqttSubMsg;
import com.dy.common.queue.NodeObj;
import com.dy.common.springUtil.SpringContextUtil;
import com.dy.common.threadPool.ThreadPool;
import com.dy.common.threadPool.TreadPoolFactory;
import com.dy.rtuMw.web.comResult.CommandResultDeal;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
 * @Author: liurunyu
 * @Date: 2025/6/5 15:06
 * @Description
 */
public class MqttComResultNode implements NodeObj {
    private static final Logger log = LogManager.getLogger(MqttComResultNode.class.getName());
    public Object obj ;//数据
    public MqttComResultNode(Object obj){
        this.obj = obj ;
    }
    /**
     * è‡ªå·±å¤„理自己
     * @return
     */
    public boolean dealSelf(){
        try {
            ThreadPool.Pool pool = TreadPoolFactory.getThreadPoolLong() ;
            pool.putJob(new ThreadPool.Job() {
                public void execute() {
                    if(obj != null){
                        if(obj instanceof MqttSubMsg){
                            CommandResultDeal deal = SpringContextUtil.getBean(CommandResultDeal.class) ;
                            deal.deal((MqttSubMsg)obj);
                        }
                    }
                }
                @Override
                public void destroy(){
                }
                @Override
                public boolean isDestroy(){
                    return false ;
                }
            });
        } catch (Exception e) {
            log.error("在RtuComResultNode内发生异常", e);
        }
        return true ;
    }
}
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java
New file
@@ -0,0 +1,86 @@
package com.dy.rtuMw.server.mqtt;
import com.dy.common.mw.channel.mqtt.MqttClientPool;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.eclipse.paho.client.mqttv3.MqttClient;
/**
 * @Author: liurunyu
 * @Date: 2025/6/4 14:54
 * @Description
 */
public class MqttManager {
    private static final Logger log = LogManager.getLogger(MqttManager.class.getName());
    private static final MqttManager INSTANCE = new MqttManager();
    private MqttUnitConfigVo configVo ;
    private MqttClientPool pool;
    private MqttManager(){
    }
    public static MqttManager getInstance() {
        return MqttManager.INSTANCE;
    }
    /**
     *  åˆå§‹åŒ–配置信息
     */
    public void initOption(MqttUnitConfigVo configVo) {
        this.configVo = configVo;
    }
    public void start()throws Exception{
        String URL = "tcp://" + this.configVo.svIp + ":" + this.configVo.svPort;
        this.pool = new MqttClientPool(URL, this.configVo.svUserName, this.configVo.svUserPassword, this.configVo.poolMaxSize);
        if(this.pool.isClose()){
            throw new Exception("Mqtt连接池初始化失败");
        }
        MqttClient clientSub = null ;
        try {
            clientSub = pool.popClient();//新创建一个Client时,此Client实际去连接MQTT服务器,如果连接不上,就会抛出异常
        }catch (Exception e){
            throw new Exception("Mqtt连接池获得连接异常", e);
        }
        if(clientSub == null || !clientSub.isConnected()){
            throw new Exception("Mqtt连接池获得订阅连接不可用");
        }
        for(int i = 0; i < this.configVo.subTopics.length; i++){
            clientSub.subscribe(this.configVo.subTopics[i], this.configVo.topicsQos[i], new MqttMessageListener());
        }
    }
    public void stop()throws Exception{
        if(this.pool != null){
            // å…³é—­è¿žæŽ¥æ± 
            this.pool.close();
        }
    }
    public MqttClient popMqttClient() throws Exception{
        return this.pool.popClient();
    }
    public void pushMqttClient(MqttClient client) {
        this.pool.pushClient(client);
    }
    public void publishMsg(MqttClient client, String topic, byte[] msg) throws Exception{
        client.publish(topic, msg, this.configVo.publishQos, false);
    }
    public void publishMsg(MqttClient client, String topic, String msg) throws Exception{
        byte[] bs = msg.getBytes("UTF-8") ;
        client.publish(topic, bs, this.configVo.publishQos, false);
    }
    public boolean poolIsClose(){
        if(this.pool == null){
            return true;
        }
        return this.pool.isClose();
    }
}
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java
New file
@@ -0,0 +1,49 @@
package com.dy.rtuMw.server.mqtt;
import com.dy.common.mw.protocol4Mqtt.MqttMsgParser;
import com.dy.common.mw.protocol4Mqtt.MqttPubMsg;
import com.dy.common.mw.protocol4Mqtt.MqttSubMsg;
import com.dy.common.util.Callback;
import org.eclipse.paho.client.mqttv3.* ;
/**
 * @Author: liurunyu
 * @Date: 2025/6/4 15:52
 * @Description
 */
public class MqttMessageListener implements IMqttMessageListener{
    @Override
    public void messageArrived(String topic, MqttMessage msg) throws Exception {
        MqttMsgParser parser = new MqttMsgParser() ;
        MqttSubMsg subMsg = parser.parseSubMsg(topic, msg) ;
        this.nextDeal(subMsg);
    }
    private void nextDeal(MqttSubMsg subMsg)throws Exception {
        subMsg.action(new Callback() {
            @Override
            public void call(Object obj) {
                MqttSubMsg subMs = (MqttSubMsg) obj ;
                MqttPubMsg pubMs = MqttPubMsgCache.matchFromTail(subMs) ;
                if(pubMs != null){
                    subMs.mqttResultSendWebUrl = pubMs.mqttResultSendWebUrl ;
                    subMs.commandId = pubMs.commandId ;
                    try {
                        MqttComResultCache.getInstance().cacheMqttComResult(new MqttComResultNode(subMs));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                try{
                    MqttSubMsgCache.getInstance().cacheMsg(new MqttSubMsgNode(subMsg));
                }catch (Exception e){
                }
            }
            @Override
            public void call(Object... objs) {
            }
            @Override
            public void exception(Exception e) {
            }
        });
    }
}
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgCache.java
New file
@@ -0,0 +1,132 @@
package com.dy.rtuMw.server.mqtt;
import com.dy.common.mw.protocol4Mqtt.MqttPubMsg;
import com.dy.common.mw.protocol4Mqtt.MqttSubMsg;
import com.dy.common.queue.Node;
import com.dy.common.queue.Queue;
import com.dy.rtuMw.server.ServerProperties;
/**
 * @Author: liurunyu
 * @Date: 2025/6/4 17:24
 * @Description
 */
public class MqttPubMsgCache {
    //TCP下行命令缓存队列
    private static Queue cacheQueue = new Queue("mqttPubMsgCache") ;
    private static MqttPubMsgCache instance = new MqttPubMsgCache() ;
    private MqttPubMsgCache(){
        cacheQueue.setLimit(ServerProperties.cacheUpDownDataWarnCount, ServerProperties.cacheUpDownDataMaxCount);
    }
    public static MqttPubMsgCache getInstance(){
        return instance ;
    }
    public static Integer info(){
        Integer comTotalDown = 0 ;//缓存的下行命令总数
        MqttPubMsgNode obj ;
        Node node = cacheQueue.getFirstNode() ;
        while(node != null && node.obj != null){
            obj = (MqttPubMsgNode)node.obj;
            if(!obj.onceReceivedResult){
                comTotalDown ++ ;
            }
        }
        return comTotalDown ;
    }
    /**
     * ç¼“存命令
     * @param result
     * @throws Exception
     */
    public static void cacheCommand(MqttPubMsg result) throws Exception{
        if(result != null){
            cacheQueue.pushHead(new MqttPubMsgNode(result));
        }
    }
    /**
     * åŒ¹é…å‘½ä»¤ç»“æžœ
     * @param subMsg
     * @return
     */
    public static MqttPubMsg matchFromHead(MqttSubMsg subMsg){
        MqttPubMsg pubMsg = null ;
        MqttPubMsgNode obj = null ;
        Node node = cacheQueue.getFirstNode() ;
        while(node != null && node.obj != null){
            obj = (MqttPubMsgNode)node.obj;
            pubMsg = obj.result ;
            if(pubMsg != null
                    && subMsg.subMsgMatchPubMsg(pubMsg)){
                obj.onceReceivedResult = true ;//标识已经收到命令结果
                return pubMsg;
            }else{
                node = node.next ;
            }
        }
        return null ;
    }
    /**
     * åŒ¹é…å‘½ä»¤ç»“æžœ
     * @param subMsg
     * @return
     */
    public static MqttPubMsg matchFromTail(MqttSubMsg subMsg){
        MqttPubMsg pubMsg = null ;
        MqttPubMsgNode obj = null ;
        Node node = cacheQueue.getLastNode() ;
        while(node != null && node.obj != null){
            obj = (MqttPubMsgNode)node.obj;
            pubMsg = obj.result ;
            if(pubMsg != null
                    && subMsg.subMsgMatchPubMsg(pubMsg)){
                obj.onceReceivedResult = true ;//标识已经收到命令结果
                return pubMsg;
            }else{
                node = node.pre ;
            }
        }
        return null ;
    }
    /**
     * å¾—到第一个节点
     * @return
     */
    public static Node getFirstQueueNode(){
        return cacheQueue.getFirstNode() ;
    }
    /**
     * å¾—到最后一个节点
     * @return
     */
    public static Node getLastQueueNode(){
        return cacheQueue.getLastNode() ;
    }
    /**
     * ç§»é™¤èŠ‚ç‚¹
     * @param node
     */
    public static void removeNode(Node node){
        cacheQueue.remove(node);
    }
    /**
     * ç¼“存的节点数
     * @Return ç¼“存节点数
     */
    public static Integer size(){
        return cacheQueue.size() ;
    }
}
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttPubMsgNode.java
New file
@@ -0,0 +1,95 @@
package com.dy.rtuMw.server.mqtt;
import com.dy.common.mw.protocol4Mqtt.MqttPubMsg;
import com.dy.common.queue.NodeObj;
import com.dy.rtuMw.server.ServerProperties;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.eclipse.paho.client.mqttv3.MqttClient;
/**
 * @Author: liurunyu
 * @Date: 2025/6/4 17:25
 * @Description
 */
public class MqttPubMsgNode implements NodeObj {
    private static Logger log = LogManager.getLogger(MqttPubMsgNode.class.getName());
    public MqttPubMsg result ;//下行命令
    public Long cachTime ;//缓存时刻
    public boolean onceReceivedResult ;//已经收到命令应答
    public MqttPubMsgNode(MqttPubMsg result){
        this.result = result ;
        this.cachTime = System.currentTimeMillis() ;
        this.onceReceivedResult = false ;
    }
    /**
     * è‡ªå·±å¤„理自己
     * @param now
     * @return
     */
    public boolean dealSelf(Long now){
        if(this.onceReceivedResult){
            //已经收到命令结果
            //记录状态
            //RtuStatusDealer.commandSuccess(this.result.rtuAddr, this.result.downCode, this.result.downCodeName);
            return true ;
        }
        boolean noConnect2MqSv = false ;
        MqttManager mqttManager = MqttManager.getInstance() ;
        MqttClient mqttClient = null ;
        noConnect2MqSv = mqttManager.poolIsClose() ;
        if(noConnect2MqSv){
            //未曾连接MQTT服务器
            return this.decideRemoveNodeFromCach(now) ;
        }else{
            try {
                //如果网络不好或断网,此处用时较长
                mqttClient = mqttManager.popMqttClient() ;
                if(mqttClient == null || !mqttClient.isConnected()){
                    noConnect2MqSv = false ;
                }
            }catch (Exception e){
                log.error("获取MQTT客户端失败", e);
            }
        }
        if(noConnect2MqSv){
            //未曾连接MQTT服务器
            return this.decideRemoveNodeFromCach(now) ;
        }else{
            if(mqttClient != null && mqttClient.isConnected()){
                try {
                    mqttManager.publishMsg(mqttClient, this.result.topic, this.result.msg);
                    log.info("发布MQTT消息(主题=" + this.result.topic + ")" + this.result.msg);
                }catch (Exception e){
                    log.error("MQTT发布消息失败(主题=" + this.result.topic + ")" , e);
                }finally {
                    mqttManager.pushMqttClient(mqttClient);
                }
                return false ;
            }else{
                //未曾连接MQTT服务器
                return this.decideRemoveNodeFromCach(now) ;
            }
        }
    }
    private boolean decideRemoveNodeFromCach(Long now){
        if(!this.result.isCacheForOffLine){
            //不在线命令不缓存
            return true ;
        }else{
            //不在线命令缓存
            if(now - this.cachTime >= ServerProperties.offLineCacheTimeout){
                //缓存超时
                return true ;
            }
        }
        return false ;
    }
}
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttSubMsgCache.java
New file
@@ -0,0 +1,68 @@
package com.dy.rtuMw.server.mqtt;
import com.dy.common.mw.protocol4Mqtt.MqttSubMsg;
import com.dy.common.queue.Node;
import com.dy.common.queue.Queue;
import com.dy.rtuMw.server.ServerProperties;
/**
 * @Author: liurunyu
 * @Date: 2025/6/4 16:13
 * @Description
 */
public class MqttSubMsgCache {
    //TCP下行命令缓存队列
    private static Queue cacheQueue = new Queue("mqttSubMsgCache") ;
    private static MqttSubMsgCache instance = new MqttSubMsgCache() ;
    private MqttSubMsgCache(){
        cacheQueue.setLimit(ServerProperties.cacheUpDownDataWarnCount, ServerProperties.cacheUpDownDataMaxCount);
    }
    public static MqttSubMsgCache getInstance(){
        return instance ;
    }
    /**
     * ç¼“存订阅的消息
     * @param node æ”¶åˆ°çš„æ¶ˆæ¯
     * @throws Exception
     */
    public static void cacheMsg(MqttSubMsgNode node) throws Exception{
        cacheQueue.pushHead(node);
    }
    /**
     * å¾—到第一个节点
     * @return
     */
    public static Node getFirstQueueNode(){
        return cacheQueue.getFirstNode() ;
    }
    /**
     * å¾—到最后一个节点
     * @return
     */
    public static Node getLastQueueNode(){
        // è°ƒç”¨cacheQueue的getLastNode方法,返回最后一个节点
        return cacheQueue.getLastNode() ;
    }
    /**
     * ç§»é™¤èŠ‚ç‚¹
     * @param node
     */
    public static void removeNode(Node node){
        cacheQueue.remove(node);
    }
    /**
     * ç¼“存的节点数
     * @Return ç¼“存节点数
     */
    public static Integer size(){
        return cacheQueue.size() ;
    }
}
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttSubMsgNode.java
New file
@@ -0,0 +1,69 @@
package com.dy.rtuMw.server.mqtt;
import com.dy.common.mw.protocol4Mqtt.MqttSubMsg;
import com.dy.common.queue.NodeObj;
import com.dy.common.threadPool.ThreadPool;
import com.dy.common.threadPool.TreadPoolFactory;
import com.dy.rtuMw.server.rtuData.TaskPool;
import com.dy.rtuMw.server.rtuData.TaskSurpport;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
 * @Author: liurunyu
 * @Date: 2025/6/4 16:14
 * @Description
 */
public class MqttSubMsgNode implements NodeObj {
    private static Logger log = LogManager.getLogger(MqttSubMsgNode.class.getName());
    protected MqttSubMsg msg;
    public MqttSubMsgNode(MqttSubMsg obj){
        this.msg = obj ;
    }
    /**
     * è‡ªå·±å¤„理自己
     * @return
     */
    public boolean dealSelf(){
        try {
            ThreadPool.Pool pool = TreadPoolFactory.getThreadPoolLong() ;
            pool.putJob(new ThreadPool.Job() {
                public void execute() {
                    if(msg != null && msg.valid()){
                        TaskSurpport task = null ;
                        try{
                            task = TaskPool.popTask() ;
                            if(task != null){
                                task.execute(msg);
                            }else{
                                log.error("未得到Mq订阅消息处理任务!");
                            }
                        }catch(Exception e){
                            log.error("Mq订阅消息任务池处理数据时发生异常", e);
                        }finally {
                            if(task != null){
                                TaskPool.freeAndCleanTask(task);
                            }
                        }
                    }
                }
                @Override
                public void destroy(){
                }
                @Override
                public boolean isDestroy(){
                    return false ;
                }
            });
        } catch (Exception e) {
            log.error("在MqttSubMsgObj内发生异常", e);
        }
        return true ;
    }
}
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnit.java
New file
@@ -0,0 +1,68 @@
package com.dy.rtuMw.server.mqtt;
import com.dy.common.mw.UnitAdapterInterface;
import com.dy.common.mw.UnitCallbackInterface;
import com.dy.common.mw.UnitInterface;
/**
 * @Author: liurunyu
 * @Date: 2025/6/4 14:46
 * @Description
 */
public class MqttUnit  implements UnitInterface {
    private static MqttUnit instance = new MqttUnit() ;
    public static MqttUnitAdapter adapter ;
    public static MqttUnitConfigVo confVo ;
    private static MqttManager manager ;
    private MqttUnit(){} ;
    public static MqttUnit getInstance(){
        return instance ;
    }
    @Override
    public void setAdapter(UnitAdapterInterface adapter) throws Exception {
        if(adapter == null){
            throw new Exception("Mqtt消息模块适配器对象不能为空!") ;
        }
        MqttUnit.adapter = (MqttUnitAdapter)adapter ;
        MqttUnit.confVo = MqttUnit.adapter.getConfig() ;
        if(MqttUnit.confVo == null){
            throw new Exception("Mqtt消息模块配置对象不能为空!") ;
        }
    }
    /**
     * åˆå§‹åŒ–
     */
    @Override
    public void start(UnitCallbackInterface callback) throws Exception {
        if(confVo.enable){
            manager = MqttManager.getInstance() ;
            manager.initOption(confVo);
            manager.start();
            callback.call(null) ;
            System.out.println("Mqtt消息模块成功启动");
        }else{
            System.out.println("Mqtt消息模块配置不启动");
        }
    }
    @Override
    public void stop(UnitCallbackInterface callback) throws Exception {
        if(manager != null){
            manager.stop();
        }
        callback.call(null) ;
    }
    public void recover(UnitCallbackInterface callback) throws Exception {
        this.start(callback) ;
        callback.call(null) ;
    }
}
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitAdapter.java
New file
@@ -0,0 +1,14 @@
package com.dy.rtuMw.server.mqtt;
import com.dy.common.mw.UnitAdapterInterface;
/**
 * @Author: liurunyu
 * @Date: 2025/6/4 14:47
 * @Description
 */
public interface MqttUnitAdapter extends UnitAdapterInterface {
    MqttUnitConfigVo getConfig();
}
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttUnitConfigVo.java
New file
@@ -0,0 +1,29 @@
package com.dy.rtuMw.server.mqtt;
/**
 * @Author: liurunyu
 * @Date: 2025/6/4 14:46
 * @Description
 */
public class MqttUnitConfigVo {
    public Boolean showStartInfo ;
    public Boolean enable ;//模块是否启动
    public String svIp ;//
    public Integer svPort ;//
    public String svUserName ;//
    public String svUserPassword ;//
    public Integer poolMaxSize ;//
    public String[] subTopics ;//订阅的主题
    public int[] topicsQos ;////订阅主题的Qos
    public int publishQos ;////发布消息的Qos
    public MqttUnitConfigVo(){
        this.enable = false ;
        this.svIp = "127.0.0.1" ;
        this.svPort = 1883 ;
        this.svUserName = "dyyjy" ;
        this.svUserPassword = "Dyyjy2025,;.abc!@#" ;
        this.poolMaxSize = 10 ;
        this.publishQos = 1 ;
    }
}
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/TkMqttData.java
New file
@@ -0,0 +1,37 @@
package com.dy.rtuMw.server.rtuData;
import com.dy.common.mw.protocol4Mqtt.pSdV1.MqttSubMsgSdV1;
import com.dy.rtuMw.server.rtuData.pSdV1.TkFindPSdV1;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
 * @Author: liurunyu
 * @Date: 2025/6/4 17:05
 * @Description
 */
public class TkMqttData extends TaskSurpport {
    private static Logger log = LogManager.getLogger(TkMqttData.class.getName()) ;
    //ç±»ID,一定与Tree.xml配置文件中配置一致
    public static final String taskId = "TkMqttData" ;
    /**
     * æ‰§è¡ŒèŠ‚ç‚¹ä»»åŠ¡
     * @param data éœ€è¦å¤„理的数据
     */
    @Override
    public void execute(Object data) {
        if(data == null){
            log.error("严重错误,Mqtt订阅消息数据为空!" );
        }else{
            if(data instanceof MqttSubMsgSdV1){
                this.toNextOneTask(data, TkFindPSdV1.taskId);
            }else{
                log.error("严重错误,该数据类型(" + data.getClass().getName() + "),接收数据任务还未实现!" );
            }
        }
    }
}
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/TkReceive.java
@@ -1,5 +1,6 @@
package com.dy.rtuMw.server.rtuData;
import com.dy.common.mw.protocol4Mqtt.MqttSubMsg;
import com.dy.common.mw.protocol.Data;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -21,7 +22,9 @@
            log.error("严重错误,RTU上行数据为空!" );
        }else{
            if(data instanceof Data){
                this.toNextTasks(data);
                this.toNextOneTask(data, TkRtuData.taskId);
            }else if(data instanceof MqttSubMsg){
                this.toNextOneTask(data, TkMqttData.taskId);
            }else{
                log.error("严重错误,该数据类型(" + data.getClass().getName() + "),接收数据任务还未实现!" );
            }
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/rtuData/pSdV1/TkFindPSdV1.java
New file
@@ -0,0 +1,31 @@
package com.dy.rtuMw.server.rtuData.pSdV1;
import com.dy.common.mw.protocol4Mqtt.pSdV1.MqttSubMsgSdV1;
import com.dy.rtuMw.server.rtuData.TaskSurpport;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
 * @Author: liurunyu
 * @Date: 2025/6/5 13:43
 * @Description
 */
public class TkFindPSdV1  extends TaskSurpport {
    private static Logger log = LogManager.getLogger(TkFindPSdV1.class.getName()) ;
    //ç±»ID,一定与Tree.xml配置文件中配置一致
    public static final String taskId = "TkFindPSdV1" ;
    /**
     * æ‰§è¡ŒèŠ‚ç‚¹ä»»åŠ¡
     * @param data éœ€è¦å¤„理的数据
     */
    @Override
    public void execute(Object data) {
        //前面的任务已经判断了data不为空
        MqttSubMsgSdV1 msg = (MqttSubMsgSdV1)data ;
        log.info(msg.toString());
    }
}
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/MqttComResultConstantTask.java
New file
@@ -0,0 +1,75 @@
package com.dy.rtuMw.server.tasks;
import com.dy.common.mw.core.CoreTask;
import com.dy.common.queue.Node;
import com.dy.rtuMw.server.mqtt.MqttComResultCache;
import com.dy.rtuMw.server.mqtt.MqttComResultNode;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
 * å¯¹RTU上行数据进行业务处理
 */
public class MqttComResultConstantTask extends CoreTask {
    private static final Logger log = LogManager.getLogger(MqttComResultConstantTask.class.getName());
    /**
     * åœ¨å•线程环境中运行
     */
    @Override
    public Integer execute() {
        try{
            dealRtuComResult() ;
        }catch(Exception e){
            log.error(e);
        }
        return MqttComResultCache.size()>0?0:1 ;
    }
    /**
     * å¤„理上行命令结果
     */
    public void dealRtuComResult() {
        Node first = MqttComResultCache.getFirstQueueNode() ;
        if(first != null){
            Node last = MqttComResultCache.getLastQueueNode() ;
            while (last != null){
                last = this.doDealRtuComResult(first, last);
            }
        }
    }
    /**
     * å¤„理缓存的上行数据节点
     * @param first ç¬¬ä¸€ä¸ªèŠ‚ç‚¹
     * @param last æœ€åŽä¸€ä¸ªèŠ‚ç‚¹
     */
    private Node doDealRtuComResult(Node first, Node last){
        if(last != null){
            //在dealNode方法中,可能要把last从队列中移除,这时last.pre为空,所以提前把last.pre取出来
            Node pre = last.pre ;
            dealNode(last) ;
            if(first != last){
                return pre ;
            }else{
                //停止
                return null ;
            }
        }else{
            return null ;
        }
    }
    /**
     * å¤„理一个节点
     * @param node èŠ‚ç‚¹
     */
    private void dealNode(Node node){
        if(node != null && node.obj != null){
            MqttComResultNode obj = (MqttComResultNode)node.obj ;
            obj.dealSelf() ;
            MqttComResultCache.removeNode(node);
        }
    }
}
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/MqttPubMessageConstantTask.java
New file
@@ -0,0 +1,81 @@
package com.dy.rtuMw.server.tasks;
import com.dy.common.mw.core.CoreTask;
import com.dy.common.queue.Node;
import com.dy.rtuMw.server.mqtt.MqttPubMsgCache;
import com.dy.rtuMw.server.mqtt.MqttPubMsgNode;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
 * @Author: liurunyu
 * @Date: 2025/6/4 16:25
 * @Description
 */
public class MqttPubMessageConstantTask extends CoreTask {
    private static final Logger log = LogManager.getLogger(MqttPubMessageConstantTask.class.getName());
    /**
     * åœ¨å•线程环境中运行
     */
    @Override
    public Integer execute() {
        try{
            dealMqMsg() ;
        }catch(Exception e){
            log.error(e);
        }
        return MqttPubMsgCache.size()>0?0:1 ;
    }
    /**
     * å¤„理MQTT订阅的消息
     */
    public void dealMqMsg() {
        Node first = MqttPubMsgCache.getFirstQueueNode() ;
        if(first != null){
            Node last = MqttPubMsgCache.getLastQueueNode() ;
            while (last != null){
                last = this.doDealMqMsg(System.currentTimeMillis(), first, last);
            }
        }
    }
    /**
     * å¤„理缓存的上行数据节点
     * @param now å½“前时刻
     * @param first ç¬¬ä¸€ä¸ªèŠ‚ç‚¹
     * @param last æœ€åŽä¸€ä¸ªèŠ‚ç‚¹
     */
    private Node doDealMqMsg(Long now, Node first, Node last){
        if(last != null){
            //在dealNode方法中,可能要把last从队列中移除,这时last.pre为空,所以提前把last.pre取出来
            Node pre = last.pre ;
            dealNode(now, last) ;
            if(first != last){
                return pre ;
            }else{
                //停止
                return null ;
            }
        }else{
            return null ;
        }
    }
    /**
     * å¤„理一个节点
     * @param now çŽ°åœ¨æ—¶åˆ»
     * @param node èŠ‚ç‚¹
     */
    private void dealNode(Long now, Node node){
        if(node != null && node.obj != null){
            MqttPubMsgNode obj = (MqttPubMsgNode)node.obj ;
            boolean removeNode = obj.dealSelf(now) ;
            if(removeNode){
                MqttPubMsgCache.removeNode(node);
            }
        }
    }
}
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/MqttSubMessageConstantTask.java
New file
@@ -0,0 +1,77 @@
package com.dy.rtuMw.server.tasks;
import com.dy.common.mw.core.CoreTask;
import com.dy.common.queue.Node;
import com.dy.rtuMw.server.mqtt.MqttSubMsgCache;
import com.dy.rtuMw.server.mqtt.MqttSubMsgNode;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
 * @Author: liurunyu
 * @Date: 2025/6/4 16:25
 * @Description
 */
public class MqttSubMessageConstantTask extends CoreTask {
    private static final Logger log = LogManager.getLogger(MqttSubMessageConstantTask.class.getName());
    /**
     * åœ¨å•线程环境中运行
     */
    @Override
    public Integer execute() {
        try{
            dealMqMsg() ;
        }catch(Exception e){
            log.error(e);
        }
        return MqttSubMsgCache.size()>0?0:1 ;
    }
    /**
     * å¤„理MQTT订阅的消息
     */
    public void dealMqMsg() {
        Node first = MqttSubMsgCache.getFirstQueueNode() ;
        if(first != null){
            Node last = MqttSubMsgCache.getLastQueueNode() ;
            while (last != null){
                last = this.doDealMqMsg(first, last);
            }
        }
    }
    /**
     * å¤„理缓存的上行数据节点
     * @param first ç¬¬ä¸€ä¸ªèŠ‚ç‚¹
     * @param last æœ€åŽä¸€ä¸ªèŠ‚ç‚¹
     */
    private Node doDealMqMsg(Node first, Node last){
        if(last != null){
            //在dealNode方法中,可能要把last从队列中移除,这时last.pre为空,所以提前把last.pre取出来
            Node pre = last.pre ;
            dealNode(last) ;
            if(first != last){
                return pre ;
            }else{
                //停止
                return null ;
            }
        }else{
            return null ;
        }
    }
    /**
     * å¤„理一个节点
     * @param node èŠ‚ç‚¹
     */
    private void dealNode(Node node){
        if(node != null && node.obj != null){
            MqttSubMsgNode obj = (MqttSubMsgNode)node.obj ;
            obj.dealSelf() ;
            MqttSubMsgCache.removeNode(node);
        }
    }
}
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/tasks/WebDownCom4MqttTask.java
New file
@@ -0,0 +1,51 @@
package com.dy.rtuMw.server.tasks;
import com.dy.common.mw.core.CoreTask;
import com.dy.common.mw.protocol.Command;
import com.dy.common.mw.protocol.Driver;
import com.dy.common.mw.protocol.MidResult;
import com.dy.common.mw.protocol.ProtocolCache;
import com.dy.common.mw.protocol4Mqtt.MqttMsgParser;
import com.dy.common.mw.protocol4Mqtt.MqttPubMsg;
import com.dy.rtuMw.server.ServerProperties;
import com.dy.rtuMw.server.forTcp.TcpSessionCache;
import com.dy.rtuMw.server.mqtt.MqttPubMsgCache;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
 * @Author: liurunyu
 * @Date: 2025/6/5 15:42
 * @Description
 */
public class WebDownCom4MqttTask extends CoreTask {
    private static Logger log = LogManager.getLogger(WebDownCom4MqttTask.class.getName());
    @Override
    public Integer execute() {
        Command com = (Command)this.data ;
        try {
            log.info("下发MQTT命令" + com.getCode() + "的核心任务开始执行");
            this.deal(com);
        } catch (Exception e) {
            log.error("处理下行MQTT命令出错" + (e.getMessage()==null?"!":("," + e.getMessage())) ,e);
        }
        return null ;
    }
    /**
     * å¤„理命令
     * @param com å‘½ä»¤
     * @throws Exception
     */
    private void deal(Command com) throws Exception{
        MqttMsgParser parser = new MqttMsgParser() ;
        MqttPubMsg pubMsg = parser.createPubMsg(ServerProperties.orgTag, com) ;
        if(pubMsg != null){
            MqttPubMsgCache.cacheCommand(pubMsg);
        }
    }
}
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/web/com/CommandCtrl.java
@@ -7,7 +7,9 @@
import com.dy.rtuMw.server.forTcp.TcpSessionCache;
import com.dy.rtuMw.server.local.CommandInnerDeaLer;
import com.dy.rtuMw.server.local.ReturnCommand;
import com.dy.rtuMw.server.mqtt.MqttManager;
import com.dy.rtuMw.server.msCenter.MsCenterUnit;
import com.dy.rtuMw.server.tasks.WebDownCom4MqttTask;
import com.dy.rtuMw.server.tasks.WebDownComTask;
import com.dy.common.mw.core.CoreUnit;
import com.dy.common.mw.protocol.Command;
@@ -247,6 +249,13 @@
            }catch(Exception e){
                return BaseResponseUtils.buildError(ReturnCommand.errored("处理发向RTU的外部透传命令出错" + (e.getMessage() == null?"":("," + e.getMessage())), com.getId(), com.getCode()) );
            }
        }else if(commandType.equals(CommandType.mqttCommand)){
            //发向MQTT的外部命令
            try{
                return this.dealMqttCommand(com) ;
            }catch(Exception e){
                return BaseResponseUtils.buildError(ReturnCommand.errored("处理发向RTU的外部命令出错" + (e.getMessage() == null?"":("," + e.getMessage())), com.getId(), com.getCode()) );
            }
        }else if(commandType.equals(CommandType.resultCommand)){
            return BaseResponseUtils.buildError(ReturnCommand.errored("出错,通信中间件不接结果类型的命令!", com.getId(), com.getCode()));
        }else{
@@ -336,4 +345,35 @@
        return BaseResponseUtils.buildSuccess(ReturnCommand.successed("透传命令已接受,即将构造并下发命令。", command.getId(), command.getCode()));
    }
    /**
     * å¤„理发向RTU的外部命令
     * @return ç»“æžœ
     */
    private BaseResponse<Command> dealMqttCommand(Command command){
        String rtuAddr = command.getRtuAddr() ;
        if(rtuAddr == null || rtuAddr.trim().equals("")){
            return BaseResponseUtils.buildError(ReturnCommand.errored("出错,设备ID为空!", command.getId(), command.getCode())) ;
        }
        if(!ServerProperties.mqttUnitEnable.booleanValue()){
            return BaseResponseUtils.buildError(ReturnCommand.errored("出错,MQTT连接模块配置未启动!", command.getId(), command.getCode())) ;
        }
        if(MqttManager.getInstance().poolIsClose()){
            return BaseResponseUtils.buildError(ReturnCommand.errored("出错,MQTT连接池水创建成功!", command.getId(), command.getCode())) ;
        }
        //生成异步任务
        WebDownCom4MqttTask task = new WebDownCom4MqttTask() ;
        task.data = command ;
        try{
            log.info("构造下发MQTT命令" + command.getCode() + "的核心任务,并放入任务队列中");
            CoreUnit.getInstance().pushCoreTask(task);
        }catch(Exception e){
            log.error(e.getMessage(), e);
            return BaseResponseUtils.buildError(ReturnCommand.successed("MQTT命令处理失败" + e.getMessage(), command.getId(), command.getCode())) ;
        }
        return BaseResponseUtils.buildSuccess(ReturnCommand.successed("MQTT命令已接受,即将构造并下发命令。", command.getId(), command.getCode()));
    }
}
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/web/comResult/CommandResultDeal.java
@@ -3,6 +3,7 @@
import com.dy.common.contant.Constant;
import com.dy.common.mw.protocol.Command;
import com.dy.common.mw.protocol.Data;
import com.dy.common.mw.protocol4Mqtt.MqttSubMsg;
import com.dy.rtuMw.server.ServerProperties;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -33,6 +34,10 @@
        this.restTemplate = restTemplate ;
    }
    /**
     * RTU设备数据
     * @param data
     */
    public void deal(Data data) {
        if (data.rtuResultSendWebUrl != null
                && !data.rtuResultSendWebUrl.trim().equals("")
@@ -58,4 +63,34 @@
            log.error("严重错误,在com.dy.aceMw.web.comResult.CommandResultDeal里,处理的是RTU命令结果Node,但数据中rtuResultSendWebUrl为空");
        }
    }
    /**
     * Mqtt消息数据
     * @param subMsg
     */
    public void deal(MqttSubMsg subMsg) {
        if (subMsg.mqttResultSendWebUrl != null
                && !subMsg.mqttResultSendWebUrl.trim().equals("")
                && !subMsg.mqttResultSendWebUrl.trim().equals(Command.ignoreRtuResultSendWebUrl)) {
            String url = UriComponentsBuilder.fromUriString(subMsg.mqttResultSendWebUrl)
                    .build()
                    .toUriString();
            restTemplate.getMessageConverters().set(1,new StringHttpMessageConverter(StandardCharsets.UTF_8));
            HttpHeaders headers = new HttpHeaders();
            headers.setContentType(MediaType.parseMediaType("application/json;charset=UTF-8"));
            headers.set(Constant.UserTokenKeyInHeader, ServerProperties.orgTag);
            HttpEntity<?> httpEntity = new HttpEntity<>(subMsg, headers);
            ResponseEntity<WebResponseVo> response = null;
            try {
                // é€šè¿‡Post方式调用接口
                response = restTemplate.exchange(url, HttpMethod.POST, httpEntity, WebResponseVo.class);
            } catch (Exception e) {
                log.error("命令结果回调发生异常", e);
                e.printStackTrace();
            }
            //assert response != null;
        } else {
            log.error("严重错误,在com.dy.aceMw.web.comResult.CommandResultDeal里,处理的是RTU命令结果Node,但数据中rtuResultSendWebUrl为空");
        }
    }
}
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/RtuDataDealTree.xml
@@ -92,5 +92,9 @@
                </task>
            </task>
        </task>
        <!-- Mqtt消息中间件订阅的消息 -->
        <task id="TkMqttData" name="接收Mqtt消息" enable="true" class="com.dy.rtuMw.server.rtuData.TkMqttData">
            <task id="TkFindPSdV1" name="识别山东V1数据" enable="true" class="com.dy.rtuMw.server.rtuData.pSdV1.TkFindPSdV1">
        </task>
    </task>
</project>
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.properties
@@ -29,3 +29,19 @@
#RTU上行数据最小间隔,大于这个间隔认为设备离线了,测控一体阀是3,表阀一体机是6,默认采用时间最长的6
base.upData.min.interval=6
# MQTT服务配置
# 233服务器:
#   å…ƒè°‹ï¼š mqtt.enable=false
#   æ²™ç›˜ï¼š mqtt.enable=false
#   æµ‹è¯•: mqtt.enable=false
#   æ¢…江: mqtt.enable=false
# 121服务器:
#   æ°‘勤: mqtt.enable=true
#   å»¶åº†ï¼š mqtt.enable=false
#   é»‘龙江: mqtt.enable=false
#   ç”˜å·žï¼š mqtt.enable=false
#   å‡‰å·žï¼š mqtt.enable=false
#   é‡‘川: mqtt.enable=true
mqtt.enable=true
mqtt.topicAndQos=ym/sd1/10000/control/m1,1;ym/sd1/10000/control/m2,1;ym/sd1/control/m4,1;ym/sd1/10000/control/m80,1
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/resources/config.xml
@@ -164,4 +164,22 @@
         idle="10"
    />
    <!--
    topicAndQos: ä¸»é¢˜ä¸ŽQos,主题名与其Qos用逗号隔开,多个主题及Qos用分号隔开,例如:ym/topic1,1;ym/topic2,1;ym/topic3,1,如果有多个OrgTag,主题前缀用其OrgTag
    publishQos: å‘布消息的Qos,取值范围:
        0    è‡³å¤šä¸€æ¬¡ï¼ˆAt most once)    æ¶ˆæ¯å‘送后不保证到达,可能丢失或重复,开销最小,可靠性最低。
        1    è‡³å°‘一次(At least once)    æ¶ˆæ¯è‡³å°‘会到达一次,可能重复,但不会丢失,可靠性中等,适用于多数场景。
        2    æ°å¥½ä¸€æ¬¡ï¼ˆExactly once)    æ¶ˆæ¯ä»…会到达一次,不重复且不丢失,可靠性最高,但开销最大,实现最复杂。
     -->
    <mqtt enable="${mqtt.enable}"
          svIp="121.199.41.121"
          svPort="1883"
          svUserName="dyyjy"
          svUserPassword="Dyyjy2025,;.abc!@#"
          poolMaxSize="10"
          topicAndQos="${mqtt.topicAndQos}"
          publishQos="1"
    />
</config>