| New file | 
 |  |  | 
 |  |  | package com.dy.pipIrrApp.workOrder.mqtt; | 
 |  |  |  | 
 |  |  | import lombok.extern.slf4j.Slf4j; | 
 |  |  | import org.eclipse.paho.client.mqttv3.*; | 
 |  |  | import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; | 
 |  |  |  | 
 |  |  | /** | 
 |  |  |  * @author ZhuBaoMin | 
 |  |  |  * @date 2024-11-16 11:29 | 
 |  |  |  * @LastEditTime 2024-11-16 11:29 | 
 |  |  |  * @Description 初始化一个Mqtt客户端,并根据配置订阅topic | 
 |  |  |  */ | 
 |  |  |  | 
 |  |  | @Slf4j | 
 |  |  | public class MqttMsgSubscriber { | 
 |  |  |  | 
 |  |  |     public void readSubscribeTopicMessage(){ | 
 |  |  |         try { | 
 |  |  |             //String broker = "tcp://127.0.0.1:1883"; | 
 |  |  |             //String username = "mqtt_u"; | 
 |  |  |             //String password = "yjy"; | 
 |  |  |             //String topic = "workOrder"; | 
 |  |  |             String broker = "tcp://127.0.0.1:1884"; | 
 |  |  |             String username = "server"; | 
 |  |  |             String password = "1234"; | 
 |  |  |             String topic = "report/#"; | 
 |  |  |  | 
 |  |  |             Integer qos = 2; | 
 |  |  |             String clientId = System.currentTimeMillis() + ""; | 
 |  |  |  | 
 |  |  |             MqttClient client = new MqttClient(broker, clientId, new MemoryPersistence()); | 
 |  |  |  | 
 |  |  |             // 连接参数 | 
 |  |  |             MqttConnectOptions options = new MqttConnectOptions(); | 
 |  |  |             options.setUserName(username); | 
 |  |  |             options.setPassword(password.toCharArray()); | 
 |  |  |             //是否清除会话 | 
 |  |  |             options.setCleanSession(true); | 
 |  |  |             options.setConnectionTimeout(60); | 
 |  |  |             options.setKeepAliveInterval(60); | 
 |  |  |             client.setCallback(new MqttCallback() { | 
 |  |  |  | 
 |  |  |                 @Override | 
 |  |  |                 public void connectionLost(Throwable throwable) { | 
 |  |  |                     log.error("连接丢失"); | 
 |  |  |                 } | 
 |  |  |  | 
 |  |  |                 @Override | 
 |  |  |                 public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { | 
 |  |  |                     log.info("topic为: " + topic); | 
 |  |  |                     log.info("qos为: " + mqttMessage.getQos()); | 
 |  |  |                     log.info("消息内容为: " + new String(mqttMessage.getPayload())); | 
 |  |  |                 } | 
 |  |  |  | 
 |  |  |                 @Override | 
 |  |  |                 public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { | 
 |  |  |                     // 当消息被完全传送出去后调用 | 
 |  |  |                     log.info("交付完成 ---Delivery complete!"); | 
 |  |  |                     // 可以在这里处理一些发送完成后的清理工作 | 
 |  |  |                 } | 
 |  |  |             }); | 
 |  |  |  | 
 |  |  |             client.connect(options); | 
 |  |  |             client.subscribe(topic, qos); | 
 |  |  |         } catch (MqttException e){ | 
 |  |  |             log.error("MqttMsgSubscriber 连接启动异常:{}", e.getMessage()); | 
 |  |  |         } catch (Exception e){ | 
 |  |  |             log.error("MqttMsgSubscriber 读取消息异常:{}", e.getMessage()); | 
 |  |  |         } | 
 |  |  |     } | 
 |  |  | } |