| | |
| | | |
| | | @Override |
| | | public void messageArrived(String topic, MqttMessage msg) throws Exception { |
| | | MqttTopic subTopic = MqttMsgParser.parseSubTopic(topic) ; |
| | | MqttSubMsg subMsg = MqttMsgParser.parseSubMsg(subTopic, msg, new MqttCallback(){ |
| | | @Override |
| | | public void callback(MqttSubMsg subMsg) { |
| | | DevStatusDealer.onLine(subMsg.deviceId, subMsg.protocol); |
| | | DevStatusDealer.afterReceiveSubMessage(subMsg.deviceId); |
| | | RtuLogDealer.log4Mqtt(subMsg.deviceId, "订阅消息 主题:" + subMsg.topic + " 消息:" + subMsg.msg); |
| | | } |
| | | @Override |
| | | public void notify(String devId, MqttNotifyInfo... infos) { |
| | | if(notify != null){ |
| | | notify.notify(devId, infos) ; |
| | | try { |
| | | MqttTopic subTopic = MqttMsgParser.parseSubTopic(topic); |
| | | MqttSubMsg subMsg = MqttMsgParser.parseSubMsg(subTopic, msg, new MqttCallback() { |
| | | @Override |
| | | public void callback(MqttSubMsg subMsg) { |
| | | DevStatusDealer.onLine(subMsg.deviceId, subMsg.protocol); |
| | | DevStatusDealer.afterReceiveSubMessage(subMsg.deviceId); |
| | | RtuLogDealer.log4Mqtt(subMsg.deviceId, "订阅消息 主题:" + subMsg.topic.longName() + " 元数据:" + subMsg.metaData); |
| | | } |
| | | } |
| | | }) ; |
| | | this.nextDeal(subMsg); |
| | | |
| | | @Override |
| | | public void notify(String devId, MqttNotifyInfo... infos) { |
| | | if (notify != null) { |
| | | notify.notify(devId, infos); |
| | | } |
| | | } |
| | | }); |
| | | this.nextDeal(subMsg); |
| | | }catch(Exception e){ |
| | | log.error("处理MQTT订阅消息发生异常", e); |
| | | } |
| | | } |
| | | private void nextDeal(MqttSubMsg subMsg)throws Exception { |
| | | subMsg.action(new Callback() { |