package com.dy.rtuMw.server.mqtt; 
 | 
  
 | 
import com.dy.common.mw.channel.mqtt.MqttClientPool; 
 | 
import com.dy.common.mw.protocol4Mqtt.MqttNotify; 
 | 
import com.dy.common.mw.protocol4Mqtt.MqttNotifyInfo; 
 | 
import com.dy.common.mw.protocol4Mqtt.status.DevOnLineInfo; 
 | 
import com.dy.common.mw.protocol4Mqtt.status.DevRunInfo; 
 | 
import com.dy.rtuMw.server.ServerProperties; 
 | 
import org.apache.logging.log4j.LogManager; 
 | 
import org.apache.logging.log4j.Logger; 
 | 
import org.eclipse.paho.client.mqttv3.MqttClient; 
 | 
  
 | 
import java.util.ArrayList; 
 | 
import java.util.List; 
 | 
  
 | 
/** 
 | 
 * @Author: liurunyu 
 | 
 * @Date: 2025/6/4 14:54 
 | 
 * @Description 
 | 
 */ 
 | 
public class MqttManager { 
 | 
  
 | 
    private static final Logger log = LogManager.getLogger(MqttManager.class.getName()); 
 | 
  
 | 
    private static final MqttManager INSTANCE = new MqttManager(); 
 | 
  
 | 
    private MqttUnitConfigVo configVo ; 
 | 
  
 | 
    private MqttClientPool pool; 
 | 
  
 | 
    private List<MqttClient> subClients ; 
 | 
  
 | 
    private MqttManager(){ 
 | 
    } 
 | 
  
 | 
    public static MqttManager getInstance() { 
 | 
        return MqttManager.INSTANCE; 
 | 
    } 
 | 
    /** 
 | 
     *  初始化配置信息 
 | 
     */ 
 | 
    public void initOption(MqttUnitConfigVo configVo) { 
 | 
        this.configVo = configVo; 
 | 
    } 
 | 
  
 | 
    /** 
 | 
     * 创建连接池 + 订阅主题 
 | 
     * @throws Exception 
 | 
     */ 
 | 
    public void start()throws Exception{ 
 | 
        subClients = new ArrayList<>(); 
 | 
        String URL = "tcp://" + this.configVo.svIp + ":" + this.configVo.svPort; 
 | 
        this.pool = new MqttClientPool(URL, this.configVo.svUserName, this.configVo.svUserPassword, this.configVo.poolMaxSize, this.configVo.useMemoryPersistence); 
 | 
        if(this.pool.isClose()){ 
 | 
            throw new Exception("Mqtt连接池初始化失败"); 
 | 
        } 
 | 
        MqttClient clientSub ; 
 | 
        try { 
 | 
            clientSub = pool.popClient();//新创建一个Client时,此Client实际去连接MQTT服务器,如果连接不上,就会抛出异常 
 | 
        }catch (Exception e){ 
 | 
            throw new Exception("Mqtt连接池获得连接异常", e); 
 | 
        } 
 | 
        if(clientSub == null || !clientSub.isConnected()){ 
 | 
            throw new Exception("Mqtt连接池获得订阅连接不可用"); 
 | 
        } 
 | 
        subClients.add(clientSub) ; 
 | 
  
 | 
        // 订阅主题 
 | 
        if(this.configVo.subTopics != null && this.configVo.subTopics.length > 0){ 
 | 
            if(this.configVo.protocolAndDeviceIds != null || this.configVo.protocolAndDeviceIds.length > 0){ 
 | 
                for(int i = 0; i < this.configVo.subTopics.length; i++){ 
 | 
                    for(int j = 0 ; j < this.configVo.protocolAndDeviceIds.length; j++){ 
 | 
                        clientSub.subscribe(ServerProperties.orgTag + "/" 
 | 
                                        + this.configVo.protocolAndDeviceIds[j] + "/" 
 | 
                                        + this.configVo.subTopics[i], 
 | 
                                this.configVo.subTopicsQos[i], 
 | 
                                //每一个订阅主题都有一个MqttMessageListener实例 
 | 
                                new MqttMessageListener(new MqttNotify(){ 
 | 
                                    @Override 
 | 
                                    public void notify(String devId, MqttNotifyInfo... infos) { 
 | 
                                        if(devId != null && infos != null && infos.length > 0){ 
 | 
                                            for(MqttNotifyInfo info : infos){ 
 | 
                                                if(info instanceof DevOnLineInfo){ 
 | 
                                                    DevOnLineInfo onLineSt = (DevOnLineInfo)info; 
 | 
                                                    if(onLineSt.onLine != null && onLineSt.onLine.booleanValue()){ 
 | 
                                                        DevStatusDealer.onLine(devId, ((DevOnLineInfo)info).protocol); 
 | 
                                                    }else{ 
 | 
                                                        DevStatusDealer.offLine(devId); 
 | 
                                                    } 
 | 
                                                } else if(info instanceof DevRunInfo){ 
 | 
                                                    DevStatusDealer.setStatus(devId, (DevRunInfo)info); 
 | 
                                                } 
 | 
                                            } 
 | 
                                        } 
 | 
                                    } 
 | 
                                }) 
 | 
                        ); 
 | 
                    } 
 | 
                } 
 | 
            } 
 | 
        } 
 | 
    } 
 | 
  
 | 
    public void stop()throws Exception{ 
 | 
        if(subClients != null && subClients.size() > 0){ 
 | 
            for (MqttClient client : subClients) { 
 | 
                if(client != null && client.isConnected()){ 
 | 
                    try{ 
 | 
                        client.disconnect(); 
 | 
                        client.close(); 
 | 
                    }catch (Exception e){ 
 | 
                        e.printStackTrace(); 
 | 
                    } 
 | 
                } 
 | 
            } 
 | 
        } 
 | 
        if(this.pool != null){ 
 | 
            // 关闭连接池 
 | 
            this.pool.close(); 
 | 
        } 
 | 
    } 
 | 
  
 | 
    public MqttClient popMqttClient() throws Exception{ 
 | 
        return this.pool.popClient(); 
 | 
    } 
 | 
  
 | 
    public void pushMqttClient(MqttClient client) { 
 | 
        this.pool.pushClient(client); 
 | 
    } 
 | 
  
 | 
    public void publishMsg(MqttClient client, String topic, byte[] msg) throws Exception{ 
 | 
        client.publish(topic, msg, this.configVo.pubTopicQos, false); 
 | 
    } 
 | 
  
 | 
    public void publishMsg(MqttClient client, String topic, String msg) throws Exception{ 
 | 
        byte[] bs = msg.getBytes("UTF-8") ; 
 | 
        client.publish(topic, bs, this.configVo.pubTopicQos, false); 
 | 
    } 
 | 
  
 | 
    public boolean poolIsClose(){ 
 | 
        if(this.pool == null){ 
 | 
            return true; 
 | 
        } 
 | 
        return this.pool.isClose(); 
 | 
    } 
 | 
} 
 |