liurunyu
2 天以前 b0ef25abf150d88b2a4888e4bf0d2ae858ad3751
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package com.dy.common.mw.channel.mqtt;
 
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 
/**
 * @Author: liurunyu
 * @Date: 2025/6/4 11:16
 * @Description
 */
public class MqttClientPooledObjectFactory extends BasePooledObjectFactory<MqttClient> {
 
    private final String broker;
    private final String username;
    private final String password;
 
    public MqttClientPooledObjectFactory(String broker, String username, String password) {
        this.broker = broker;
        this.username = username;
        this.password = password;
    }
 
    @Override
    public MqttClient create() throws Exception {
        String clientId = MqttClient.generateClientId();
        MqttClient client = new MqttClient(broker, clientId);
 
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        options.setAutomaticReconnect(true);
        options.setCleanSession(true);
 
        client.connect(options);
        return client;
    }
 
    @Override
    public PooledObject<MqttClient> wrap(MqttClient client) {
        return new DefaultPooledObject<>(client);
    }
 
    @Override
    public void destroyObject(PooledObject<MqttClient> p) throws Exception {
        MqttClient client = p.getObject();
        if (client.isConnected()) {
            client.disconnect();
        }
        client.close();
    }
 
    @Override
    public boolean validateObject(PooledObject<MqttClient> p) {
        MqttClient client = p.getObject();
        return client.isConnected();
    }
}