package com.dy.rtuMw.server.mqtt; import com.dy.common.mw.protocol4Mqtt.MqttSubMsg; import com.dy.common.queue.NodeObj; import com.dy.common.threadPool.ThreadPool; import com.dy.common.threadPool.TreadPoolFactory; import com.dy.rtuMw.server.rtuData.TaskPool; import com.dy.rtuMw.server.rtuData.TaskSurpport; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; /** * @Author: liurunyu * @Date: 2025/6/4 16:14 * @Description */ public class MqttSubMsgNode implements NodeObj { private static Logger log = LogManager.getLogger(MqttSubMsgNode.class.getName()); protected MqttSubMsg msg; public MqttSubMsgNode(MqttSubMsg obj){ this.msg = obj ; } /** * 自己处理自己 * @return */ public boolean dealSelf(){ try { ThreadPool.Pool pool = TreadPoolFactory.getThreadPoolLong() ; pool.putJob(new ThreadPool.Job() { public void execute() { if(msg != null && msg.valid()){ TaskSurpport task = null ; try{ task = TaskPool.popTask() ; if(task != null){ task.execute(msg); }else{ log.error("未得到Mq订阅消息处理任务!"); } }catch(Exception e){ log.error("Mq订阅消息任务池处理数据时发生异常", e); }finally { if(task != null){ TaskPool.freeAndCleanTask(task); } } } } @Override public void destroy(){ } @Override public boolean isDestroy(){ return false ; } }); } catch (Exception e) { log.error("在MqttSubMsgObj内发生异常", e); } return true ; } }