package com.dy.pipIrrApp.workOrder.mqtt;
|
|
import java.nio.charset.StandardCharsets;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
|
/**
|
* @author ZhuBaoMin
|
* @date 2024-11-16 11:29
|
* @LastEditTime 2024-11-16 11:29
|
* @Description 初始化一个Mqtt客户端,并根据配置订阅topic
|
*/
|
|
@Slf4j
|
public class MqttMsgSubscriber {
|
|
public void readSubscribeTopicMessage(){
|
try {
|
//String broker = "tcp://127.0.0.1:1883";
|
//String username = "mqtt_u";
|
//String password = "yjy";
|
//String topic = "workOrder";
|
String broker = "tcp://127.0.0.1:1884";
|
String username = "server";
|
String password = "1234";
|
String topic = "report/#";
|
|
Integer qos = 2;
|
String clientId = System.currentTimeMillis() + "";
|
|
MqttClient client = new MqttClient(broker, clientId, new MemoryPersistence());
|
|
// 连接参数
|
MqttConnectOptions options = new MqttConnectOptions();
|
options.setUserName(username);
|
options.setPassword(password.toCharArray());
|
//是否清除会话
|
options.setCleanSession(true);
|
options.setConnectionTimeout(60);
|
options.setKeepAliveInterval(60);
|
client.setCallback(new MqttCallback() {
|
|
@Override
|
public void connectionLost(Throwable throwable) {
|
log.error("连接丢失");
|
}
|
|
@Override
|
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
|
log.info("topic为: " + topic);
|
log.info("qos为: " + mqttMessage.getQos());
|
log.info("消息内容为: " + new String(mqttMessage.getPayload()));
|
}
|
|
@Override
|
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
|
// 当消息被完全传送出去后调用
|
log.info("交付完成 ---Delivery complete!");
|
// 可以在这里处理一些发送完成后的清理工作
|
}
|
});
|
|
client.connect(options);
|
client.subscribe(topic, qos);
|
} catch (MqttException e){
|
log.error("MqttMsgSubscriber 连接启动异常:{}", e.getMessage());
|
} catch (Exception e){
|
log.error("MqttMsgSubscriber 读取消息异常:{}", e.getMessage());
|
}
|
}
|
}
|