package com.dy.rtuMw.server.mqtt;
|
|
import com.dy.common.mw.protocol4Mqtt.*;
|
import com.dy.common.mw.protocol4Mqtt.MqttCallback;
|
import com.dy.common.mw.protocol4Mqtt.MqttTopic;
|
import com.dy.common.util.Callback;
|
import com.dy.rtuMw.server.forTcp.RtuLogDealer;
|
import lombok.extern.slf4j.Slf4j;
|
import org.eclipse.paho.client.mqttv3.* ;
|
|
/**
|
* @Author: liurunyu
|
* @Date: 2025/6/4 15:52
|
* @Description
|
*/
|
@Slf4j
|
public class MqttMessageListener implements IMqttMessageListener{
|
private MqttNotify notify ;
|
public MqttMessageListener(MqttNotify notify){
|
this.notify = notify ;
|
}
|
|
@Override
|
public void messageArrived(String topic, MqttMessage msg) throws Exception {
|
try {
|
MqttTopic subTopic = MqttMsgParser.parseSubTopic(topic);
|
MqttSubMsg subMsg = MqttMsgParser.parseSubMsg(subTopic, msg, new MqttCallback() {
|
@Override
|
public void callback(MqttSubMsg subMsg) {
|
DevStatusDealer.onLine(subMsg.deviceId, subMsg.protocol);
|
DevStatusDealer.afterReceiveSubMessage(subMsg.deviceId);
|
RtuLogDealer.log4Mqtt(subMsg.deviceId, "订阅消息 主题:" + subMsg.topic.longName() + " 元数据:" + subMsg.metaData);
|
}
|
|
@Override
|
public void notify(String devId, MqttNotifyInfo... infos) {
|
if (notify != null) {
|
notify.notify(devId, infos);
|
}
|
}
|
});
|
this.nextDeal(subMsg);
|
}catch(Exception e){
|
log.error("处理MQTT订阅消息发生异常", e);
|
}
|
}
|
private void nextDeal(MqttSubMsg subMsg)throws Exception {
|
subMsg.action(new MqttSubMsgDealer());
|
}
|
}
|