| package com.dy.rtuMw.server.mqtt; | 
|   | 
| import com.dy.common.mw.channel.mqtt.MqttClientPool; | 
| import com.dy.common.mw.protocol4Mqtt.MqttNotify; | 
| import com.dy.common.mw.protocol4Mqtt.MqttNotifyInfo; | 
| import com.dy.common.mw.protocol4Mqtt.status.DevOnLineInfo; | 
| import com.dy.common.mw.protocol4Mqtt.status.DevRunInfo; | 
| import com.dy.rtuMw.server.ServerProperties; | 
| import org.apache.logging.log4j.LogManager; | 
| import org.apache.logging.log4j.Logger; | 
| import org.eclipse.paho.client.mqttv3.MqttClient; | 
|   | 
| import java.util.ArrayList; | 
| import java.util.List; | 
|   | 
| /** | 
|  * @Author: liurunyu | 
|  * @Date: 2025/6/4 14:54 | 
|  * @Description | 
|  */ | 
| public class MqttManager { | 
|   | 
|     private static final Logger log = LogManager.getLogger(MqttManager.class.getName()); | 
|   | 
|     private static final MqttManager INSTANCE = new MqttManager(); | 
|   | 
|     private MqttUnitConfigVo configVo ; | 
|   | 
|     private MqttClientPool pool; | 
|   | 
|     private List<MqttClient> subClients ; | 
|   | 
|     private MqttManager(){ | 
|     } | 
|   | 
|     public static MqttManager getInstance() { | 
|         return MqttManager.INSTANCE; | 
|     } | 
|     /** | 
|      *  初始化配置信息 | 
|      */ | 
|     public void initOption(MqttUnitConfigVo configVo) { | 
|         this.configVo = configVo; | 
|     } | 
|   | 
|     /** | 
|      * 创建连接池 + 订阅主题 | 
|      * @throws Exception | 
|      */ | 
|     public void start()throws Exception{ | 
|         subClients = new ArrayList<>(); | 
|         String URL = "tcp://" + this.configVo.svIp + ":" + this.configVo.svPort; | 
|         this.pool = new MqttClientPool(URL, this.configVo.svUserName, this.configVo.svUserPassword, this.configVo.poolMaxSize, this.configVo.useMemoryPersistence); | 
|         if(this.pool.isClose()){ | 
|             throw new Exception("Mqtt连接池初始化失败"); | 
|         } | 
|         MqttClient clientSub ; | 
|         try { | 
|             clientSub = pool.popClient();//新创建一个Client时,此Client实际去连接MQTT服务器,如果连接不上,就会抛出异常 | 
|         }catch (Exception e){ | 
|             throw new Exception("Mqtt连接池获得连接异常", e); | 
|         } | 
|         if(clientSub == null || !clientSub.isConnected()){ | 
|             throw new Exception("Mqtt连接池获得订阅连接不可用"); | 
|         } | 
|         subClients.add(clientSub) ; | 
|         // 订阅主题 | 
|         for(int i = 0; i < this.configVo.subTopics.length; i++){ | 
|             for(int j = 0 ; j < this.configVo.protocolAndDeviceIds.length; j++){ | 
|                 clientSub.subscribe(ServerProperties.orgTag + "/" | 
|                         + this.configVo.protocolAndDeviceIds[j] + "/" | 
|                         + this.configVo.subTopics[i], | 
|                         this.configVo.subTopicsQos[i], | 
|                         //每一个订阅主题都有一个MqttMessageListener实例 | 
|                         new MqttMessageListener(new MqttNotify(){ | 
|                             @Override | 
|                             public void notify(String devId, MqttNotifyInfo... infos) { | 
|                                 if(devId != null && infos != null && infos.length > 0){ | 
|                                     for(MqttNotifyInfo info : infos){ | 
|                                         if(info instanceof DevOnLineInfo){ | 
|                                             DevOnLineInfo onLineSt = (DevOnLineInfo)info; | 
|                                             if(onLineSt.onLine != null && onLineSt.onLine.booleanValue()){ | 
|                                                 DevStatusDealer.onLine(devId, ((DevOnLineInfo)info).protocol); | 
|                                             }else{ | 
|                                                 DevStatusDealer.offLine(devId); | 
|                                             } | 
|                                         } else if(info instanceof DevRunInfo){ | 
|                                             DevStatusDealer.setStatus(devId, (DevRunInfo)info); | 
|                                         } | 
|                                     } | 
|                                 } | 
|                             } | 
|                         }) | 
|                 ); | 
|             } | 
|         } | 
|     } | 
|   | 
|     public void stop()throws Exception{ | 
|         if(subClients != null && subClients.size() > 0){ | 
|             for (MqttClient client : subClients) { | 
|                 if(client != null && client.isConnected()){ | 
|                     try{ | 
|                         client.disconnect(); | 
|                         client.close(); | 
|                     }catch (Exception e){ | 
|                         e.printStackTrace(); | 
|                     } | 
|                 } | 
|             } | 
|         } | 
|         if(this.pool != null){ | 
|             // 关闭连接池 | 
|             this.pool.close(); | 
|         } | 
|     } | 
|   | 
|     public MqttClient popMqttClient() throws Exception{ | 
|         return this.pool.popClient(); | 
|     } | 
|   | 
|     public void pushMqttClient(MqttClient client) { | 
|         this.pool.pushClient(client); | 
|     } | 
|   | 
|     public void publishMsg(MqttClient client, String topic, byte[] msg) throws Exception{ | 
|         client.publish(topic, msg, this.configVo.pubTopicQos, false); | 
|     } | 
|   | 
|     public void publishMsg(MqttClient client, String topic, String msg) throws Exception{ | 
|         byte[] bs = msg.getBytes("UTF-8") ; | 
|         client.publish(topic, bs, this.configVo.pubTopicQos, false); | 
|     } | 
|   | 
|     public boolean poolIsClose(){ | 
|         if(this.pool == null){ | 
|             return true; | 
|         } | 
|         return this.pool.isClose(); | 
|     } | 
| } |