package com.dy.rtuMw.server.mqtt; 
 | 
  
 | 
import com.dy.common.mw.protocol4Mqtt.*; 
 | 
import com.dy.common.mw.protocol4Mqtt.MqttCallback; 
 | 
import com.dy.common.mw.protocol4Mqtt.MqttTopic; 
 | 
import com.dy.common.util.Callback; 
 | 
import com.dy.rtuMw.server.forTcp.RtuLogDealer; 
 | 
import lombok.extern.slf4j.Slf4j; 
 | 
import org.eclipse.paho.client.mqttv3.* ; 
 | 
  
 | 
/** 
 | 
 * @Author: liurunyu 
 | 
 * @Date: 2025/6/4 15:52 
 | 
 * @Description 
 | 
 */ 
 | 
@Slf4j 
 | 
public class MqttMessageListener implements IMqttMessageListener{ 
 | 
    private MqttNotify notify ; 
 | 
    public MqttMessageListener(MqttNotify notify){ 
 | 
        this.notify = notify ; 
 | 
    } 
 | 
  
 | 
    @Override 
 | 
    public void messageArrived(String topic, MqttMessage msg) throws Exception { 
 | 
        try { 
 | 
            MqttTopic subTopic = MqttMsgParser.parseSubTopic(topic); 
 | 
            MqttSubMsg subMsg = MqttMsgParser.parseSubMsg(subTopic, msg, new MqttCallback() { 
 | 
                @Override 
 | 
                public void callback(MqttSubMsg subMsg) { 
 | 
                    DevStatusDealer.onLine(subMsg.deviceId, subMsg.protocol); 
 | 
                    DevStatusDealer.afterReceiveSubMessage(subMsg.deviceId); 
 | 
                    RtuLogDealer.log4Mqtt(subMsg.deviceId, "订阅消息    主题:" + subMsg.topic.longName() + "   元数据:" + subMsg.metaData); 
 | 
                } 
 | 
  
 | 
                @Override 
 | 
                public void notify(String devId, MqttNotifyInfo... infos) { 
 | 
                    if (notify != null) { 
 | 
                        notify.notify(devId, infos); 
 | 
                    } 
 | 
                } 
 | 
            }); 
 | 
            this.nextDeal(subMsg); 
 | 
        }catch(Exception e){ 
 | 
            log.error("处理MQTT订阅消息发生异常", e); 
 | 
        } 
 | 
    } 
 | 
    private void nextDeal(MqttSubMsg subMsg)throws Exception { 
 | 
        subMsg.action(new MqttSubMsgDealer()); 
 | 
    } 
 | 
} 
 |