| package com.dy.common.mw.channel.mqtt; | 
|   | 
import com.dy.common.util.Callback;
import com.dy.common.util.ThreadJob;
import java.nio.charset.StandardCharsets;
import org.eclipse.paho.client.mqttv3.MqttClient;
|   | 
| /** | 
|  * @Author: liurunyu | 
|  * @Date: 2025/6/4 11:35 | 
|  * @Description | 
|  */ | 
| public class Test { | 
|   | 
|     private static String mqSvIp = "121.199.41.121"; | 
|     private static Integer mqSvPort = 1883; | 
|     private static String mqSvUserName = "dyyjy"; | 
|     private static String mqSvUserPassword = "Dyyjy2025,;.abc!@#"; | 
|   | 
|     private static String topic1 = "test/topic1" ; | 
|     private static String topic2 = "test/topic2" ; | 
|   | 
|     private static int maxConnections = 10 ; | 
|   | 
|     private static MqttClientPool pool; | 
|     /** | 
|      * QoS | 
|      * 等级    名称                        消息传递特性 | 
|      * 0    至多一次(At most once)    消息发送后不保证到达,可能丢失或重复,开销最小,可靠性最低。 | 
|      * 1    至少一次(At least once)    消息至少会到达一次,可能重复,但不会丢失,可靠性中等,适用于多数场景。 | 
|      * 2    恰好一次(Exactly once)    消息仅会到达一次,不重复且不丢失,可靠性最高,但开销最大,实现最复杂。 | 
|      */ | 
|     private static int QoS = 1 ; | 
|   | 
|     public static void main(String[] args) { | 
|         try{ | 
|             // 初始化连接池 | 
|             pool = new MqttClientPool("tcp://" + mqSvIp + ":" + mqSvPort, mqSvUserName, mqSvUserPassword, maxConnections, true); | 
|             MqttClient clientSub = pool.popClient() ; | 
|             testSubscribe(clientSub, topic1); | 
|             testSubscribe(clientSub, topic2); | 
|             MqttClient clientPub = pool.popClient() ; | 
|             testPublish(clientPub, topic1, "hello world", 1000); | 
|             testPublish(clientPub, topic2, "hello China", 1500); | 
|         } catch (Exception e) { | 
|             e.printStackTrace(); | 
|         } finally { | 
|             // 关闭连接池 | 
|             //pool.close(); | 
|         } | 
|     } | 
|     private static void testSubscribe(MqttClient clientSub, String topic) throws Exception { | 
|         ThreadJob tjob = new ThreadJob() { | 
|             @Override | 
|             public Object execute() throws Exception { | 
|                 clientSub.subscribe(topic, QoS, (topic, msg) -> { | 
|                     System.out.println("从主题" + topic + "收到消息: " + new String(msg.getPayload())); | 
|                 }); | 
|                 while (true) { | 
|                     Thread.sleep(1000L); | 
|                 } | 
|             } | 
|         }; | 
|         tjob.start(new Callback() { | 
|             @Override | 
|             public void call(Object obj) { | 
|                 System.out.println("执行成功"); | 
|             } | 
|             @Override | 
|             public void call(Object... objs) { | 
|                 System.out.println("执行成功"); | 
|             } | 
|   | 
|             @Override | 
|             public void exception(Exception e) { | 
|                 e.printStackTrace(); | 
|             } | 
|         }); | 
|     } | 
|   | 
|     private static void testPublish(MqttClient clientPub, String topic, String message, long sleep) throws Exception { | 
|         ThreadJob tjob = new ThreadJob() { | 
|             @Override | 
|             public Object execute() throws Exception { | 
|                 int count = 0 ; | 
|                 while (true) { | 
|                     // 发布消息 | 
|                     byte[] bs = (message + "  " + count++).getBytes() ; | 
|                     clientPub.publish(topic, bs, QoS, false); | 
|                     Thread.sleep(sleep); | 
|                 } | 
|             } | 
|         }; | 
|         tjob.start(new Callback() { | 
|             @Override | 
|             public void call(Object obj) { | 
|                 System.out.println("执行成功"); | 
|             } | 
|             @Override | 
|             public void call(Object... objs) { | 
|                 System.out.println("执行成功"); | 
|             } | 
|   | 
|             @Override | 
|             public void exception(Exception e) { | 
|                 e.printStackTrace(); | 
|             } | 
|         }); | 
|     } | 
| } |