|  |  | 
 |  |  | import com.dy.common.mw.channel.mqtt.MqttClientPool; | 
 |  |  | import com.dy.common.mw.protocol4Mqtt.MqttNotify; | 
 |  |  | import com.dy.common.mw.protocol4Mqtt.MqttNotifyInfo; | 
 |  |  | import com.dy.common.mw.protocol4Mqtt.status.DevOnLineSt; | 
 |  |  | import com.dy.common.mw.protocol4Mqtt.status.DevRunSt; | 
 |  |  | import com.dy.common.mw.protocol4Mqtt.status.DevOnLineInfo; | 
 |  |  | import com.dy.common.mw.protocol4Mqtt.status.DevRunInfo; | 
 |  |  | import com.dy.rtuMw.server.ServerProperties; | 
 |  |  | import org.apache.logging.log4j.LogManager; | 
 |  |  | import org.apache.logging.log4j.Logger; | 
 |  |  | import org.eclipse.paho.client.mqttv3.MqttClient; | 
 |  |  |  | 
 |  |  | import java.util.ArrayList; | 
 |  |  | import java.util.List; | 
 |  |  |  | 
 |  |  | /** | 
 |  |  |  * @Author: liurunyu | 
 |  |  | 
 |  |  |     private MqttUnitConfigVo configVo ; | 
 |  |  |  | 
 |  |  |     private MqttClientPool pool; | 
 |  |  |  | 
 |  |  |     private List<MqttClient> subClients ; | 
 |  |  |  | 
 |  |  |     private MqttManager(){ | 
 |  |  |     } | 
 |  |  | 
 |  |  |      * @throws Exception | 
 |  |  |      */ | 
 |  |  |     public void start()throws Exception{ | 
 |  |  |         subClients = new ArrayList<>(); | 
 |  |  |         String URL = "tcp://" + this.configVo.svIp + ":" + this.configVo.svPort; | 
 |  |  |         this.pool = new MqttClientPool(URL, this.configVo.svUserName, this.configVo.svUserPassword, this.configVo.poolMaxSize); | 
 |  |  |         this.pool = new MqttClientPool(URL, this.configVo.svUserName, this.configVo.svUserPassword, this.configVo.poolMaxSize, this.configVo.useMemoryPersistence); | 
 |  |  |         if(this.pool.isClose()){ | 
 |  |  |             throw new Exception("Mqtt连接池初始化失败"); | 
 |  |  |         } | 
 |  |  | 
 |  |  |         if(clientSub == null || !clientSub.isConnected()){ | 
 |  |  |             throw new Exception("Mqtt连接池获得订阅连接不可用"); | 
 |  |  |         } | 
 |  |  |         subClients.add(clientSub) ; | 
 |  |  |  | 
 |  |  |         // 订阅主题 | 
 |  |  |         for(int i = 0; i < this.configVo.subTopics.length; i++){ | 
 |  |  |             for(int j = 0 ; j < this.configVo.protocolAndDeviceIds.length; j++){ | 
 |  |  |                 clientSub.subscribe(ServerProperties.orgTag + "/" | 
 |  |  |                         + this.configVo.protocolAndDeviceIds[j] + "/" | 
 |  |  |                         + this.configVo.subTopics[i], | 
 |  |  |                         this.configVo.subTopicsQos[i], | 
 |  |  |                         //每一个订阅主题都有一个MqttMessageListener实例 | 
 |  |  |                         new MqttMessageListener(new MqttNotify(){ | 
 |  |  |                             @Override | 
 |  |  |                             public void notify(String devId, MqttNotifyInfo... infos) { | 
 |  |  |                                 if(devId != null && infos != null && infos.length > 0){ | 
 |  |  |                                     for(MqttNotifyInfo info : infos){ | 
 |  |  |                                         if(info instanceof DevOnLineSt){ | 
 |  |  |                                             DevOnLineSt onLineSt = (DevOnLineSt)info; | 
 |  |  |                                             if(onLineSt.onLine != null && onLineSt.onLine.booleanValue()){ | 
 |  |  |                                                 DevStatusDealer.onLine(devId, ((DevOnLineSt)info).protocol); | 
 |  |  |                                             }else{ | 
 |  |  |                                                 DevStatusDealer.offLine(devId); | 
 |  |  |         if(this.configVo.subTopics != null && this.configVo.subTopics.length > 0){ | 
 |  |  |             if(this.configVo.protocolAndDeviceIds != null || this.configVo.protocolAndDeviceIds.length > 0){ | 
 |  |  |                 for(int i = 0; i < this.configVo.subTopics.length; i++){ | 
 |  |  |                     for(int j = 0 ; j < this.configVo.protocolAndDeviceIds.length; j++){ | 
 |  |  |                         clientSub.subscribe(ServerProperties.orgTag + "/" | 
 |  |  |                                         + this.configVo.protocolAndDeviceIds[j] + "/" | 
 |  |  |                                         + this.configVo.subTopics[i], | 
 |  |  |                                 this.configVo.subTopicsQos[i], | 
 |  |  |                                 //每一个订阅主题都有一个MqttMessageListener实例 | 
 |  |  |                                 new MqttMessageListener(new MqttNotify(){ | 
 |  |  |                                     @Override | 
 |  |  |                                     public void notify(String devId, MqttNotifyInfo... infos) { | 
 |  |  |                                         if(devId != null && infos != null && infos.length > 0){ | 
 |  |  |                                             for(MqttNotifyInfo info : infos){ | 
 |  |  |                                                 if(info instanceof DevOnLineInfo){ | 
 |  |  |                                                     DevOnLineInfo onLineSt = (DevOnLineInfo)info; | 
 |  |  |                                                     if(onLineSt.onLine != null && onLineSt.onLine.booleanValue()){ | 
 |  |  |                                                         DevStatusDealer.onLine(devId, ((DevOnLineInfo)info).protocol); | 
 |  |  |                                                     }else{ | 
 |  |  |                                                         DevStatusDealer.offLine(devId); | 
 |  |  |                                                     } | 
 |  |  |                                                 } else if(info instanceof DevRunInfo){ | 
 |  |  |                                                     DevStatusDealer.setStatus(devId, (DevRunInfo)info); | 
 |  |  |                                                 } | 
 |  |  |                                             } | 
 |  |  |                                         } else if(info instanceof DevRunSt){ | 
 |  |  |                                             DevStatusDealer.setStatus(devId, (DevRunSt)info); | 
 |  |  |                                         } | 
 |  |  |                                     } | 
 |  |  |                                 } | 
 |  |  |                             } | 
 |  |  |                         }) | 
 |  |  |                 ); | 
 |  |  |                                 }) | 
 |  |  |                         ); | 
 |  |  |                     } | 
 |  |  |                 } | 
 |  |  |             } | 
 |  |  |         } | 
 |  |  |     } | 
 |  |  |  | 
 |  |  |     public void stop()throws Exception{ | 
 |  |  |         if(subClients != null && subClients.size() > 0){ | 
 |  |  |             for (MqttClient client : subClients) { | 
 |  |  |                 if(client != null && client.isConnected()){ | 
 |  |  |                     try{ | 
 |  |  |                         client.disconnect(); | 
 |  |  |                         client.close(); | 
 |  |  |                     }catch (Exception e){ | 
 |  |  |                         e.printStackTrace(); | 
 |  |  |                     } | 
 |  |  |                 } | 
 |  |  |             } | 
 |  |  |         } | 
 |  |  |         if(this.pool != null){ | 
 |  |  |             // 关闭连接池 | 
 |  |  |             this.pool.close(); |