From 21c080aa9da3acd53e014e8f917b50a48cb791cb Mon Sep 17 00:00:00 2001 From: liurunyu <lry9898@163.com> Date: 星期三, 11 六月 2025 13:44:04 +0800 Subject: [PATCH] 进行ApiFox发送内部命令测试,MQTTX发布气象数据测试,修改测试中发现的bug,修改不完善的地方。 --- pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java | 42 +++++++++++++++++++++++++++++++++++------- 1 files changed, 35 insertions(+), 7 deletions(-) diff --git a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java index 19bd3eb..0414628 100644 --- a/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java +++ b/pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/mqtt/MqttMessageListener.java @@ -1,9 +1,11 @@ package com.dy.rtuMw.server.mqtt; -import com.dy.common.mw.protocol4Mqtt.MqttMsgParser; -import com.dy.common.mw.protocol4Mqtt.MqttPubMsg; -import com.dy.common.mw.protocol4Mqtt.MqttSubMsg; +import com.dy.common.mw.protocol4Mqtt.*; +import com.dy.common.mw.protocol4Mqtt.MqttCallback; +import com.dy.common.mw.protocol4Mqtt.MqttTopic; import com.dy.common.util.Callback; +import com.dy.rtuMw.server.forTcp.RtuLogDealer; +import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.* ; /** @@ -11,12 +13,36 @@ * @Date: 2025/6/4 15:52 * @Description */ +@Slf4j public class MqttMessageListener implements IMqttMessageListener{ + private MqttNotify notify ; + public MqttMessageListener(MqttNotify notify){ + this.notify = notify ; + } + @Override public void messageArrived(String topic, MqttMessage msg) throws Exception { - MqttMsgParser parser = new MqttMsgParser() ; - MqttSubMsg subMsg = parser.parseSubMsg(topic, msg) ; - this.nextDeal(subMsg); + try { + MqttTopic subTopic = MqttMsgParser.parseSubTopic(topic); + MqttSubMsg subMsg = MqttMsgParser.parseSubMsg(subTopic, msg, new MqttCallback() { + @Override + public void callback(MqttSubMsg subMsg) { + DevStatusDealer.onLine(subMsg.deviceId, subMsg.protocol); + DevStatusDealer.afterReceiveSubMessage(subMsg.deviceId); + RtuLogDealer.log4Mqtt(subMsg.deviceId, "璁㈤槄娑堟伅 涓婚锛�" + subMsg.topic.longName() + " 鍏冩暟鎹細" + subMsg.metaData); + } + + @Override + public void notify(String devId, MqttNotifyInfo... infos) { + if (notify != null) { + notify.notify(devId, infos); + } + } + }); + this.nextDeal(subMsg); + }catch(Exception e){ + log.error("澶勭悊MQTT璁㈤槄娑堟伅鍙戠敓寮傚父", e); + } } private void nextDeal(MqttSubMsg subMsg)throws Exception { subMsg.action(new Callback() { @@ -25,17 +51,19 @@ MqttSubMsg subMs = (MqttSubMsg) obj ; MqttPubMsg pubMs = MqttPubMsgCache.matchFromTail(subMs) ; if(pubMs != null){ + //鍖归厤鍒颁笅琛屾秷鎭紙鍛戒护锛� subMs.mqttResultSendWebUrl = pubMs.mqttResultSendWebUrl ; subMs.commandId = pubMs.commandId ; try { MqttComResultCache.getInstance().cacheMqttComResult(new MqttComResultNode(subMs)); } catch (Exception e) { - e.printStackTrace(); + log.error("缂撳瓨鍙戝竷娑堟伅锛堝懡浠わ級缁撴灉鍙戠敓寮傚父", e); } } try{ MqttSubMsgCache.getInstance().cacheMsg(new MqttSubMsgNode(subMsg)); }catch (Exception e){ + log.error("缂撳瓨璁㈤槄娑堟伅鏁版嵁鍙戠敓寮傚父", e); } } @Override -- Gitblit v1.8.0