New file |
| | |
| | | package com.dy.pipIrrApp.workOrder.mqtt; |
| | | |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.eclipse.paho.client.mqttv3.*; |
| | | import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; |
| | | |
| | | /** |
| | | * @author ZhuBaoMin |
| | | * @date 2024-11-16 11:29 |
| | | * @LastEditTime 2024-11-16 11:29 |
| | | * @Description 初始化一个Mqtt客户端,并根据配置订阅topic |
| | | */ |
| | | |
| | | @Slf4j |
| | | public class MqttMsgSubscriber { |
| | | |
| | | public void readSubscribeTopicMessage(){ |
| | | try { |
| | | //String broker = "tcp://127.0.0.1:1883"; |
| | | //String username = "mqtt_u"; |
| | | //String password = "yjy"; |
| | | //String topic = "workOrder"; |
| | | String broker = "tcp://127.0.0.1:1884"; |
| | | String username = "server"; |
| | | String password = "1234"; |
| | | String topic = "report/#"; |
| | | |
| | | Integer qos = 2; |
| | | String clientId = System.currentTimeMillis() + ""; |
| | | |
| | | MqttClient client = new MqttClient(broker, clientId, new MemoryPersistence()); |
| | | |
| | | // 连接参数 |
| | | MqttConnectOptions options = new MqttConnectOptions(); |
| | | options.setUserName(username); |
| | | options.setPassword(password.toCharArray()); |
| | | //是否清除会话 |
| | | options.setCleanSession(true); |
| | | options.setConnectionTimeout(60); |
| | | options.setKeepAliveInterval(60); |
| | | client.setCallback(new MqttCallback() { |
| | | |
| | | @Override |
| | | public void connectionLost(Throwable throwable) { |
| | | log.error("连接丢失"); |
| | | } |
| | | |
| | | @Override |
| | | public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { |
| | | log.info("topic为: " + topic); |
| | | log.info("qos为: " + mqttMessage.getQos()); |
| | | log.info("消息内容为: " + new String(mqttMessage.getPayload())); |
| | | } |
| | | |
| | | @Override |
| | | public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { |
| | | // 当消息被完全传送出去后调用 |
| | | log.info("交付完成 ---Delivery complete!"); |
| | | // 可以在这里处理一些发送完成后的清理工作 |
| | | } |
| | | }); |
| | | |
| | | client.connect(options); |
| | | client.subscribe(topic, qos); |
| | | } catch (MqttException e){ |
| | | log.error("MqttMsgSubscriber 连接启动异常:{}", e.getMessage()); |
| | | } catch (Exception e){ |
| | | log.error("MqttMsgSubscriber 读取消息异常:{}", e.getMessage()); |
| | | } |
| | | } |
| | | } |