package com.dy.pipIrrApp.workOrder.mqtt;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;

import java.nio.charset.StandardCharsets;

/**
 * Creates an MQTT client and subscribes it to the configured topic.
 *
 * <p>Connection settings (broker, credentials, topic, QoS) are injected from the
 * {@code spring.mqtt.*} properties. Incoming messages are currently only logged.
 *
 * <p>NOTE(review): {@code @Value} injection only takes effect when this class is managed
 * by Spring; no stereotype annotation is declared here, so it is presumably registered
 * elsewhere — confirm.
 *
 * @author ZhuBaoMin
 * @date 2024-11-16 11:29
 * @LastEditTime 2024-11-16 11:29
 * @Description Initializes an MQTT client and subscribes to the topic given by configuration
 */
@Slf4j
public class MqttMsgSubscriber {

    @Value("${spring.mqtt.broker}")
    private String broker;

    @Value("${spring.mqtt.username}")
    private String username;

    @Value("${spring.mqtt.password}")
    private String password;

    @Value("${spring.mqtt.topic}")
    private String topic;

    @Value("${spring.mqtt.qos}")
    private Integer qos;

    /** Client identifier derived from the instance creation time in milliseconds. */
    private String clientId = System.currentTimeMillis() + "";

    /**
     * Connects to the configured broker, subscribes to the configured topic and logs every
     * message that arrives.
     *
     * <p>Failures are logged (with stack trace) and not rethrown. If the connection or the
     * subscription fails, the partially initialised client is disconnected and closed so
     * that it does not leak.
     */
    public void readSubscribeTopicMessage() {
        MqttClient client = null;
        try {
            client = new MqttClient(broker, clientId, new MemoryPersistence());

            // Connection parameters
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(username);
            options.setPassword(password.toCharArray());
            // Start with a clean session on every connect
            options.setCleanSession(true);
            options.setConnectionTimeout(60);
            options.setKeepAliveInterval(60);

            client.setCallback(new MqttCallback() {
                @Override
                public void connectionLost(Throwable throwable) {
                    // Pass the cause along so the reason for the disconnect is visible
                    log.error("连接丢失", throwable);
                }

                @Override
                public void messageArrived(String arrivedTopic, MqttMessage mqttMessage) throws Exception {
                    // Log the topic the message actually arrived on, not the configured
                    // subscription filter (they differ when the filter contains wildcards)
                    log.info("topic为: {}", arrivedTopic);
                    log.info("qos为: {}", mqttMessage.getQos());
                    // Decode with an explicit charset instead of the platform default
                    log.info("消息内容为: {}", new String(mqttMessage.getPayload(), StandardCharsets.UTF_8));
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    // Called once a message has been completely delivered
                    log.info("交付完成 ---Delivery complete!");
                    // Any post-delivery cleanup can be done here
                }
            });

            client.connect(options);
            client.subscribe(topic, qos);
        } catch (MqttException e) {
            // Trailing throwable argument makes SLF4J print the stack trace as well
            log.error("MqttMsgSubscriber 连接启动异常:{}", e.getMessage(), e);
            closeQuietly(client);
        } catch (Exception e) {
            log.error("MqttMsgSubscriber 读取消息异常:{}", e.getMessage(), e);
            closeQuietly(client);
        }
    }

    /**
     * Disconnects (if connected) and closes the given client, logging instead of propagating
     * any failure.
     *
     * @param client the client to release; may be {@code null} when construction itself failed
     */
    private void closeQuietly(MqttClient client) {
        if (client == null) {
            return;
        }
        try {
            // A connected client has to be disconnected before it can be closed
            if (client.isConnected()) {
                client.disconnect();
            }
            client.close();
        } catch (MqttException closeError) {
            log.warn("MqttMsgSubscriber failed to release MQTT client: {}", closeError.getMessage(), closeError);
        }
    }
}