|  |  |  | 
|---|
|  |  |  | package com.dy.rtuMw.server.mqtt; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | import com.dy.common.mw.protocol4Mqtt.MqttMsgParser; | 
|---|
|  |  |  | import com.dy.common.mw.protocol4Mqtt.MqttPubMsg; | 
|---|
|  |  |  | import com.dy.common.mw.protocol4Mqtt.MqttSubMsg; | 
|---|
|  |  |  | import com.dy.common.mw.protocol4Mqtt.*; | 
|---|
|  |  |  | import com.dy.common.mw.protocol4Mqtt.MqttCallback; | 
|---|
|  |  |  | import com.dy.common.mw.protocol4Mqtt.MqttTopic; | 
|---|
|  |  |  | import com.dy.common.util.Callback; | 
|---|
|  |  |  | import com.dy.rtuMw.server.forTcp.RtuLogDealer; | 
|---|
|  |  |  | import lombok.extern.slf4j.Slf4j; | 
|---|
|  |  |  | import org.eclipse.paho.client.mqttv3.* ; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | /** | 
|---|
|  |  |  | 
|---|
|  |  |  | * @Date: 2025/6/4 15:52 | 
|---|
|  |  |  | * @Description | 
|---|
|  |  |  | */ | 
|---|
|  |  |  | @Slf4j | 
|---|
|  |  |  | public class MqttMessageListener implements IMqttMessageListener{ | 
|---|
|  |  |  | private MqttNotify notify ; | 
|---|
|  |  |  | public MqttMessageListener(MqttNotify notify){ | 
|---|
|  |  |  | this.notify = notify ; | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Override | 
|---|
|  |  |  | public void messageArrived(String topic, MqttMessage msg) throws Exception { | 
|---|
|  |  |  | MqttMsgParser parser = new MqttMsgParser() ; | 
|---|
|  |  |  | MqttSubMsg subMsg = parser.parseSubMsg(topic, msg) ; | 
|---|
|  |  |  | this.nextDeal(subMsg); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | private void nextDeal(MqttSubMsg subMsg)throws Exception { | 
|---|
|  |  |  | subMsg.action(new Callback() { | 
|---|
|  |  |  | @Override | 
|---|
|  |  |  | public void call(Object obj) { | 
|---|
|  |  |  | MqttSubMsg subMs = (MqttSubMsg) obj ; | 
|---|
|  |  |  | MqttPubMsg pubMs = MqttPubMsgCache.matchFromTail(subMs) ; | 
|---|
|  |  |  | if(pubMs != null){ | 
|---|
|  |  |  | subMs.mqttResultSendWebUrl = pubMs.mqttResultSendWebUrl ; | 
|---|
|  |  |  | subMs.commandId = pubMs.commandId ; | 
|---|
|  |  |  | try { | 
|---|
|  |  |  | MqttComResultCache.getInstance().cacheMqttComResult(new MqttComResultNode(subMs)); | 
|---|
|  |  |  | } catch (Exception e) { | 
|---|
|  |  |  | e.printStackTrace(); | 
|---|
|  |  |  | try { | 
|---|
|  |  |  | MqttTopic subTopic = MqttMsgParser.parseSubTopic(topic); | 
|---|
|  |  |  | MqttSubMsg subMsg = MqttMsgParser.parseSubMsg(subTopic, msg, new MqttCallback() { | 
|---|
|  |  |  | @Override | 
|---|
|  |  |  | public void callback(MqttSubMsg subMsg) { | 
|---|
|  |  |  | DevStatusDealer.onLine(subMsg.deviceId, subMsg.protocol); | 
|---|
|  |  |  | DevStatusDealer.afterReceiveSubMessage(subMsg.deviceId); | 
|---|
|  |  |  | RtuLogDealer.log4Mqtt(subMsg.deviceId, "订阅消息    主题:" + subMsg.topic.longName() + "   原数据:" + subMsg.metaData); | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Override | 
|---|
|  |  |  | public void notify(String devId, MqttNotifyInfo... infos) { | 
|---|
|  |  |  | if (notify != null) { | 
|---|
|  |  |  | notify.notify(devId, infos); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | try{ | 
|---|
|  |  |  | MqttSubMsgCache.getInstance().cacheMsg(new MqttSubMsgNode(subMsg)); | 
|---|
|  |  |  | }catch (Exception e){ | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | @Override | 
|---|
|  |  |  | public void call(Object... objs) { | 
|---|
|  |  |  | } | 
|---|
|  |  |  | @Override | 
|---|
|  |  |  | public void exception(Exception e) { | 
|---|
|  |  |  | } | 
|---|
|  |  |  | }); | 
|---|
|  |  |  | }); | 
|---|
|  |  |  | this.nextDeal(subMsg); | 
|---|
|  |  |  | }catch(Exception e){ | 
|---|
|  |  |  | log.error("处理MQTT订阅消息发生异常", e); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | private void nextDeal(MqttSubMsg subMsg)throws Exception { | 
|---|
|  |  |  | subMsg.action(new MqttSubMsgDealer()); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|