package com.dy.pipIrrApp.workOrder.mqtt; 
 | 
  
 | 
import lombok.extern.slf4j.Slf4j; 
 | 
import org.eclipse.paho.client.mqttv3.*; 
 | 
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; 
 | 
  
 | 
/** 
 | 
 * @author ZhuBaoMin 
 | 
 * @date 2024-11-16 11:29 
 | 
 * @LastEditTime 2024-11-16 11:29 
 | 
 * @Description 初始化一个Mqtt客户端,并根据配置订阅topic 
 | 
 */ 
 | 
  
 | 
@Slf4j 
 | 
public class MqttMsgSubscriber { 
 | 
  
 | 
    public void readSubscribeTopicMessage(){ 
 | 
        try { 
 | 
            //String broker = "tcp://127.0.0.1:1883"; 
 | 
            //String username = "mqtt_u"; 
 | 
            //String password = "yjy"; 
 | 
            //String topic = "workOrder"; 
 | 
            String broker = "tcp://127.0.0.1:1884"; 
 | 
            String username = "server"; 
 | 
            String password = "1234"; 
 | 
            String topic = "report/#"; 
 | 
  
 | 
            Integer qos = 2; 
 | 
            String clientId = System.currentTimeMillis() + ""; 
 | 
  
 | 
            MqttClient client = new MqttClient(broker, clientId, new MemoryPersistence()); 
 | 
  
 | 
            // 连接参数 
 | 
            MqttConnectOptions options = new MqttConnectOptions(); 
 | 
            options.setUserName(username); 
 | 
            options.setPassword(password.toCharArray()); 
 | 
            //是否清除会话 
 | 
            options.setCleanSession(true); 
 | 
            options.setConnectionTimeout(60); 
 | 
            options.setKeepAliveInterval(60); 
 | 
            client.setCallback(new MqttCallback() { 
 | 
  
 | 
                @Override 
 | 
                public void connectionLost(Throwable throwable) { 
 | 
                    log.error("连接丢失"); 
 | 
                } 
 | 
  
 | 
                @Override 
 | 
                public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { 
 | 
                    log.info("topic为: " + topic); 
 | 
                    log.info("qos为: " + mqttMessage.getQos()); 
 | 
                    log.info("消息内容为: " + new String(mqttMessage.getPayload())); 
 | 
                } 
 | 
  
 | 
                @Override 
 | 
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { 
 | 
                    // 当消息被完全传送出去后调用 
 | 
                    log.info("交付完成 ---Delivery complete!"); 
 | 
                    // 可以在这里处理一些发送完成后的清理工作 
 | 
                } 
 | 
            }); 
 | 
  
 | 
            client.connect(options); 
 | 
            client.subscribe(topic, qos); 
 | 
        } catch (MqttException e){ 
 | 
            log.error("MqttMsgSubscriber 连接启动异常:{}", e.getMessage()); 
 | 
        } catch (Exception e){ 
 | 
            log.error("MqttMsgSubscriber 读取消息异常:{}", e.getMessage()); 
 | 
        } 
 | 
    } 
 | 
} 
 |