package com.dy.rtuMw.server.mqtt; 
 | 
  
 | 
import com.dy.common.mw.channel.mqtt.MqttClientPool; 
 | 
import org.apache.logging.log4j.LogManager; 
 | 
import org.apache.logging.log4j.Logger; 
 | 
import org.eclipse.paho.client.mqttv3.MqttClient; 
 | 
  
 | 
/** 
 | 
 * @Author: liurunyu 
 | 
 * @Date: 2025/6/4 14:54 
 | 
 * @Description 
 | 
 */ 
 | 
public class MqttManager { 
 | 
  
 | 
    private static final Logger log = LogManager.getLogger(MqttManager.class.getName()); 
 | 
  
 | 
    private static final MqttManager INSTANCE = new MqttManager(); 
 | 
  
 | 
    private MqttUnitConfigVo configVo ; 
 | 
  
 | 
    private MqttClientPool pool; 
 | 
  
 | 
    private MqttManager(){ 
 | 
    } 
 | 
  
 | 
    public static MqttManager getInstance() { 
 | 
        return MqttManager.INSTANCE; 
 | 
    } 
 | 
    /** 
 | 
     *  初始化配置信息 
 | 
     */ 
 | 
    public void initOption(MqttUnitConfigVo configVo) { 
 | 
        this.configVo = configVo; 
 | 
    } 
 | 
  
 | 
    public void start()throws Exception{ 
 | 
        String URL = "tcp://" + this.configVo.svIp + ":" + this.configVo.svPort; 
 | 
        this.pool = new MqttClientPool(URL, this.configVo.svUserName, this.configVo.svUserPassword, this.configVo.poolMaxSize); 
 | 
        if(this.pool.isClose()){ 
 | 
            throw new Exception("Mqtt连接池初始化失败"); 
 | 
        } 
 | 
        MqttClient clientSub = null ; 
 | 
        try { 
 | 
            clientSub = pool.popClient();//新创建一个Client时,此Client实际去连接MQTT服务器,如果连接不上,就会抛出异常 
 | 
        }catch (Exception e){ 
 | 
            throw new Exception("Mqtt连接池获得连接异常", e); 
 | 
        } 
 | 
        if(clientSub == null || !clientSub.isConnected()){ 
 | 
            throw new Exception("Mqtt连接池获得订阅连接不可用"); 
 | 
        } 
 | 
        for(int i = 0; i < this.configVo.subTopics.length; i++){ 
 | 
            clientSub.subscribe(this.configVo.subTopics[i], this.configVo.topicsQos[i], new MqttMessageListener()); 
 | 
        } 
 | 
    } 
 | 
  
 | 
    public void stop()throws Exception{ 
 | 
        if(this.pool != null){ 
 | 
            // 关闭连接池 
 | 
            this.pool.close(); 
 | 
        } 
 | 
    } 
 | 
  
 | 
    public MqttClient popMqttClient() throws Exception{ 
 | 
        return this.pool.popClient(); 
 | 
    } 
 | 
  
 | 
    public void pushMqttClient(MqttClient client) { 
 | 
        this.pool.pushClient(client); 
 | 
    } 
 | 
  
 | 
    public void publishMsg(MqttClient client, String topic, byte[] msg) throws Exception{ 
 | 
        client.publish(topic, msg, this.configVo.publishQos, false); 
 | 
    } 
 | 
  
 | 
    public void publishMsg(MqttClient client, String topic, String msg) throws Exception{ 
 | 
        byte[] bs = msg.getBytes("UTF-8") ; 
 | 
        client.publish(topic, bs, this.configVo.publishQos, false); 
 | 
    } 
 | 
  
 | 
    public boolean poolIsClose(){ 
 | 
        if(this.pool == null){ 
 | 
            return true; 
 | 
        } 
 | 
        return this.pool.isClose(); 
 | 
    } 
 | 
} 
 |