pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttManager.java
@@ -10,6 +10,9 @@
import org.apache.logging.log4j.Logger;
import org.eclipse.paho.client.mqttv3.MqttClient;
import java.util.ArrayList;
import java.util.List;
/**
 * @Author: liurunyu
 * @Date: 2025/6/4 14:54
@@ -24,6 +27,8 @@
    private MqttUnitConfigVo configVo ;
    private MqttClientPool pool;
    private List<MqttClient> subClients ;
    private MqttManager(){
    }
@@ -43,8 +48,9 @@
     * @throws Exception
     */
    public void start()throws Exception{
        subClients = new ArrayList<>();
        String URL = "tcp://" + this.configVo.svIp + ":" + this.configVo.svPort;
        this.pool = new MqttClientPool(URL, this.configVo.svUserName, this.configVo.svUserPassword, this.configVo.poolMaxSize);
        this.pool = new MqttClientPool(URL, this.configVo.svUserName, this.configVo.svUserPassword, this.configVo.poolMaxSize, this.configVo.useMemoryPersistence);
        if(this.pool.isClose()){
            throw new Exception("Mqtt连接池初始化失败");
        }
@@ -57,6 +63,7 @@
        if(clientSub == null || !clientSub.isConnected()){
            throw new Exception("Mqtt连接池获得订阅连接不可用");
        }
        subClients.add(clientSub) ;
        // 订阅主题
        for(int i = 0; i < this.configVo.subTopics.length; i++){
            for(int j = 0 ; j < this.configVo.protocolAndDeviceIds.length; j++){
@@ -90,6 +97,18 @@
    }
    public void stop()throws Exception{
        if(subClients != null && subClients.size() > 0){
            for (MqttClient client : subClients) {
                if(client != null && client.isConnected()){
                    try{
                        client.disconnect();
                        client.close();
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                }
            }
        }
        if(this.pool != null){
            // 关闭连接池
            this.pool.close();