|  |  | 
 |  |  |  | 
 |  |  |     @Override | 
 |  |  |     public void messageArrived(String topic, MqttMessage msg) throws Exception { | 
 |  |  |         MqttTopic subTopic = MqttMsgParser.parseSubTopic(topic) ; | 
 |  |  |         MqttSubMsg subMsg = MqttMsgParser.parseSubMsg(subTopic, msg, new MqttCallback(){ | 
 |  |  |             @Override | 
 |  |  |             public void callback(MqttSubMsg subMsg) { | 
 |  |  |                 DevStatusDealer.onLine(subMsg.deviceId, subMsg.protocol); | 
 |  |  |                 DevStatusDealer.afterReceiveSubMessage(subMsg.deviceId); | 
 |  |  |                 RtuLogDealer.log4Mqtt(subMsg.deviceId, "订阅消息    主题:" + subMsg.topic + "   消息:" + subMsg.msg); | 
 |  |  |             } | 
 |  |  |             @Override | 
 |  |  |             public void notify(String devId, MqttNotifyInfo... infos) { | 
 |  |  |                 if(notify != null){ | 
 |  |  |                     notify.notify(devId, infos) ; | 
 |  |  |         try { | 
 |  |  |             MqttTopic subTopic = MqttMsgParser.parseSubTopic(topic); | 
 |  |  |             MqttSubMsg subMsg = MqttMsgParser.parseSubMsg(subTopic, msg, new MqttCallback() { | 
 |  |  |                 @Override | 
 |  |  |                 public void callback(MqttSubMsg subMsg) { | 
 |  |  |                     DevStatusDealer.onLine(subMsg.deviceId, subMsg.protocol); | 
 |  |  |                     DevStatusDealer.afterReceiveSubMessage(subMsg.deviceId); | 
 |  |  |                     RtuLogDealer.log4Mqtt(subMsg.deviceId, "订阅消息    主题:" + subMsg.topic.longName() + "   原数据:" + subMsg.metaData); | 
 |  |  |                 } | 
 |  |  |             } | 
 |  |  |         }) ; | 
 |  |  |         this.nextDeal(subMsg); | 
 |  |  |     } | 
 |  |  |     private void nextDeal(MqttSubMsg subMsg)throws Exception { | 
 |  |  |         subMsg.action(new Callback() { | 
 |  |  |             @Override | 
 |  |  |             public void call(Object obj) { | 
 |  |  |                 MqttSubMsg subMs = (MqttSubMsg) obj ; | 
 |  |  |                 MqttPubMsg pubMs = MqttPubMsgCache.matchFromTail(subMs) ; | 
 |  |  |                 if(pubMs != null){ | 
 |  |  |                     //匹配到下行消息(命令) | 
 |  |  |                     subMs.mqttResultSendWebUrl = pubMs.mqttResultSendWebUrl ; | 
 |  |  |                     subMs.commandId = pubMs.commandId ; | 
 |  |  |                     try { | 
 |  |  |                         MqttComResultCache.getInstance().cacheMqttComResult(new MqttComResultNode(subMs)); | 
 |  |  |                     } catch (Exception e) { | 
 |  |  |                         log.error("缓存发布消息(命令)结果发生异常", e); | 
 |  |  |  | 
 |  |  |                 @Override | 
 |  |  |                 public void notify(String devId, MqttNotifyInfo... infos) { | 
 |  |  |                     if (notify != null) { | 
 |  |  |                         notify.notify(devId, infos); | 
 |  |  |                     } | 
 |  |  |                 } | 
 |  |  |                 try{ | 
 |  |  |                     MqttSubMsgCache.getInstance().cacheMsg(new MqttSubMsgNode(subMsg)); | 
 |  |  |                 }catch (Exception e){ | 
 |  |  |                     log.error("缓存订阅消息数据发生异常", e); | 
 |  |  |                 } | 
 |  |  |             } | 
 |  |  |             @Override | 
 |  |  |             public void call(Object... objs) { | 
 |  |  |             } | 
 |  |  |             @Override | 
 |  |  |             public void exception(Exception e) { | 
 |  |  |             } | 
 |  |  |         }); | 
 |  |  |             }); | 
 |  |  |             this.nextDeal(subMsg); | 
 |  |  |         }catch(Exception e){ | 
 |  |  |             log.error("处理MQTT订阅消息发生异常", e); | 
 |  |  |         } | 
 |  |  |     } | 
 |  |  |     /** | 
 |  |  |      * Passes a parsed subscribe message on to the next processing step. | 
 |  |  |      * | 
 |  |  |      * <p>A new {@code MqttSubMsgDealer} is created for every call and handed to | 
 |  |  |      * {@code subMsg.action(...)}; what the dealer does with the message is defined | 
 |  |  |      * outside this method. | 
 |  |  |      * | 
 |  |  |      * @param subMsg the parsed subscribe message to process | 
 |  |  |      * @throws Exception whatever {@code subMsg.action(...)} throws is propagated unchanged | 
 |  |  |      */ | 
 |  |  |     private void nextDeal(MqttSubMsg subMsg) throws Exception { | 
 |  |  |         MqttSubMsgDealer dealer = new MqttSubMsgDealer(); | 
 |  |  |         subMsg.action(dealer); | 
 |  |  |     } | 
 |  |  | } |