package com.dy.common.mw.channel.mqtt;

import com.dy.common.util.Callback;
import com.dy.common.util.ThreadJob;
import java.nio.charset.StandardCharsets;
import org.eclipse.paho.client.mqttv3.MqttClient;

/**
 * Manual smoke test for {@link MqttClientPool}.
 *
 * <p>Takes two clients from the pool: one subscribes to two topics and prints every
 * message it receives, the other publishes a numbered text message to each of those
 * topics in an endless loop. The program is meant to be started by hand and stopped
 * by killing the process.
 *
 * <p>Created 2025/6/4 11:35.
 *
 * @author liurunyu
 */
public class Test {

    // NOTE(review): broker address and credentials are hard-coded in source control.
    // They should be moved to external configuration (environment, properties file)
    // before this is used for anything other than a local experiment.
    private static final String MQ_SV_IP = "121.199.41.121";
    private static final int MQ_SV_PORT = 1883;
    private static final String MQ_SV_USER_NAME = "dyyjy";
    private static final String MQ_SV_USER_PASSWORD = "Dyyjy2025,;.abc!@#";

    private static final String TOPIC_1 = "test/topic1";
    private static final String TOPIC_2 = "test/topic2";

    /** Pool size passed to the {@link MqttClientPool} constructor. */
    private static final int MAX_CONNECTIONS = 10;

    /**
     * MQTT quality-of-service level used for both subscribing and publishing.
     *
     * <ul>
     *   <li>0 - at most once: delivery is not guaranteed, a message may be lost but is
     *       never redelivered; lowest overhead, lowest reliability.</li>
     *   <li>1 - at least once: a message arrives at least once and may be duplicated,
     *       but is not lost; medium reliability, suitable for most scenarios.</li>
     *   <li>2 - exactly once: a message arrives once, neither duplicated nor lost;
     *       highest reliability, highest overhead and most complex to implement.</li>
     * </ul>
     */
    private static final int QOS = 1;

    /** Connection pool shared by the subscriber and publisher clients. */
    private static MqttClientPool pool;

    /**
     * Entry point: builds the pool, then starts two subscriber jobs on one client and
     * two publisher jobs on another. Any exception raised during setup is printed to
     * standard error.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        try {
            // Initialise the connection pool.
            pool = new MqttClientPool(
                    "tcp://" + MQ_SV_IP + ":" + MQ_SV_PORT,
                    MQ_SV_USER_NAME,
                    MQ_SV_USER_PASSWORD,
                    MAX_CONNECTIONS);

            // Both subscriptions share a single client.
            MqttClient clientSub = pool.popClient();
            testSubscribe(clientSub, TOPIC_1);
            testSubscribe(clientSub, TOPIC_2);

            // Both publishers share a second client.
            MqttClient clientPub = pool.popClient();
            testPublish(clientPub, TOPIC_1, "hello world", 1000);
            testPublish(clientPub, TOPIC_2, "hello China", 1500);
        } catch (Exception e) {
            e.printStackTrace();
        }
        // The pool is deliberately not closed here. NOTE(review): presumably
        // ThreadJob.start() runs the jobs asynchronously, so closing the pool at this
        // point would tear down connections the endless jobs are still using - confirm.
    }

    /**
     * Starts a job that subscribes {@code clientSub} to {@code topic} and prints each
     * received message, decoding the payload as UTF-8 text. After subscribing, the job
     * sleeps in an endless loop and never returns normally.
     *
     * @param clientSub client used for the subscription
     * @param topic     topic filter to subscribe to
     * @throws Exception if starting the job fails
     */
    private static void testSubscribe(MqttClient clientSub, String topic) throws Exception {
        ThreadJob subscribeJob = new ThreadJob() {
            @Override
            public Object execute() throws Exception {
                // The listener's own topic argument gets a distinct name so it does not
                // shadow the outer method parameter.
                clientSub.subscribe(topic, QOS, (receivedTopic, msg) ->
                        System.out.println("从主题" + receivedTopic + "收到消息: "
                                + new String(msg.getPayload(), StandardCharsets.UTF_8)));
                // Keep the job running after the subscription has been registered.
                while (true) {
                    Thread.sleep(1000L);
                }
            }
        };
        subscribeJob.start(newLoggingCallback());
    }

    /**
     * Starts a job that publishes {@code message} followed by a space and an increasing
     * counter (starting at 0) to {@code topic} forever, pausing {@code sleep}
     * milliseconds after each publish. Payloads are encoded as UTF-8 and sent
     * non-retained.
     *
     * @param clientPub client used for publishing
     * @param topic     topic to publish to
     * @param message   text prefix of every published message
     * @param sleep     pause between two publishes, in milliseconds
     * @throws Exception if starting the job fails
     */
    private static void testPublish(MqttClient clientPub, String topic, String message, long sleep)
            throws Exception {
        ThreadJob publishJob = new ThreadJob() {
            @Override
            public Object execute() throws Exception {
                int count = 0;
                while (true) {
                    // Publish the next numbered message.
                    byte[] payload = (message + " " + count++).getBytes(StandardCharsets.UTF_8);
                    clientPub.publish(topic, payload, QOS, false);
                    Thread.sleep(sleep);
                }
            }
        };
        publishJob.start(newLoggingCallback());
    }

    /**
     * Creates the callback handed to every job: both {@code call} variants print a
     * fixed success line to standard output, and {@code exception} prints the stack
     * trace of the exception it is given.
     */
    private static Callback newLoggingCallback() {
        return new Callback() {
            @Override
            public void call(Object obj) {
                System.out.println("执行成功");
            }

            @Override
            public void call(Object... objs) {
                System.out.println("执行成功");
            }

            @Override
            public void exception(Exception e) {
                e.printStackTrace();
            }
        };
    }
}