| New file | 
|  |  |  | 
|---|
|  |  |  | package com.dy.pipIrrApp.workOrder.mqtt; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | import lombok.extern.slf4j.Slf4j; | 
|---|
|  |  |  | import org.eclipse.paho.client.mqttv3.*; | 
|---|
|  |  |  | import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | /** | 
|---|
|  |  |  | * @author ZhuBaoMin | 
|---|
|  |  |  | * @date 2024-11-16 11:29 | 
|---|
|  |  |  | * @LastEditTime 2024-11-16 11:29 | 
|---|
|  |  |  | * @Description 初始化一个Mqtt客户端,并根据配置订阅topic | 
|---|
|  |  |  | */ | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Slf4j | 
|---|
|  |  |  | public class MqttMsgSubscriber { | 
|---|
|  |  |  |  | 
|---|
|  |  |  | public void readSubscribeTopicMessage(){ | 
|---|
|  |  |  | try { | 
|---|
|  |  |  | //String broker = "tcp://127.0.0.1:1883"; | 
|---|
|  |  |  | //String username = "mqtt_u"; | 
|---|
|  |  |  | //String password = "yjy"; | 
|---|
|  |  |  | //String topic = "workOrder"; | 
|---|
|  |  |  | String broker = "tcp://127.0.0.1:1884"; | 
|---|
|  |  |  | String username = "server"; | 
|---|
|  |  |  | String password = "1234"; | 
|---|
|  |  |  | String topic = "report/#"; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | Integer qos = 2; | 
|---|
|  |  |  | String clientId = System.currentTimeMillis() + ""; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | MqttClient client = new MqttClient(broker, clientId, new MemoryPersistence()); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | // 连接参数 | 
|---|
|  |  |  | MqttConnectOptions options = new MqttConnectOptions(); | 
|---|
|  |  |  | options.setUserName(username); | 
|---|
|  |  |  | options.setPassword(password.toCharArray()); | 
|---|
|  |  |  | //是否清除会话 | 
|---|
|  |  |  | options.setCleanSession(true); | 
|---|
|  |  |  | options.setConnectionTimeout(60); | 
|---|
|  |  |  | options.setKeepAliveInterval(60); | 
|---|
|  |  |  | client.setCallback(new MqttCallback() { | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Override | 
|---|
|  |  |  | public void connectionLost(Throwable throwable) { | 
|---|
|  |  |  | log.error("连接丢失"); | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Override | 
|---|
|  |  |  | public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { | 
|---|
|  |  |  | log.info("topic为: " + topic); | 
|---|
|  |  |  | log.info("qos为: " + mqttMessage.getQos()); | 
|---|
|  |  |  | log.info("消息内容为: " + new String(mqttMessage.getPayload())); | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Override | 
|---|
|  |  |  | public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { | 
|---|
|  |  |  | // 当消息被完全传送出去后调用 | 
|---|
|  |  |  | log.info("交付完成 ---Delivery complete!"); | 
|---|
|  |  |  | // 可以在这里处理一些发送完成后的清理工作 | 
|---|
|  |  |  | } | 
|---|
|  |  |  | }); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | client.connect(options); | 
|---|
|  |  |  | client.subscribe(topic, qos); | 
|---|
|  |  |  | } catch (MqttException e){ | 
|---|
|  |  |  | log.error("MqttMsgSubscriber 连接启动异常:{}", e.getMessage()); | 
|---|
|  |  |  | } catch (Exception e){ | 
|---|
|  |  |  | log.error("MqttMsgSubscriber 读取消息异常:{}", e.getMessage()); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|