package com.dy.rtuMw.server.mqtt; import com.dy.common.mw.protocol4Mqtt.*; import com.dy.common.mw.protocol4Mqtt.MqttCallback; import com.dy.common.mw.protocol4Mqtt.MqttTopic; import com.dy.common.util.Callback; import com.dy.rtuMw.server.forTcp.RtuLogDealer; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.* ; /** * @Author: liurunyu * @Date: 2025/6/4 15:52 * @Description */ @Slf4j public class MqttMessageListener implements IMqttMessageListener{ private MqttNotify notify ; public MqttMessageListener(MqttNotify notify){ this.notify = notify ; } @Override public void messageArrived(String topic, MqttMessage msg) throws Exception { MqttTopic subTopic = MqttMsgParser.parseSubTopic(topic) ; MqttSubMsg subMsg = MqttMsgParser.parseSubMsg(subTopic, msg, new MqttCallback(){ @Override public void callback(MqttSubMsg subMsg) { DevStatusDealer.onLine(subMsg.deviceId, subMsg.protocol); DevStatusDealer.afterReceiveSubMessage(subMsg.deviceId); RtuLogDealer.log4Mqtt(subMsg.deviceId, "订阅消息 主题:" + subMsg.topic + " 消息:" + subMsg.msg); } @Override public void notify(String devId, MqttNotifyInfo... infos) { if(notify != null){ notify.notify(devId, infos) ; } } }) ; this.nextDeal(subMsg); } private void nextDeal(MqttSubMsg subMsg)throws Exception { subMsg.action(new Callback() { @Override public void call(Object obj) { MqttSubMsg subMs = (MqttSubMsg) obj ; MqttPubMsg pubMs = MqttPubMsgCache.matchFromTail(subMs) ; if(pubMs != null){ //匹配到下行消息(命令) subMs.mqttResultSendWebUrl = pubMs.mqttResultSendWebUrl ; subMs.commandId = pubMs.commandId ; try { MqttComResultCache.getInstance().cacheMqttComResult(new MqttComResultNode(subMs)); } catch (Exception e) { log.error("缓存发布消息(命令)结果发生异常", e); } } try{ MqttSubMsgCache.getInstance().cacheMsg(new MqttSubMsgNode(subMsg)); }catch (Exception e){ log.error("缓存订阅消息数据发生异常", e); } } @Override public void call(Object... objs) { } @Override public void exception(Exception e) { } }); } }