| package com.dy.common.mw.channel.mqtt; | 
|   | 
| import org.apache.commons.pool2.BasePooledObjectFactory; | 
| import org.apache.commons.pool2.PooledObject; | 
| import org.apache.commons.pool2.impl.DefaultPooledObject; | 
| import org.eclipse.paho.client.mqttv3.MqttClient; | 
| import org.eclipse.paho.client.mqttv3.MqttConnectOptions; | 
| import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; | 
|   | 
| /** | 
|  * @Author: liurunyu | 
|  * @Date: 2025/6/4 11:16 | 
|  * @Description | 
|  */ | 
| public class MqttClientPooledObjectFactory extends BasePooledObjectFactory<MqttClient> { | 
|   | 
|     private final String broker; | 
|     private final String username; | 
|     private final String password; | 
|     private final Boolean useMemoryPersistence; | 
|   | 
|     public MqttClientPooledObjectFactory(String broker, String username, String password, boolean useMemoryPersistence) { | 
|         this.broker = broker; | 
|         this.username = username; | 
|         this.password = password; | 
|         this.useMemoryPersistence = useMemoryPersistence; | 
|     } | 
|   | 
|     @Override | 
|     public MqttClient create() throws Exception { | 
|         String clientId = MqttClient.generateClientId(); | 
|         MqttClient client = null ; | 
|         // 使用内存持久化而非默认的文件持久化 | 
|         if (useMemoryPersistence) { | 
|             MemoryPersistence persistence = new MemoryPersistence(); | 
|             client = new MqttClient(broker, clientId, persistence); | 
|         }else{ | 
|             client = new MqttClient(broker, clientId); | 
|         } | 
|         MqttConnectOptions options = new MqttConnectOptions(); | 
|         options.setUserName(username); | 
|         options.setPassword(password.toCharArray()); | 
|         options.setAutomaticReconnect(true); | 
|         options.setCleanSession(true); | 
|   | 
|         client.connect(options); | 
|         return client; | 
|     } | 
|   | 
|     @Override | 
|     public PooledObject<MqttClient> wrap(MqttClient client) { | 
|         return new DefaultPooledObject<>(client); | 
|     } | 
|   | 
|     @Override | 
|     public void destroyObject(PooledObject<MqttClient> p) throws Exception { | 
|         MqttClient client = p.getObject(); | 
|         if (client.isConnected()) { | 
|             client.disconnect(); | 
|         } | 
|         client.close(); | 
|     } | 
|   | 
|     @Override | 
|     public boolean validateObject(PooledObject<MqttClient> p) { | 
|         MqttClient client = p.getObject(); | 
|         return client.isConnected(); | 
|     } | 
| } |