package com.dy.common.mw.channel.mqtt;
|
|
import com.dy.common.util.Callback;
|
import com.dy.common.util.ThreadJob;
|
import org.eclipse.paho.client.mqttv3.MqttClient;
|
|
/**
|
* @Author: liurunyu
|
* @Date: 2025/6/4 11:35
|
* @Description
|
*/
|
public class Test {
|
|
private static String mqSvIp = "121.199.41.121";
|
private static Integer mqSvPort = 1883;
|
private static String mqSvUserName = "dyyjy";
|
private static String mqSvUserPassword = "Dyyjy2025,;.abc!@#";
|
|
private static String topic1 = "test/topic1" ;
|
private static String topic2 = "test/topic2" ;
|
|
private static int maxConnections = 10 ;
|
|
private static MqttClientPool pool;
|
/**
|
* QoS
|
* 等级 名称 消息传递特性
|
* 0 至多一次(At most once) 消息发送后不保证到达,可能丢失或重复,开销最小,可靠性最低。
|
* 1 至少一次(At least once) 消息至少会到达一次,可能重复,但不会丢失,可靠性中等,适用于多数场景。
|
* 2 恰好一次(Exactly once) 消息仅会到达一次,不重复且不丢失,可靠性最高,但开销最大,实现最复杂。
|
*/
|
private static int QoS = 1 ;
|
|
public static void main(String[] args) {
|
try{
|
// 初始化连接池
|
pool = new MqttClientPool("tcp://" + mqSvIp + ":" + mqSvPort, mqSvUserName, mqSvUserPassword, maxConnections);
|
MqttClient clientSub = pool.popClient() ;
|
testSubscribe(clientSub, topic1);
|
testSubscribe(clientSub, topic2);
|
MqttClient clientPub = pool.popClient() ;
|
testPublish(clientPub, topic1, "hello world", 1000);
|
testPublish(clientPub, topic2, "hello China", 1500);
|
} catch (Exception e) {
|
e.printStackTrace();
|
} finally {
|
// 关闭连接池
|
//pool.close();
|
}
|
}
|
private static void testSubscribe(MqttClient clientSub, String topic) throws Exception {
|
ThreadJob tjob = new ThreadJob() {
|
@Override
|
public Object execute() throws Exception {
|
clientSub.subscribe(topic, QoS, (topic, msg) -> {
|
System.out.println("从主题" + topic + "收到消息: " + new String(msg.getPayload()));
|
});
|
while (true) {
|
Thread.sleep(1000L);
|
}
|
}
|
};
|
tjob.start(new Callback() {
|
@Override
|
public void call(Object obj) {
|
System.out.println("执行成功");
|
}
|
@Override
|
public void call(Object... objs) {
|
System.out.println("执行成功");
|
}
|
|
@Override
|
public void exception(Exception e) {
|
e.printStackTrace();
|
}
|
});
|
}
|
|
private static void testPublish(MqttClient clientPub, String topic, String message, long sleep) throws Exception {
|
ThreadJob tjob = new ThreadJob() {
|
@Override
|
public Object execute() throws Exception {
|
int count = 0 ;
|
while (true) {
|
// 发布消息
|
byte[] bs = (message + " " + count++).getBytes() ;
|
clientPub.publish(topic, bs, QoS, false);
|
Thread.sleep(sleep);
|
}
|
}
|
};
|
tjob.start(new Callback() {
|
@Override
|
public void call(Object obj) {
|
System.out.println("执行成功");
|
}
|
@Override
|
public void call(Object... objs) {
|
System.out.println("执行成功");
|
}
|
|
@Override
|
public void exception(Exception e) {
|
e.printStackTrace();
|
}
|
});
|
}
|
}
|