package com.dy.pipIrrApp.workOrder.mqtt; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; /** * @author ZhuBaoMin * @date 2024-11-16 11:29 * @LastEditTime 2024-11-16 11:29 * @Description 初始化一个Mqtt客户端,并根据配置订阅topic */ @Slf4j public class MqttMsgSubscriber { public void readSubscribeTopicMessage(){ try { //String broker = "tcp://127.0.0.1:1883"; //String username = "mqtt_u"; //String password = "yjy"; //String topic = "workOrder"; String broker = "tcp://127.0.0.1:1884"; String username = "server"; String password = "1234"; String topic = "report/#"; Integer qos = 2; String clientId = System.currentTimeMillis() + ""; MqttClient client = new MqttClient(broker, clientId, new MemoryPersistence()); // 连接参数 MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(username); options.setPassword(password.toCharArray()); //是否清除会话 options.setCleanSession(true); options.setConnectionTimeout(60); options.setKeepAliveInterval(60); client.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable throwable) { log.error("连接丢失"); } @Override public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { log.info("topic为: " + topic); log.info("qos为: " + mqttMessage.getQos()); log.info("消息内容为: " + new String(mqttMessage.getPayload())); } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { // 当消息被完全传送出去后调用 log.info("交付完成 ---Delivery complete!"); // 可以在这里处理一些发送完成后的清理工作 } }); client.connect(options); client.subscribe(topic, qos); } catch (MqttException e){ log.error("MqttMsgSubscriber 连接启动异常:{}", e.getMessage()); } catch (Exception e){ log.error("MqttMsgSubscriber 读取消息异常:{}", e.getMessage()); } } }