| | |
| | | package com.dy.rtuMw.server.mqtt; |
| | | |
| | | import com.dy.common.mw.protocol4Mqtt.MqttMsgParser; |
| | | import com.dy.common.mw.protocol4Mqtt.MqttPubMsg; |
| | | import com.dy.common.mw.protocol4Mqtt.MqttSubMsg; |
| | | import com.dy.common.mw.protocol4Mqtt.*; |
| | | import com.dy.common.mw.protocol4Mqtt.MqttCallback; |
| | | import com.dy.common.mw.protocol4Mqtt.MqttTopic; |
| | | import com.dy.common.util.Callback; |
| | | import com.dy.rtuMw.server.forTcp.RtuLogDealer; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.eclipse.paho.client.mqttv3.* ; |
| | | |
| | | /** |
| | |
| | | * @Date: 2025/6/4 15:52 |
| | | * @Description |
| | | */ |
| | | @Slf4j |
| | | public class MqttMessageListener implements IMqttMessageListener{ |
| | | private MqttNotify notify ; |
| | | public MqttMessageListener(MqttNotify notify){ |
| | | this.notify = notify ; |
| | | } |
| | | |
| | | @Override |
| | | public void messageArrived(String topic, MqttMessage msg) throws Exception { |
| | | MqttMsgParser parser = new MqttMsgParser() ; |
| | | MqttSubMsg subMsg = parser.parseSubMsg(topic, msg) ; |
| | | MqttTopic subTopic = MqttMsgParser.parseSubTopic(topic) ; |
| | | MqttSubMsg subMsg = MqttMsgParser.parseSubMsg(subTopic, msg, new MqttCallback(){ |
| | | @Override |
| | | public void callback(MqttSubMsg subMsg) { |
| | | DevStatusDealer.onLine(subMsg.deviceId, subMsg.protocol); |
| | | DevStatusDealer.afterReceiveSubMessage(subMsg.deviceId); |
| | | RtuLogDealer.log4Mqtt(subMsg.deviceId, "订阅消息 主题:" + subMsg.topic + " 消息:" + subMsg.msg); |
| | | } |
| | | @Override |
| | | public void notify(String devId, MqttNotifyInfo... infos) { |
| | | if(notify != null){ |
| | | notify.notify(devId, infos) ; |
| | | } |
| | | } |
| | | }) ; |
| | | this.nextDeal(subMsg); |
| | | } |
| | | private void nextDeal(MqttSubMsg subMsg)throws Exception { |
| | |
| | | MqttSubMsg subMs = (MqttSubMsg) obj ; |
| | | MqttPubMsg pubMs = MqttPubMsgCache.matchFromTail(subMs) ; |
| | | if(pubMs != null){ |
| | | //匹配到下行消息(命令) |
| | | subMs.mqttResultSendWebUrl = pubMs.mqttResultSendWebUrl ; |
| | | subMs.commandId = pubMs.commandId ; |
| | | try { |
| | | MqttComResultCache.getInstance().cacheMqttComResult(new MqttComResultNode(subMs)); |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | log.error("缓存发布消息(命令)结果发生异常", e); |
| | | } |
| | | } |
| | | try{ |
| | | MqttSubMsgCache.getInstance().cacheMsg(new MqttSubMsgNode(subMsg)); |
| | | }catch (Exception e){ |
| | | log.error("缓存订阅消息数据发生异常", e); |
| | | } |
| | | } |
| | | @Override |