package com.dy.pipIrrApp.workOrder.mqtt; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; /** * @author ZhuBaoMin * @date 2024-11-16 11:27 * @LastEditTime 2024-11-16 11:27 * @Description MQTT客户端连接池,对外提供一个初始化的MQTT客户端 */ @Slf4j public class MqttClientConnectorPool { public static MqttClient mqttClient; /** * 连接MQTT客户端 * @return 获取MQTT连队对象 */ public static MqttClient connectMQTT(String broker, String username, String password) { if (mqttClient != null){ log.info("已存在!"); return mqttClient; } try { String clientId = System.currentTimeMillis() + ""; //创建MQTT客户端(指定broker、客户端id、消息持久策略) mqttClient = new MqttClient(broker, clientId, new MemoryPersistence()); //创建连接参数配置 MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(username); options.setPassword(password.toCharArray()); //是否清除会话 options.setCleanSession(true); //连接超时时间 options.setKeepAliveInterval(20); //是否自动重连 options.setAutomaticReconnect(true); mqttClient.connect(options); log.info("MqttClient 服务启动broker初始化!"); } catch (MqttException e){ log.error("MqttClient connect Error:{}", e.getMessage()); e.printStackTrace(); } return mqttClient; } /** * 关闭MQTT客户端 * @param client client */ public static void closeClient(MqttClient client){ try { // 断开连接 client.disconnect(); // 关闭客户端 client.close(); } catch (MqttException e){ log.error("MqttClient disconnect or close Error:{}", e.getMessage()); e.printStackTrace(); } } /** * 关闭MQTT客户端 */ public static void closeStaticClient(){ try { if (mqttClient != null){ // 断开连接 mqttClient.disconnect(); // 关闭客户端 mqttClient.close(); } } catch (MqttException e){ log.error("MqttClient disconnect or close Error:{}", e.getMessage()); e.printStackTrace(); } } }