package com.dy.common.mw.channel.mqtt; 
 | 
  
 | 
import com.dy.common.util.Callback; 
 | 
import com.dy.common.util.ThreadJob; 
 | 
import org.eclipse.paho.client.mqttv3.MqttClient; 
 | 
  
 | 
/** 
 | 
 * @Author: liurunyu 
 | 
 * @Date: 2025/6/4 11:35 
 | 
 * @Description 
 | 
 */ 
 | 
public class Test { 
 | 
  
 | 
    private static String mqSvIp = "121.199.41.121"; 
 | 
    private static Integer mqSvPort = 1883; 
 | 
    private static String mqSvUserName = "dyyjy"; 
 | 
    private static String mqSvUserPassword = "Dyyjy2025,;.abc!@#"; 
 | 
  
 | 
    private static String topic1 = "test/topic1" ; 
 | 
    private static String topic2 = "test/topic2" ; 
 | 
  
 | 
    private static int maxConnections = 10 ; 
 | 
  
 | 
    private static MqttClientPool pool; 
 | 
    /** 
 | 
     * QoS 
 | 
     * 等级    名称                        消息传递特性 
 | 
     * 0    至多一次(At most once)    消息发送后不保证到达,可能丢失或重复,开销最小,可靠性最低。 
 | 
     * 1    至少一次(At least once)    消息至少会到达一次,可能重复,但不会丢失,可靠性中等,适用于多数场景。 
 | 
     * 2    恰好一次(Exactly once)    消息仅会到达一次,不重复且不丢失,可靠性最高,但开销最大,实现最复杂。 
 | 
     */ 
 | 
    private static int QoS = 1 ; 
 | 
  
 | 
    public static void main(String[] args) { 
 | 
        try{ 
 | 
            // 初始化连接池 
 | 
            pool = new MqttClientPool("tcp://" + mqSvIp + ":" + mqSvPort, mqSvUserName, mqSvUserPassword, maxConnections); 
 | 
            MqttClient clientSub = pool.popClient() ; 
 | 
            testSubscribe(clientSub, topic1); 
 | 
            testSubscribe(clientSub, topic2); 
 | 
            MqttClient clientPub = pool.popClient() ; 
 | 
            testPublish(clientPub, topic1, "hello world", 1000); 
 | 
            testPublish(clientPub, topic2, "hello China", 1500); 
 | 
        } catch (Exception e) { 
 | 
            e.printStackTrace(); 
 | 
        } finally { 
 | 
            // 关闭连接池 
 | 
            //pool.close(); 
 | 
        } 
 | 
    } 
 | 
    private static void testSubscribe(MqttClient clientSub, String topic) throws Exception { 
 | 
        ThreadJob tjob = new ThreadJob() { 
 | 
            @Override 
 | 
            public Object execute() throws Exception { 
 | 
                clientSub.subscribe(topic, QoS, (topic, msg) -> { 
 | 
                    System.out.println("从主题" + topic + "收到消息: " + new String(msg.getPayload())); 
 | 
                }); 
 | 
                while (true) { 
 | 
                    Thread.sleep(1000L); 
 | 
                } 
 | 
            } 
 | 
        }; 
 | 
        tjob.start(new Callback() { 
 | 
            @Override 
 | 
            public void call(Object obj) { 
 | 
                System.out.println("执行成功"); 
 | 
            } 
 | 
            @Override 
 | 
            public void call(Object... objs) { 
 | 
                System.out.println("执行成功"); 
 | 
            } 
 | 
  
 | 
            @Override 
 | 
            public void exception(Exception e) { 
 | 
                e.printStackTrace(); 
 | 
            } 
 | 
        }); 
 | 
    } 
 | 
  
 | 
    private static void testPublish(MqttClient clientPub, String topic, String message, long sleep) throws Exception { 
 | 
        ThreadJob tjob = new ThreadJob() { 
 | 
            @Override 
 | 
            public Object execute() throws Exception { 
 | 
                int count = 0 ; 
 | 
                while (true) { 
 | 
                    // 发布消息 
 | 
                    byte[] bs = (message + "  " + count++).getBytes() ; 
 | 
                    clientPub.publish(topic, bs, QoS, false); 
 | 
                    Thread.sleep(sleep); 
 | 
                } 
 | 
            } 
 | 
        }; 
 | 
        tjob.start(new Callback() { 
 | 
            @Override 
 | 
            public void call(Object obj) { 
 | 
                System.out.println("执行成功"); 
 | 
            } 
 | 
            @Override 
 | 
            public void call(Object... objs) { 
 | 
                System.out.println("执行成功"); 
 | 
            } 
 | 
  
 | 
            @Override 
 | 
            public void exception(Exception e) { 
 | 
                e.printStackTrace(); 
 | 
            } 
 | 
        }); 
 | 
    } 
 | 
} 
 |