zhubaomin
2024-11-26 08fc154ca750b446e97b4b7764f0ffe130348f24
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package com.dy.pipIrrApp.workOrder.mqtt;
 
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 
/**
 * @author ZhuBaoMin
 * @date 2024-11-16 11:27
 * @LastEditTime 2024-11-16 11:27
 * @Description MQTT客户端连接池,对外提供一个初始化的MQTT客户端
 */
 
@Slf4j
public class MqttClientConnectorPool {
    public static MqttClient mqttClient;
 
    /**
     * 连接MQTT客户端
     * @return 获取MQTT连队对象
     */
    public static MqttClient connectMQTT(String broker, String username, String password) {
        if (mqttClient != null){
            log.info("已存在!");
            return mqttClient;
        }
 
        try {
            String clientId = System.currentTimeMillis() + "";
 
            //创建MQTT客户端(指定broker、客户端id、消息持久策略)
            mqttClient = new MqttClient(broker, clientId, new MemoryPersistence());
 
            //创建连接参数配置
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(username);
            options.setPassword(password.toCharArray());
            //是否清除会话
            options.setCleanSession(true);
            //连接超时时间
            options.setKeepAliveInterval(20);
            //是否自动重连
            options.setAutomaticReconnect(true);
            mqttClient.connect(options);
            log.info("MqttClient 服务启动broker初始化!");
        } catch (MqttException e){
            log.error("MqttClient connect Error:{}", e.getMessage());
            e.printStackTrace();
        }
        return mqttClient;
    }
 
    /**
     * 关闭MQTT客户端
     * @param client client
     */
    public static void closeClient(MqttClient client){
        try {
            // 断开连接
            client.disconnect();
            // 关闭客户端
            client.close();
        } catch (MqttException e){
            log.error("MqttClient disconnect or close Error:{}", e.getMessage());
            e.printStackTrace();
        }
    }
 
    /**
     * 关闭MQTT客户端
     */
    public static void closeStaticClient(){
        try {
            if (mqttClient != null){
                // 断开连接
                mqttClient.disconnect();
                // 关闭客户端
                mqttClient.close();
            }
        } catch (MqttException e){
            log.error("MqttClient disconnect or close Error:{}", e.getMessage());
            e.printStackTrace();
        }
    }
}