package com.dy.rtuMw.server.mqtt;
|
|
import com.dy.common.mw.channel.mqtt.MqttClientPool;
|
import com.dy.common.mw.protocol4Mqtt.MqttNotify;
|
import com.dy.common.mw.protocol4Mqtt.MqttNotifyInfo;
|
import com.dy.common.mw.protocol4Mqtt.status.DevOnLineInfo;
|
import com.dy.common.mw.protocol4Mqtt.status.DevRunInfo;
|
import com.dy.rtuMw.server.ServerProperties;
|
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.Logger;
|
import org.eclipse.paho.client.mqttv3.MqttClient;
|
|
import java.util.ArrayList;
|
import java.util.List;
|
|
/**
|
* @Author: liurunyu
|
* @Date: 2025/6/4 14:54
|
* @Description
|
*/
|
public class MqttManager {
|
|
private static final Logger log = LogManager.getLogger(MqttManager.class.getName());
|
|
private static final MqttManager INSTANCE = new MqttManager();
|
|
private MqttUnitConfigVo configVo ;
|
|
private MqttClientPool pool;
|
|
private List<MqttClient> subClients ;
|
|
private MqttManager(){
|
}
|
|
public static MqttManager getInstance() {
|
return MqttManager.INSTANCE;
|
}
|
/**
|
* 初始化配置信息
|
*/
|
public void initOption(MqttUnitConfigVo configVo) {
|
this.configVo = configVo;
|
}
|
|
/**
|
* 创建连接池 + 订阅主题
|
* @throws Exception
|
*/
|
public void start()throws Exception{
|
subClients = new ArrayList<>();
|
String URL = "tcp://" + this.configVo.svIp + ":" + this.configVo.svPort;
|
this.pool = new MqttClientPool(URL, this.configVo.svUserName, this.configVo.svUserPassword, this.configVo.poolMaxSize, this.configVo.useMemoryPersistence);
|
if(this.pool.isClose()){
|
throw new Exception("Mqtt连接池初始化失败");
|
}
|
MqttClient clientSub ;
|
try {
|
clientSub = pool.popClient();//新创建一个Client时,此Client实际去连接MQTT服务器,如果连接不上,就会抛出异常
|
}catch (Exception e){
|
throw new Exception("Mqtt连接池获得连接异常", e);
|
}
|
if(clientSub == null || !clientSub.isConnected()){
|
throw new Exception("Mqtt连接池获得订阅连接不可用");
|
}
|
subClients.add(clientSub) ;
|
// 订阅主题
|
for(int i = 0; i < this.configVo.subTopics.length; i++){
|
for(int j = 0 ; j < this.configVo.protocolAndDeviceIds.length; j++){
|
clientSub.subscribe(ServerProperties.orgTag + "/"
|
+ this.configVo.protocolAndDeviceIds[j] + "/"
|
+ this.configVo.subTopics[i],
|
this.configVo.subTopicsQos[i],
|
//每一个订阅主题都有一个MqttMessageListener实例
|
new MqttMessageListener(new MqttNotify(){
|
@Override
|
public void notify(String devId, MqttNotifyInfo... infos) {
|
if(devId != null && infos != null && infos.length > 0){
|
for(MqttNotifyInfo info : infos){
|
if(info instanceof DevOnLineInfo){
|
DevOnLineInfo onLineSt = (DevOnLineInfo)info;
|
if(onLineSt.onLine != null && onLineSt.onLine.booleanValue()){
|
DevStatusDealer.onLine(devId, ((DevOnLineInfo)info).protocol);
|
}else{
|
DevStatusDealer.offLine(devId);
|
}
|
} else if(info instanceof DevRunInfo){
|
DevStatusDealer.setStatus(devId, (DevRunInfo)info);
|
}
|
}
|
}
|
}
|
})
|
);
|
}
|
}
|
}
|
|
public void stop()throws Exception{
|
if(subClients != null && subClients.size() > 0){
|
for (MqttClient client : subClients) {
|
if(client != null && client.isConnected()){
|
try{
|
client.disconnect();
|
client.close();
|
}catch (Exception e){
|
e.printStackTrace();
|
}
|
}
|
}
|
}
|
if(this.pool != null){
|
// 关闭连接池
|
this.pool.close();
|
}
|
}
|
|
public MqttClient popMqttClient() throws Exception{
|
return this.pool.popClient();
|
}
|
|
public void pushMqttClient(MqttClient client) {
|
this.pool.pushClient(client);
|
}
|
|
public void publishMsg(MqttClient client, String topic, byte[] msg) throws Exception{
|
client.publish(topic, msg, this.configVo.pubTopicQos, false);
|
}
|
|
public void publishMsg(MqttClient client, String topic, String msg) throws Exception{
|
byte[] bs = msg.getBytes("UTF-8") ;
|
client.publish(topic, bs, this.configVo.pubTopicQos, false);
|
}
|
|
public boolean poolIsClose(){
|
if(this.pool == null){
|
return true;
|
}
|
return this.pool.isClose();
|
}
|
}
|