From 5fefff8c747cbf5d526f6108a215bd813ac36034 Mon Sep 17 00:00:00 2001
From: liurunyu <lry9898@163.com>
Date: 星期三, 11 六月 2025 13:44:17 +0800
Subject: [PATCH] Merge branch 'master' of http://8.140.179.55:20000/r/pipIrr-SV
---
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java | 42 +++++++++++++++++++++++++++++++++++-------
1 files changed, 35 insertions(+), 7 deletions(-)
diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java
index 19bd3eb..0414628 100644
--- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java
+++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java
@@ -1,9 +1,11 @@
package com.dy.rtuMw.server.mqtt;
-import com.dy.common.mw.protocol4Mqtt.MqttMsgParser;
-import com.dy.common.mw.protocol4Mqtt.MqttPubMsg;
-import com.dy.common.mw.protocol4Mqtt.MqttSubMsg;
+import com.dy.common.mw.protocol4Mqtt.*;
+import com.dy.common.mw.protocol4Mqtt.MqttCallback;
+import com.dy.common.mw.protocol4Mqtt.MqttTopic;
import com.dy.common.util.Callback;
+import com.dy.rtuMw.server.forTcp.RtuLogDealer;
+import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.* ;
/**
@@ -11,12 +13,36 @@
* @Date: 2025/6/4 15:52
* @Description
*/
+@Slf4j
public class MqttMessageListener implements IMqttMessageListener{
+ private MqttNotify notify ;
+ public MqttMessageListener(MqttNotify notify){
+ this.notify = notify ;
+ }
+
@Override
public void messageArrived(String topic, MqttMessage msg) throws Exception {
- MqttMsgParser parser = new MqttMsgParser() ;
- MqttSubMsg subMsg = parser.parseSubMsg(topic, msg) ;
- this.nextDeal(subMsg);
+ try {
+ MqttTopic subTopic = MqttMsgParser.parseSubTopic(topic);
+ MqttSubMsg subMsg = MqttMsgParser.parseSubMsg(subTopic, msg, new MqttCallback() {
+ @Override
+ public void callback(MqttSubMsg subMsg) {
+ DevStatusDealer.onLine(subMsg.deviceId, subMsg.protocol);
+ DevStatusDealer.afterReceiveSubMessage(subMsg.deviceId);
+ RtuLogDealer.log4Mqtt(subMsg.deviceId, "璁㈤槄娑堟伅 涓婚锛�" + subMsg.topic.longName() + " 鍏冩暟鎹細" + subMsg.metaData);
+ }
+
+ @Override
+ public void notify(String devId, MqttNotifyInfo... infos) {
+ if (notify != null) {
+ notify.notify(devId, infos);
+ }
+ }
+ });
+ this.nextDeal(subMsg);
+ }catch(Exception e){
+ log.error("澶勭悊MQTT璁㈤槄娑堟伅鍙戠敓寮傚父", e);
+ }
}
private void nextDeal(MqttSubMsg subMsg)throws Exception {
subMsg.action(new Callback() {
@@ -25,17 +51,19 @@
MqttSubMsg subMs = (MqttSubMsg) obj ;
MqttPubMsg pubMs = MqttPubMsgCache.matchFromTail(subMs) ;
if(pubMs != null){
+ //鍖归厤鍒颁笅琛屾秷鎭紙鍛戒护锛�
subMs.mqttResultSendWebUrl = pubMs.mqttResultSendWebUrl ;
subMs.commandId = pubMs.commandId ;
try {
MqttComResultCache.getInstance().cacheMqttComResult(new MqttComResultNode(subMs));
} catch (Exception e) {
- e.printStackTrace();
+ log.error("缂撳瓨鍙戝竷娑堟伅锛堝懡浠わ級缁撴灉鍙戠敓寮傚父", e);
}
}
try{
MqttSubMsgCache.getInstance().cacheMsg(new MqttSubMsgNode(subMsg));
}catch (Exception e){
+ log.error("缂撳瓨璁㈤槄娑堟伅鏁版嵁鍙戠敓寮傚父", e);
}
}
@Override
--
Gitblit v1.8.0