| package com.dy.rtuMw.server.mqtt; | 
|   | 
| import com.dy.common.mw.protocol4Mqtt.*; | 
| import com.dy.common.mw.protocol4Mqtt.MqttCallback; | 
| import com.dy.common.mw.protocol4Mqtt.MqttTopic; | 
| import com.dy.common.util.Callback; | 
| import com.dy.rtuMw.server.forTcp.RtuLogDealer; | 
| import lombok.extern.slf4j.Slf4j; | 
| import org.eclipse.paho.client.mqttv3.* ; | 
|   | 
| /** | 
|  * @Author: liurunyu | 
|  * @Date: 2025/6/4 15:52 | 
|  * @Description | 
|  */ | 
| @Slf4j | 
| public class MqttMessageListener implements IMqttMessageListener{ | 
|     private MqttNotify notify ; | 
|     public MqttMessageListener(MqttNotify notify){ | 
|         this.notify = notify ; | 
|     } | 
|   | 
|     @Override | 
|     public void messageArrived(String topic, MqttMessage msg) throws Exception { | 
|         try { | 
|             MqttTopic subTopic = MqttMsgParser.parseSubTopic(topic); | 
|             MqttSubMsg subMsg = MqttMsgParser.parseSubMsg(subTopic, msg, new MqttCallback() { | 
|                 @Override | 
|                 public void callback(MqttSubMsg subMsg) { | 
|                     DevStatusDealer.onLine(subMsg.deviceId, subMsg.protocol); | 
|                     DevStatusDealer.afterReceiveSubMessage(subMsg.deviceId); | 
|                     RtuLogDealer.log4Mqtt(subMsg.deviceId, "订阅消息    主题:" + subMsg.topic.longName() + "   原数据:" + subMsg.metaData); | 
|                 } | 
|   | 
|                 @Override | 
|                 public void notify(String devId, MqttNotifyInfo... infos) { | 
|                     if (notify != null) { | 
|                         notify.notify(devId, infos); | 
|                     } | 
|                 } | 
|             }); | 
|             this.nextDeal(subMsg); | 
|         }catch(Exception e){ | 
|             log.error("处理MQTT订阅消息发生异常", e); | 
|         } | 
|     } | 
|     private void nextDeal(MqttSubMsg subMsg)throws Exception { | 
|         subMsg.action(new MqttSubMsgDealer()); | 
|     } | 
| } |