package com.dy.rtuMw.server.mqtt;
|
|
import com.dy.common.mw.channel.mqtt.MqttClientPool;
|
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.Logger;
|
import org.eclipse.paho.client.mqttv3.MqttClient;
|
|
/**
|
* @Author: liurunyu
|
* @Date: 2025/6/4 14:54
|
* @Description
|
*/
|
public class MqttManager {
|
|
private static final Logger log = LogManager.getLogger(MqttManager.class.getName());
|
|
private static final MqttManager INSTANCE = new MqttManager();
|
|
private MqttUnitConfigVo configVo ;
|
|
private MqttClientPool pool;
|
|
private MqttManager(){
|
}
|
|
public static MqttManager getInstance() {
|
return MqttManager.INSTANCE;
|
}
|
/**
|
* 初始化配置信息
|
*/
|
public void initOption(MqttUnitConfigVo configVo) {
|
this.configVo = configVo;
|
}
|
|
public void start()throws Exception{
|
String URL = "tcp://" + this.configVo.svIp + ":" + this.configVo.svPort;
|
this.pool = new MqttClientPool(URL, this.configVo.svUserName, this.configVo.svUserPassword, this.configVo.poolMaxSize);
|
if(this.pool.isClose()){
|
throw new Exception("Mqtt连接池初始化失败");
|
}
|
MqttClient clientSub = null ;
|
try {
|
clientSub = pool.popClient();//新创建一个Client时,此Client实际去连接MQTT服务器,如果连接不上,就会抛出异常
|
}catch (Exception e){
|
throw new Exception("Mqtt连接池获得连接异常", e);
|
}
|
if(clientSub == null || !clientSub.isConnected()){
|
throw new Exception("Mqtt连接池获得订阅连接不可用");
|
}
|
for(int i = 0; i < this.configVo.subTopics.length; i++){
|
clientSub.subscribe(this.configVo.subTopics[i], this.configVo.topicsQos[i], new MqttMessageListener());
|
}
|
}
|
|
public void stop()throws Exception{
|
if(this.pool != null){
|
// 关闭连接池
|
this.pool.close();
|
}
|
}
|
|
public MqttClient popMqttClient() throws Exception{
|
return this.pool.popClient();
|
}
|
|
public void pushMqttClient(MqttClient client) {
|
this.pool.pushClient(client);
|
}
|
|
public void publishMsg(MqttClient client, String topic, byte[] msg) throws Exception{
|
client.publish(topic, msg, this.configVo.publishQos, false);
|
}
|
|
public void publishMsg(MqttClient client, String topic, String msg) throws Exception{
|
byte[] bs = msg.getBytes("UTF-8") ;
|
client.publish(topic, bs, this.configVo.publishQos, false);
|
}
|
|
public boolean poolIsClose(){
|
if(this.pool == null){
|
return true;
|
}
|
return this.pool.isClose();
|
}
|
}
|