package com.dy.common.mw.channel.mqtt; import org.apache.commons.pool2.BasePooledObjectFactory; import org.apache.commons.pool2.PooledObject; import org.apache.commons.pool2.impl.DefaultPooledObject; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; /** * @Author: liurunyu * @Date: 2025/6/4 11:16 * @Description */ public class MqttClientPooledObjectFactory extends BasePooledObjectFactory { private final String broker; private final String username; private final String password; public MqttClientPooledObjectFactory(String broker, String username, String password) { this.broker = broker; this.username = username; this.password = password; } @Override public MqttClient create() throws Exception { String clientId = MqttClient.generateClientId(); MqttClient client = new MqttClient(broker, clientId); MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(username); options.setPassword(password.toCharArray()); options.setAutomaticReconnect(true); options.setCleanSession(true); client.connect(options); return client; } @Override public PooledObject wrap(MqttClient client) { return new DefaultPooledObject<>(client); } @Override public void destroyObject(PooledObject p) throws Exception { MqttClient client = p.getObject(); if (client.isConnected()) { client.disconnect(); } client.close(); } @Override public boolean validateObject(PooledObject p) { MqttClient client = p.getObject(); return client.isConnected(); } }