package com.dy.rtuMw.server.mqtt;
|
|
import com.dy.common.mw.protocol4Mqtt.*;
|
import com.dy.common.mw.protocol4Mqtt.MqttCallback;
|
import com.dy.common.mw.protocol4Mqtt.MqttTopic;
|
import com.dy.common.util.Callback;
|
import com.dy.rtuMw.server.forTcp.RtuLogDealer;
|
import lombok.extern.slf4j.Slf4j;
|
import org.eclipse.paho.client.mqttv3.* ;
|
|
/**
|
* @Author: liurunyu
|
* @Date: 2025/6/4 15:52
|
* @Description
|
*/
|
@Slf4j
|
public class MqttMessageListener implements IMqttMessageListener{
|
private MqttNotify notify ;
|
public MqttMessageListener(MqttNotify notify){
|
this.notify = notify ;
|
}
|
|
@Override
|
public void messageArrived(String topic, MqttMessage msg) throws Exception {
|
MqttTopic subTopic = MqttMsgParser.parseSubTopic(topic) ;
|
MqttSubMsg subMsg = MqttMsgParser.parseSubMsg(subTopic, msg, new MqttCallback(){
|
@Override
|
public void callback(MqttSubMsg subMsg) {
|
DevStatusDealer.onLine(subMsg.deviceId, subMsg.protocol);
|
DevStatusDealer.afterReceiveSubMessage(subMsg.deviceId);
|
RtuLogDealer.log4Mqtt(subMsg.deviceId, "订阅消息 主题:" + subMsg.topic + " 消息:" + subMsg.msg);
|
}
|
@Override
|
public void notify(String devId, MqttNotifyInfo... infos) {
|
if(notify != null){
|
notify.notify(devId, infos) ;
|
}
|
}
|
}) ;
|
this.nextDeal(subMsg);
|
}
|
private void nextDeal(MqttSubMsg subMsg)throws Exception {
|
subMsg.action(new Callback() {
|
@Override
|
public void call(Object obj) {
|
MqttSubMsg subMs = (MqttSubMsg) obj ;
|
MqttPubMsg pubMs = MqttPubMsgCache.matchFromTail(subMs) ;
|
if(pubMs != null){
|
//匹配到下行消息(命令)
|
subMs.mqttResultSendWebUrl = pubMs.mqttResultSendWebUrl ;
|
subMs.commandId = pubMs.commandId ;
|
try {
|
MqttComResultCache.getInstance().cacheMqttComResult(new MqttComResultNode(subMs));
|
} catch (Exception e) {
|
log.error("缓存发布消息(命令)结果发生异常", e);
|
}
|
}
|
try{
|
MqttSubMsgCache.getInstance().cacheMsg(new MqttSubMsgNode(subMsg));
|
}catch (Exception e){
|
log.error("缓存订阅消息数据发生异常", e);
|
}
|
}
|
@Override
|
public void call(Object... objs) {
|
}
|
@Override
|
public void exception(Exception e) {
|
}
|
});
|
}
|
}
|