package com.dy.rtuMw.server.mqtt; import com.dy.common.mw.protocol4Mqtt.*; import com.dy.common.mw.protocol4Mqtt.MqttCallback; import com.dy.common.mw.protocol4Mqtt.MqttTopic; import com.dy.common.util.Callback; import com.dy.rtuMw.server.forTcp.RtuLogDealer; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.* ; /** * @Author: liurunyu * @Date: 2025/6/4 15:52 * @Description */ @Slf4j public class MqttMessageListener implements IMqttMessageListener{ private MqttNotify notify ; public MqttMessageListener(MqttNotify notify){ this.notify = notify ; } @Override public void messageArrived(String topic, MqttMessage msg) throws Exception { try { MqttTopic subTopic = MqttMsgParser.parseSubTopic(topic); MqttSubMsg subMsg = MqttMsgParser.parseSubMsg(subTopic, msg, new MqttCallback() { @Override public void callback(MqttSubMsg subMsg) { DevStatusDealer.onLine(subMsg.deviceId, subMsg.protocol); DevStatusDealer.afterReceiveSubMessage(subMsg.deviceId); RtuLogDealer.log4Mqtt(subMsg.deviceId, "订阅消息 主题:" + subMsg.topic.longName() + " 元数据:" + subMsg.metaData); } @Override public void notify(String devId, MqttNotifyInfo... infos) { if (notify != null) { notify.notify(devId, infos); } } }); this.nextDeal(subMsg); }catch(Exception e){ log.error("处理MQTT订阅消息发生异常", e); } } private void nextDeal(MqttSubMsg subMsg)throws Exception { subMsg.action(new MqttSubMsgDealer()); } }