zhubaomin
2024-11-22 8bf69a5124d12aeca128ef93432e2412b9cc54ac
MQTT测试Demo
5个文件已修改
2个文件已删除
4个文件已添加
380 ■■■■ 已修改文件
pipIrr-platform/pipIrr-global/pom.xml 25 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-global/src/main/resources/application-database-sp.yml 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-global/src/main/resources/application-database-test.yml 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-web/pipIrr-web-app/src/main/java/com/dy/pipIrrApp/workOrder/ConsumerListener.java 29 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-web/pipIrr-web-app/src/main/java/com/dy/pipIrrApp/workOrder/WorkOrderCtrl.java 53 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-web/pipIrr-web-app/src/main/java/com/dy/pipIrrApp/workOrder/mqtt/MqttClientConnectorPool.java 91 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-web/pipIrr-web-app/src/main/java/com/dy/pipIrrApp/workOrder/mqtt/MqttMsgSender.java 27 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-web/pipIrr-web-app/src/main/java/com/dy/pipIrrApp/workOrder/mqtt/MqttMsgSubscriber.java 77 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-web/pipIrr-web-app/src/main/java/com/dy/pipIrrApp/workOrder/mqtt/TestController.java 41 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-web/pipIrr-web-app/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-web/pipIrr-web-app/src/main/resources/application.yml 28 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pipIrr-platform/pipIrr-global/pom.xml
@@ -166,11 +166,28 @@
            <version>2.3.2</version>
        </dependency>
        <!--RocketMQ-->
        <!-- Spring Boot MQTT 依赖 -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.3</version>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>
        <dependency>
            <groupId>org.reactivestreams</groupId>
            <artifactId>reactive-streams</artifactId>
            <version>1.0.3</version>
        </dependency>
        <!-- Mosquitto 客户端库 -->
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.2</version>
        </dependency>
    </dependencies>
pipIrr-platform/pipIrr-global/src/main/resources/application-database-sp.yml
@@ -5,9 +5,9 @@
            #name: sp
            type: com.alibaba.druid.pool.DruidDataSource
            driverClassName: com.mysql.cj.jdbc.Driver
            url: jdbc:mysql://192.168.40.166:3306/pipIrr_sp?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&useSSL=false&serverTimezone=GMT%2B8&zeroDateTimeBehavior=convertToNull
#            url: jdbc:mysql://192.168.40.166:3306/pipIrr_sp?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&useSSL=false&serverTimezone=GMT%2B8&zeroDateTimeBehavior=convertToNull
#            url: jdbc:mysql://8.130.130.233:3306/pipIrr_sp?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&useSSL=false&serverTimezone=GMT%2B8&zeroDateTimeBehavior=convertToNull
#            url: jdbc:mysql://127.0.0.1:3306/pipIrr_sp?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&useSSL=false&serverTimezone=GMT%2B8&zeroDateTimeBehavior=convertToNull
            url: jdbc:mysql://127.0.0.1:3306/pipIrr_sp?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&useSSL=false&serverTimezone=GMT%2B8&zeroDateTimeBehavior=convertToNull
            username: root
            password: dysql,;.abc!@#
            druid:
pipIrr-platform/pipIrr-global/src/main/resources/application-database-test.yml
@@ -5,9 +5,9 @@
      #name: test
      type: com.alibaba.druid.pool.DruidDataSource
      driverClassName: com.mysql.cj.jdbc.Driver
      url: jdbc:mysql://192.168.40.166:3306/pipIrr_test?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&useSSL=false&serverTimezone=GMT%2B8&zeroDateTimeBehavior=convertToNull
#      url: jdbc:mysql://192.168.40.166:3306/pipIrr_test?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&useSSL=false&serverTimezone=GMT%2B8&zeroDateTimeBehavior=convertToNull
#      url: jdbc:mysql://8.130.130.233:3306/pipIrr_test?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&useSSL=false&serverTimezone=GMT%2B8&zeroDateTimeBehavior=convertToNull
#      url: jdbc:mysql://127.0.0.1:3306/pipIrr_test?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&useSSL=false&serverTimezone=GMT%2B8&zeroDateTimeBehavior=convertToNull
      url: jdbc:mysql://127.0.0.1:3306/pipIrr_test?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&useSSL=false&serverTimezone=GMT%2B8&zeroDateTimeBehavior=convertToNull
      username: root
      password: dysql,;.abc!@#
      druid:
pipIrr-platform/pipIrr-web/pipIrr-web-app/src/main/java/com/dy/pipIrrApp/workOrder/ConsumerListener.java
File was deleted
pipIrr-platform/pipIrr-web/pipIrr-web-app/src/main/java/com/dy/pipIrrApp/workOrder/WorkOrderCtrl.java
@@ -18,19 +18,12 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.MediaType;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.validation.BindingResult;
import org.springframework.web.bind.annotation.*;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
@@ -47,18 +40,6 @@
@RequiredArgsConstructor
public class WorkOrderCtrl {
    private final WorkOrderSv workOrderSv;
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    @Value("${rocketmq.name-server}")
    protected String nameServer;
    @Value("${rocketmq.producer.group}")
    protected String producerGroup;
    @Value("${rocketmq.topic}")
    protected String topic;
    /**
     * 创建工单
@@ -88,39 +69,11 @@
            return BaseResponseUtils.buildErrorMsg("获取工单失败");
        }
        if(!sendWorkOrder(voWorkOrder, workOrderId)) {
            return BaseResponseUtils.buildErrorMsg("工单推送失败");
        }
        //if(!sendWorkOrder(voWorkOrder, workOrderId)) {
        //    return BaseResponseUtils.buildErrorMsg("工单推送失败");
        //}
        return BaseResponseUtils.buildSuccess();
    }
    /**
     * 通过RocketMQ推送工单主键
     * @param voWorkOrder 工单视图对象
     * @param workOrderId 工单ID
     * @return
     * @throws MQClientException
     * @throws MQBrokerException
     * @throws RemotingException
     * @throws InterruptedException
     */
    private Boolean sendWorkOrder(VoWorkOrder voWorkOrder, Long workOrderId) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        String tag = voWorkOrder.getInspector();
        String key = voWorkOrder.getInspectorId().toString();
        //String message = JSON.toJSONString(voWorkOrder);
        String message = workOrderId.toString();
        DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
        producer.setNamesrvAddr(nameServer);
        producer.start();
        Message msg = new Message(topic, tag, key, message.getBytes(StandardCharsets.UTF_8));
        SendResult approveSendResult = producer.send(msg);
        if(!approveSendResult.getSendStatus().toString().equals("SEND_OK")) {
            return false;
        }
        return true;
    }
    /**
pipIrr-platform/pipIrr-web/pipIrr-web-app/src/main/java/com/dy/pipIrrApp/workOrder/mqtt/MqttClientConnectorPool.java
New file
@@ -0,0 +1,91 @@
package com.dy.pipIrrApp.workOrder.mqtt;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
 * @author ZhuBaoMin
 * @date 2024-11-16 11:27
 * @LastEditTime 2024-11-16 11:27
 * @Description MQTT客户端连接池,对外提供一个初始化的MQTT客户端
 */
@Slf4j
public class MqttClientConnectorPool {
    public static MqttClient mqttClient;
    /**
     * 连接MQTT客户端
     * @return 获取MQTT连队对象
     */
    public static MqttClient connectMQTT() {
        if (mqttClient != null){
            log.info("已存在!");
            return mqttClient;
        }
        try {
            // broker及连接信息
            String broker = "tcp://127.0.0.1:1883";
            String username = "mqtt_u";
            String password = "yjy";
            String clientId = System.currentTimeMillis() + "";
            //创建MQTT客户端(指定broker、客户端id、消息持久策略)
            mqttClient = new MqttClient(broker, clientId, new MemoryPersistence());
            //创建连接参数配置
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(username);
            options.setPassword(password.toCharArray());
            //是否清除会话
            options.setCleanSession(true);
            //连接超时时间
            options.setKeepAliveInterval(20);
            //是否自动重连
            options.setAutomaticReconnect(true);
            mqttClient.connect(options);
            log.info("MqttClient 服务启动broker初始化!");
        } catch (MqttException e){
            log.error("MqttClient connect Error:{}", e.getMessage());
            e.printStackTrace();
        }
        return mqttClient;
    }
    /**
     * 关闭MQTT客户端
     * @param client client
     */
    public static void closeClient(MqttClient client){
        try {
            // 断开连接
            client.disconnect();
            // 关闭客户端
            client.close();
        } catch (MqttException e){
            log.error("MqttClient disconnect or close Error:{}", e.getMessage());
            e.printStackTrace();
        }
    }
    /**
     * 关闭MQTT客户端
     */
    public static void closeStaticClient(){
        try {
            if (mqttClient != null){
                // 断开连接
                mqttClient.disconnect();
                // 关闭客户端
                mqttClient.close();
            }
        } catch (MqttException e){
            log.error("MqttClient disconnect or close Error:{}", e.getMessage());
            e.printStackTrace();
        }
    }
}
pipIrr-platform/pipIrr-web/pipIrr-web-app/src/main/java/com/dy/pipIrrApp/workOrder/mqtt/MqttMsgSender.java
New file
@@ -0,0 +1,27 @@
package com.dy.pipIrrApp.workOrder.mqtt;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
 * @author ZhuBaoMin
 * @date 2024-11-16 11:28
 * @LastEditTime 2024-11-16 11:28
 * @Description 消息发送方法
 */
@Slf4j
public class MqttMsgSender {
    public void sendMessage(MqttClient client, String topic, String content, int qos){
        MqttMessage message = new MqttMessage(content.getBytes());
        message.setQos(qos);
        try{
            client.publish(topic,message);
        } catch (MqttException e){
            log.error("MqttClient publish text info Error:{}!", e.getMessage());
            e.printStackTrace();
        }
    }
}
pipIrr-platform/pipIrr-web/pipIrr-web-app/src/main/java/com/dy/pipIrrApp/workOrder/mqtt/MqttMsgSubscriber.java
New file
@@ -0,0 +1,77 @@
package com.dy.pipIrrApp.workOrder.mqtt;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
/**
 * @author ZhuBaoMin
 * @date 2024-11-16 11:29
 * @LastEditTime 2024-11-16 11:29
 * @Description 初始化一个Mqtt客户端,并根据配置订阅topic
 */
@Slf4j
public class MqttMsgSubscriber {
    @Value("${spring.mqtt.broker}")
    private String broker;
    @Value("${spring.mqtt.username}")
    private String username;
    @Value("${spring.mqtt.password}")
    private String password;
    @Value("${spring.mqtt.topic}")
    private String topic;
    @Value("${spring.mqtt.qos}")
    private Integer qos;
    private String clientId = System.currentTimeMillis() + "";
    public void readSubscribeTopicMessage(){
        try {
            MqttClient client = new MqttClient(broker, clientId, new MemoryPersistence());
            // 连接参数
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(username);
            options.setPassword(password.toCharArray());
            //是否清除会话
            options.setCleanSession(true);
            options.setConnectionTimeout(60);
            options.setKeepAliveInterval(60);
            client.setCallback(new MqttCallback() {
                @Override
                public void connectionLost(Throwable throwable) {
                    log.error("连接丢失");
                }
                @Override
                public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                    log.info("topic为: " + topic);
                    log.info("qos为: " + mqttMessage.getQos());
                    log.info("消息内容为: " + new String(mqttMessage.getPayload()));
                }
                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    // 当消息被完全传送出去后调用
                    log.info("交付完成 ---Delivery complete!");
                    // 可以在这里处理一些发送完成后的清理工作
                }
            });
            client.connect(options);
            client.subscribe(topic, qos);
        } catch (MqttException e){
            log.error("MqttMsgSubscriber 连接启动异常:{}", e.getMessage());
        } catch (Exception e){
            log.error("MqttMsgSubscriber 读取消息异常:{}", e.getMessage());
        }
    }
}
pipIrr-platform/pipIrr-web/pipIrr-web-app/src/main/java/com/dy/pipIrrApp/workOrder/mqtt/TestController.java
New file
@@ -0,0 +1,41 @@
package com.dy.pipIrrApp.workOrder.mqtt;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * @author ZhuBaoMin
 * @date 2024-11-16 11:32
 * @LastEditTime 2024-11-16 11:32
 * @Description
 */
@Slf4j
@RestController
@RequestMapping(path = "mqtt")
public class TestController {
    @GetMapping("/mqtt/{msg}")
    public String testSendMqttMsg(@PathVariable("msg") String msg){
        log.info("消息内容:{}.", msg);
        MqttClient mqttClient = MqttClientConnectorPool.connectMQTT();
        MqttMsgSender sender = new MqttMsgSender();
        String content = "{" + " \"deviceNo\": \"" + msg + "\"," + " \"val\": 232.5" + "}";
        String topic = "workOrder";
        int qos = 1;
        if (null != mqttClient){
            sender.sendMessage(mqttClient, topic, content, qos);
        } else {
            log.info("MqttClient为空,无法发送!");
            return "失败!";
        }
        return "成功!";
    }
}
pipIrr-platform/pipIrr-web/pipIrr-web-app/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
File was deleted
pipIrr-platform/pipIrr-web/pipIrr-web-app/src/main/resources/application.yml
@@ -1,6 +1,12 @@
spring:
    profiles:
        include: global, database, database-ym, database-sp, database-test
    mqtt:
        broker: tcp://127.0.0.1:1883
        username: mqtt_u
        password: yjy
        topic: workOrder
        qos: 2
#actutor的web端口
management:
@@ -15,28 +21,6 @@
            #GenerateIdSetSuffixListener中应用,取值范围是0-99
            idSuffix: ${pipIrr.app.idSuffix}
rocketmq:
    topic: "workOrder"
    consumer:
        group: consumer_group
        # 一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值
        pull-batch-size: 10
    name-server: 127.0.0.1:9876
    producer:
        # 发送同一类消息的设置为同一个group,保证唯一
        group: producer_group
        # 发送消息超时时间,默认3000
        sendMessageTimeout: 10000
        # 发送消息失败重试次数,默认2
        retryTimesWhenSendFailed: 2
        # 异步消息重试次数,默认2
        retryTimesWhenSendAsyncFailed: 2
        # 消息最大长度,默认1024 * 1024 * 4(默认4M)
        maxMessageSize: 4096
        # 压缩消息阈值,默认4k(1024 * 4)
        compressMessageBodyThreshold: 4096
        # 是否在内部发送失败时重试另一个broker,默认false
        retryNextServer: false
logging:
    level:
        com: