package com.dy.rtuMw.server.msCenter;

import com.alibaba.fastjson2.JSONObject;
import com.dy.common.queue.Queue;
import com.dy.common.springUtil.SpringContextUtil;
import com.dy.common.threadPool.ThreadPool;
import com.dy.common.threadPool.TreadPoolFactory;
import com.dy.common.util.Callback;
import com.dy.common.util.TimerTaskJob;
import com.dy.rtuMw.web.webRequest.WebRequestDeal;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.*;
import java.util.concurrent.CopyOnWriteArraySet;

/**
 * Message center: buffers messages in a queue and forwards them, in batches,
 * to every registered receiver URL.
 *
 * <p>Producers call {@link #pushMs(MsObj)}; {@link #execute()} drains the queue and
 * hands the drained batch to a thread-pool job that posts it to each receiver via
 * {@link WebRequestDeal}.
 *
 * @Author: liurunyu
 * @Date: 2025/2/12 14:12
 */
public class MsCenterManager extends TimerTaskJob implements Callback {

    private static final Logger log = LogManager.getLogger(MsCenterManager.class.getName());

    private static final MsCenterManager INSTANCE = new MsCenterManager();

    // Message buffer queue (described by the original author as thread-safe).
    private static final Queue msQueue = new Queue("MsCenterQueue");

    // Registered receiver URLs. Registration happens on the caller's thread while
    // iteration happens inside a thread-pool job, so a concurrent set is used;
    // copy-on-write suits the rare-write / frequent-iterate access pattern.
    private static final Set<String> msReceivers = new CopyOnWriteArraySet<>();

    private MsCenterManager() {
    }

    /**
     * @return the single shared instance
     */
    protected static MsCenterManager getInstance() {
        return MsCenterManager.INSTANCE;
    }

    /**
     * Initializes configuration. Currently a no-op.
     *
     * @param configVo message center configuration (unused at present)
     */
    protected void initOption(MsCenterConfigVo configVo) {
    }

    /**
     * Registers a message receiver. Registering the same URL twice has no effect;
     * a null or blank URL is ignored with a warning.
     *
     * @param webUrl the receiver's web HTTP POST url
     */
    protected void registerMsReceiver(String webUrl) {
        if (webUrl == null || webUrl.trim().isEmpty()) {
            log.warn("注册消息接收器时web url为空,忽略本次注册");
            return;
        }
        msReceivers.add(webUrl);
    }

    /**
     * Appends a message to the tail of the buffer queue. Failures are logged,
     * not propagated.
     *
     * @param msNode the message to enqueue
     */
    protected void pushMs(MsObj msNode) {
        try {
            msQueue.pushTail(msNode);
        } catch (Exception e) {
            log.error("消息中心队列存入消息时发生异常", e);
        }
    }

    /**
     * Drains every message currently in the queue and forwards them as one batch.
     *
     * @return always {@code null}
     * @throws Exception if popping from the queue fails
     */
    @Override
    public Object execute() throws Exception {
        if (msQueue.size() <= 0) {
            return null;
        }
        // NOTE(review): pushMs enqueues MsObj while this cast expects JSONObject;
        // presumably MsObj is a JSONObject subtype -- confirm against MsObj.
        List<JSONObject> batch = new ArrayList<>();
        JSONObject msNode;
        // pop() may yield null even after the size check, so drain until it does.
        while ((msNode = (JSONObject) msQueue.pop()) != null) {
            batch.add(msNode);
        }
        if (!batch.isEmpty()) {
            this.notifyMs(batch);
        }
        return null;
    }

    ////////////////////////////////////////////////////
    //
    // Callbacks invoked once the notification worker finishes, i.e. after
    // execute() above returns or throws, the three methods below are called.
    //
    ////////////////////////////////////////////////////

    @Override
    public void call(Object obj) {
        // Worker run has finished; obj = Boolean(true)
    }

    @Override
    public void call(Object... objs) {
    }

    @Override
    public void exception(Exception e) {
        log.error("消息通知伺服线程发生异常", e);
    }

    /**
     * Sends a batch of messages to every registered receiver on a pool thread.
     * If no receiver is registered the batch is dropped.
     *
     * @param list the drained messages to deliver
     */
    private void notifyMs(List<JSONObject> list) {
        if (msReceivers.isEmpty()) {
            return;
        }
        try {
            ThreadPool.Pool pool = TreadPoolFactory.getThreadPoolLong();
            pool.putJob(new ThreadPool.Job() {
                public void execute() {
                    for (String receiverWebUrl : msReceivers) {
                        // Isolate receivers: one failing endpoint must not keep
                        // the remaining receivers from getting the batch.
                        try {
                            doNotifyMs(receiverWebUrl, list);
                        } catch (Exception e) {
                            log.error("向消息接收者[{}]发送消息通知时发生异常", receiverWebUrl, e);
                        }
                    }
                }

                @Override
                public void destroy() {
                }

                @Override
                public boolean isDestroy() {
                    return false;
                }
            });
        } catch (Exception e) {
            log.error("消息中心提交消息通知任务时发生异常", e);
        }
    }

    /**
     * Delivers one batch to one receiver through the {@link WebRequestDeal} bean.
     *
     * @param receiverWebUrl the receiver's web HTTP POST url
     * @param list           the messages to deliver
     */
    private void doNotifyMs(String receiverWebUrl, List<JSONObject> list) {
        WebRequestDeal deal = SpringContextUtil.getBean(WebRequestDeal.class);
        deal.deal(receiverWebUrl, list);
    }
}