| New file | 
|  |  |  | 
|---|
|  |  |  | package com.dy.pipIrrApp.workOrder.mqtt; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | import lombok.extern.slf4j.Slf4j; | 
|---|
|  |  |  | import org.eclipse.paho.client.mqttv3.MqttClient; | 
|---|
|  |  |  | import org.eclipse.paho.client.mqttv3.MqttConnectOptions; | 
|---|
|  |  |  | import org.eclipse.paho.client.mqttv3.MqttException; | 
|---|
|  |  |  | import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | /** | 
|---|
|  |  |  | * @author ZhuBaoMin | 
|---|
|  |  |  | * @date 2024-11-16 11:27 | 
|---|
|  |  |  | * @LastEditTime 2024-11-16 11:27 | 
|---|
|  |  |  | * @Description MQTT客户端连接池,对外提供一个初始化的MQTT客户端 | 
|---|
|  |  |  | */ | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Slf4j | 
|---|
|  |  |  | public class MqttClientConnectorPool { | 
|---|
|  |  |  | public static MqttClient mqttClient; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | /** | 
|---|
|  |  |  | * 连接MQTT客户端 | 
|---|
|  |  |  | * @return 获取MQTT连队对象 | 
|---|
|  |  |  | */ | 
|---|
|  |  |  | public static MqttClient connectMQTT(String broker, String username, String password) { | 
|---|
|  |  |  | if (mqttClient != null){ | 
|---|
|  |  |  | log.info("已存在!"); | 
|---|
|  |  |  | return mqttClient; | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | try { | 
|---|
|  |  |  | String clientId = System.currentTimeMillis() + ""; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | //创建MQTT客户端(指定broker、客户端id、消息持久策略) | 
|---|
|  |  |  | mqttClient = new MqttClient(broker, clientId, new MemoryPersistence()); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | //创建连接参数配置 | 
|---|
|  |  |  | MqttConnectOptions options = new MqttConnectOptions(); | 
|---|
|  |  |  | options.setUserName(username); | 
|---|
|  |  |  | options.setPassword(password.toCharArray()); | 
|---|
|  |  |  | //是否清除会话 | 
|---|
|  |  |  | options.setCleanSession(true); | 
|---|
|  |  |  | //连接超时时间 | 
|---|
|  |  |  | options.setKeepAliveInterval(20); | 
|---|
|  |  |  | //是否自动重连 | 
|---|
|  |  |  | options.setAutomaticReconnect(true); | 
|---|
|  |  |  | mqttClient.connect(options); | 
|---|
|  |  |  | log.info("MqttClient 服务启动broker初始化!"); | 
|---|
|  |  |  | } catch (MqttException e){ | 
|---|
|  |  |  | log.error("MqttClient connect Error:{}", e.getMessage()); | 
|---|
|  |  |  | e.printStackTrace(); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | return mqttClient; | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | /** | 
|---|
|  |  |  | * 关闭MQTT客户端 | 
|---|
|  |  |  | * @param client client | 
|---|
|  |  |  | */ | 
|---|
|  |  |  | public static void closeClient(MqttClient client){ | 
|---|
|  |  |  | try { | 
|---|
|  |  |  | // 断开连接 | 
|---|
|  |  |  | client.disconnect(); | 
|---|
|  |  |  | // 关闭客户端 | 
|---|
|  |  |  | client.close(); | 
|---|
|  |  |  | } catch (MqttException e){ | 
|---|
|  |  |  | log.error("MqttClient disconnect or close Error:{}", e.getMessage()); | 
|---|
|  |  |  | e.printStackTrace(); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | /** | 
|---|
|  |  |  | * 关闭MQTT客户端 | 
|---|
|  |  |  | */ | 
|---|
|  |  |  | public static void closeStaticClient(){ | 
|---|
|  |  |  | try { | 
|---|
|  |  |  | if (mqttClient != null){ | 
|---|
|  |  |  | // 断开连接 | 
|---|
|  |  |  | mqttClient.disconnect(); | 
|---|
|  |  |  | // 关闭客户端 | 
|---|
|  |  |  | mqttClient.close(); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } catch (MqttException e){ | 
|---|
|  |  |  | log.error("MqttClient disconnect or close Error:{}", e.getMessage()); | 
|---|
|  |  |  | e.printStackTrace(); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|