| package com.dy.pipIrrApp.workOrder.mqtt; | 
|   | 
import java.nio.charset.StandardCharsets;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|   | 
| /** | 
|  * @author ZhuBaoMin | 
|  * @date 2024-11-16 11:29 | 
|  * @LastEditTime 2024-11-16 11:29 | 
|  * @Description 初始化一个Mqtt客户端,并根据配置订阅topic | 
|  */ | 
|   | 
| @Slf4j | 
| public class MqttMsgSubscriber { | 
|   | 
|     public void readSubscribeTopicMessage(){ | 
|         try { | 
|             //String broker = "tcp://127.0.0.1:1883"; | 
|             //String username = "mqtt_u"; | 
|             //String password = "yjy"; | 
|             //String topic = "workOrder"; | 
|             String broker = "tcp://127.0.0.1:1884"; | 
|             String username = "server"; | 
|             String password = "1234"; | 
|             String topic = "report/#"; | 
|   | 
|             Integer qos = 2; | 
|             String clientId = System.currentTimeMillis() + ""; | 
|   | 
|             MqttClient client = new MqttClient(broker, clientId, new MemoryPersistence()); | 
|   | 
|             // 连接参数 | 
|             MqttConnectOptions options = new MqttConnectOptions(); | 
|             options.setUserName(username); | 
|             options.setPassword(password.toCharArray()); | 
|             //是否清除会话 | 
|             options.setCleanSession(true); | 
|             options.setConnectionTimeout(60); | 
|             options.setKeepAliveInterval(60); | 
|             client.setCallback(new MqttCallback() { | 
|   | 
|                 @Override | 
|                 public void connectionLost(Throwable throwable) { | 
|                     log.error("连接丢失"); | 
|                 } | 
|   | 
|                 @Override | 
|                 public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { | 
|                     log.info("topic为: " + topic); | 
|                     log.info("qos为: " + mqttMessage.getQos()); | 
|                     log.info("消息内容为: " + new String(mqttMessage.getPayload())); | 
|                 } | 
|   | 
|                 @Override | 
|                 public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { | 
|                     // 当消息被完全传送出去后调用 | 
|                     log.info("交付完成 ---Delivery complete!"); | 
|                     // 可以在这里处理一些发送完成后的清理工作 | 
|                 } | 
|             }); | 
|   | 
|             client.connect(options); | 
|             client.subscribe(topic, qos); | 
|         } catch (MqttException e){ | 
|             log.error("MqttMsgSubscriber 连接启动异常:{}", e.getMessage()); | 
|         } catch (Exception e){ | 
|             log.error("MqttMsgSubscriber 读取消息异常:{}", e.getMessage()); | 
|         } | 
|     } | 
| } |