From 6b828ba1310db528aa8172bd14a0253ebca5a844 Mon Sep 17 00:00:00 2001
From: liurunyu <lry9898@163.com>
Date: 星期二, 10 六月 2025 18:34:29 +0800
Subject: [PATCH] 基于mqtt的水肥机、气象站、墒情站协议、功能模块继续开发

---
 pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java |   35 +++++++++++++++++++++++++++++------
 1 files changed, 29 insertions(+), 6 deletions(-)

diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java
index 19bd3eb..7d8c6ea 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java
@@ -1,9 +1,11 @@
 package com.dy.rtuMw.server.mqtt;
 
-import com.dy.common.mw.protocol4Mqtt.MqttMsgParser;
-import com.dy.common.mw.protocol4Mqtt.MqttPubMsg;
-import com.dy.common.mw.protocol4Mqtt.MqttSubMsg;
+import com.dy.common.mw.protocol4Mqtt.*;
+import com.dy.common.mw.protocol4Mqtt.MqttCallback;
+import com.dy.common.mw.protocol4Mqtt.MqttTopic;
 import com.dy.common.util.Callback;
+import com.dy.rtuMw.server.forTcp.RtuLogDealer;
+import lombok.extern.slf4j.Slf4j;
 import org.eclipse.paho.client.mqttv3.* ;
 
 /**
@@ -11,11 +13,30 @@
  * @Date: 2025/6/4 15:52
  * @Description
  */
+@Slf4j
 public class MqttMessageListener implements IMqttMessageListener{
+    private MqttNotify notify ;
+    public MqttMessageListener(MqttNotify notify){
+        this.notify = notify ;
+    }
+
     @Override
     public void messageArrived(String topic, MqttMessage msg) throws Exception {
-        MqttMsgParser parser = new MqttMsgParser() ;
-        MqttSubMsg subMsg = parser.parseSubMsg(topic, msg) ;
+        MqttTopic subTopic = MqttMsgParser.parseSubTopic(topic) ;
+        MqttSubMsg subMsg = MqttMsgParser.parseSubMsg(subTopic, msg, new MqttCallback(){
+            @Override
+            public void callback(MqttSubMsg subMsg) {
+                DevStatusDealer.onLine(subMsg.deviceId, subMsg.protocol);
+                DevStatusDealer.afterReceiveSubMessage(subMsg.deviceId);
+                RtuLogDealer.log4Mqtt(subMsg.deviceId, "璁㈤槄娑堟伅    涓婚锛�" + subMsg.topic + "   娑堟伅锛�" + subMsg.msg);
+            }
+            @Override
+            public void notify(String devId, MqttNotifyInfo... infos) {
+                if(notify != null){
+                    notify.notify(devId, infos) ;
+                }
+            }
+        }) ;
         this.nextDeal(subMsg);
     }
     private void nextDeal(MqttSubMsg subMsg)throws Exception {
@@ -25,17 +46,19 @@
                 MqttSubMsg subMs = (MqttSubMsg) obj ;
                 MqttPubMsg pubMs = MqttPubMsgCache.matchFromTail(subMs) ;
                 if(pubMs != null){
+                    //鍖归厤鍒颁笅琛屾秷鎭紙鍛戒护锛�
                     subMs.mqttResultSendWebUrl = pubMs.mqttResultSendWebUrl ;
                     subMs.commandId = pubMs.commandId ;
                     try {
                         MqttComResultCache.getInstance().cacheMqttComResult(new MqttComResultNode(subMs));
                     } catch (Exception e) {
-                        e.printStackTrace();
+                        log.error("缂撳瓨鍙戝竷娑堟伅锛堝懡浠わ級缁撴灉鍙戠敓寮傚父", e);
                     }
                 }
                 try{
                     MqttSubMsgCache.getInstance().cacheMsg(new MqttSubMsgNode(subMsg));
                 }catch (Exception e){
+                    log.error("缂撳瓨璁㈤槄娑堟伅鏁版嵁鍙戠敓寮傚父", e);
                 }
             }
             @Override

--
Gitblit v1.8.0