zhubaomin
2 天以前 080c76ddb23b9f199ed2f59f3871b0058347d43e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package com.dy.common.mw.channel.mqtt;
 
import com.dy.common.util.Callback;
import com.dy.common.util.ThreadJob;
import java.nio.charset.StandardCharsets;
import org.eclipse.paho.client.mqttv3.MqttClient;
 
/**
 * @Author: liurunyu
 * @Date: 2025/6/4 11:35
 * @Description
 */
public class Test {
 
    private static String mqSvIp = "121.199.41.121";
    private static Integer mqSvPort = 1883;
    private static String mqSvUserName = "dyyjy";
    private static String mqSvUserPassword = "Dyyjy2025,;.abc!@#";
 
    private static String topic1 = "test/topic1" ;
    private static String topic2 = "test/topic2" ;
 
    private static int maxConnections = 10 ;
 
    private static MqttClientPool pool;
    /**
     * QoS
     * 等级    名称                        消息传递特性
     * 0    至多一次(At most once)    消息发送后不保证到达,可能丢失或重复,开销最小,可靠性最低。
     * 1    至少一次(At least once)    消息至少会到达一次,可能重复,但不会丢失,可靠性中等,适用于多数场景。
     * 2    恰好一次(Exactly once)    消息仅会到达一次,不重复且不丢失,可靠性最高,但开销最大,实现最复杂。
     */
    private static int QoS = 1 ;
 
    public static void main(String[] args) {
        try{
            // 初始化连接池
            pool = new MqttClientPool("tcp://" + mqSvIp + ":" + mqSvPort, mqSvUserName, mqSvUserPassword, maxConnections);
            MqttClient clientSub = pool.popClient() ;
            testSubscribe(clientSub, topic1);
            testSubscribe(clientSub, topic2);
            MqttClient clientPub = pool.popClient() ;
            testPublish(clientPub, topic1, "hello world", 1000);
            testPublish(clientPub, topic2, "hello China", 1500);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 关闭连接池
            //pool.close();
        }
    }
    private static void testSubscribe(MqttClient clientSub, String topic) throws Exception {
        ThreadJob tjob = new ThreadJob() {
            @Override
            public Object execute() throws Exception {
                clientSub.subscribe(topic, QoS, (topic, msg) -> {
                    System.out.println("从主题" + topic + "收到消息: " + new String(msg.getPayload()));
                });
                while (true) {
                    Thread.sleep(1000L);
                }
            }
        };
        tjob.start(new Callback() {
            @Override
            public void call(Object obj) {
                System.out.println("执行成功");
            }
            @Override
            public void call(Object... objs) {
                System.out.println("执行成功");
            }
 
            @Override
            public void exception(Exception e) {
                e.printStackTrace();
            }
        });
    }
 
    private static void testPublish(MqttClient clientPub, String topic, String message, long sleep) throws Exception {
        ThreadJob tjob = new ThreadJob() {
            @Override
            public Object execute() throws Exception {
                int count = 0 ;
                while (true) {
                    // 发布消息
                    byte[] bs = (message + "  " + count++).getBytes() ;
                    clientPub.publish(topic, bs, QoS, false);
                    Thread.sleep(sleep);
                }
            }
        };
        tjob.start(new Callback() {
            @Override
            public void call(Object obj) {
                System.out.println("执行成功");
            }
            @Override
            public void call(Object... objs) {
                System.out.println("执行成功");
            }
 
            @Override
            public void exception(Exception e) {
                e.printStackTrace();
            }
        });
    }
}