package com.dy.common.mw.channel.mqtt; 
 | 
  
 | 
import org.apache.commons.pool2.BasePooledObjectFactory; 
 | 
import org.apache.commons.pool2.PooledObject; 
 | 
import org.apache.commons.pool2.impl.DefaultPooledObject; 
 | 
import org.eclipse.paho.client.mqttv3.MqttClient; 
 | 
import org.eclipse.paho.client.mqttv3.MqttConnectOptions; 
 | 
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; 
 | 
  
 | 
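/**
 * Commons-pool2 object factory for Paho {@link MqttClient} instances.
 *
 * @Author: liurunyu
 * @Date: 2025/6/4 11:16
 * @Description Creates MQTT clients with a generated client id and connects them to the
 * configured broker (automatic reconnect and clean session enabled), validates pooled
 * clients by their connection state, and disconnects and closes them on destruction.
 */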
public class MqttClientPooledObjectFactory extends BasePooledObjectFactory<MqttClient> { 
 | 
  
 | 
    private final String broker; 
 | 
    private final String username; 
 | 
    private final String password; 
 | 
    private final Boolean useMemoryPersistence; 
 | 
  
 | 
    public MqttClientPooledObjectFactory(String broker, String username, String password, boolean useMemoryPersistence) { 
 | 
        this.broker = broker; 
 | 
        this.username = username; 
 | 
        this.password = password; 
 | 
        this.useMemoryPersistence = useMemoryPersistence; 
 | 
    } 
 | 
  
 | 
    @Override 
 | 
    public MqttClient create() throws Exception { 
 | 
        String clientId = MqttClient.generateClientId(); 
 | 
        MqttClient client = null ; 
 | 
        // 使用内存持久化而非默认的文件持久化 
 | 
        if (useMemoryPersistence) { 
 | 
            MemoryPersistence persistence = new MemoryPersistence(); 
 | 
            client = new MqttClient(broker, clientId, persistence); 
 | 
        }else{ 
 | 
            client = new MqttClient(broker, clientId); 
 | 
        } 
 | 
        MqttConnectOptions options = new MqttConnectOptions(); 
 | 
        options.setUserName(username); 
 | 
        options.setPassword(password.toCharArray()); 
 | 
        options.setAutomaticReconnect(true); 
 | 
        options.setCleanSession(true); 
 | 
  
 | 
        client.connect(options); 
 | 
        return client; 
 | 
    } 
 | 
  
 | 
    @Override 
 | 
    public PooledObject<MqttClient> wrap(MqttClient client) { 
 | 
        return new DefaultPooledObject<>(client); 
 | 
    } 
 | 
  
 | 
    @Override 
 | 
    public void destroyObject(PooledObject<MqttClient> p) throws Exception { 
 | 
        MqttClient client = p.getObject(); 
 | 
        if (client.isConnected()) { 
 | 
            client.disconnect(); 
 | 
        } 
 | 
        client.close(); 
 | 
    } 
 | 
  
 | 
    @Override 
 | 
    public boolean validateObject(PooledObject<MqttClient> p) { 
 | 
        MqttClient client = p.getObject(); 
 | 
        return client.isConnected(); 
 | 
    } 
 | 
} 
 |