| New file | 
|  |  |  | 
|---|
|  |  |  | package com.dy.rtuMw.server.msCenter; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | import com.alibaba.fastjson2.JSONObject; | 
|---|
|  |  |  | import com.dy.common.queue.Queue; | 
|---|
|  |  |  | import com.dy.common.springUtil.SpringContextUtil; | 
|---|
|  |  |  | import com.dy.common.threadPool.ThreadPool; | 
|---|
|  |  |  | import com.dy.common.threadPool.TreadPoolFactory; | 
|---|
|  |  |  | import com.dy.common.util.Callback; | 
|---|
|  |  |  | import com.dy.common.util.TimerTaskJob; | 
|---|
|  |  |  | import com.dy.rtuMw.web.webRequest.WebRequestDeal; | 
|---|
|  |  |  | import org.apache.logging.log4j.LogManager; | 
|---|
|  |  |  | import org.apache.logging.log4j.Logger; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | import java.util.*; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | /** | 
|---|
|  |  |  | * @Author: liurunyu | 
|---|
|  |  |  | * @Date: 2025/2/12 14:12 | 
|---|
|  |  |  | * @Description | 
|---|
|  |  |  | */ | 
|---|
|  |  |  | public class MsCenterManager extends TimerTaskJob implements Callback { | 
|---|
|  |  |  |  | 
|---|
|  |  |  | private static final Logger log = LogManager.getLogger(MsCenterManager.class.getName()); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | private static final MsCenterManager INSTANCE = new MsCenterManager(); | 
|---|
|  |  |  | //消息缓存队列,线程安全的 | 
|---|
|  |  |  | private static final Queue msQueue = new Queue("MsCenterQueue") ; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | private static final Map<String, String> msReceivers = new HashMap<>() ; | 
|---|
|  |  |  |  | 
|---|
|  |  |  |  | 
|---|
|  |  |  | private MsCenterManager(){ | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | protected static MsCenterManager getInstance() { | 
|---|
|  |  |  | return MsCenterManager.INSTANCE; | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | /** | 
|---|
|  |  |  | *  初始化配置信息 | 
|---|
|  |  |  | */ | 
|---|
|  |  |  | protected void initOption(MsCenterConfigVo configVo) { | 
|---|
|  |  |  | } | 
|---|
|  |  |  | /** | 
|---|
|  |  |  | * 注册消息接收器 | 
|---|
|  |  |  | * @param webUrl 接收者web http post url | 
|---|
|  |  |  | */ | 
|---|
|  |  |  | protected void registerMsReceiver(String webUrl){ | 
|---|
|  |  |  | if(!msReceivers.containsKey(webUrl)){ | 
|---|
|  |  |  | msReceivers.put(webUrl, webUrl) ; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | protected void pushMs(MsObj msNode){ | 
|---|
|  |  |  | try { | 
|---|
|  |  |  | msQueue.pushTail(msNode); | 
|---|
|  |  |  | }catch (Exception e){ | 
|---|
|  |  |  | log.error("消息中心队列存入消息时发生异常", e); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Override | 
|---|
|  |  |  | public Object execute() throws Exception { | 
|---|
|  |  |  | if(msQueue.size() > 0){ | 
|---|
|  |  |  | JSONObject msNode1 = (JSONObject)msQueue.pop() ; | 
|---|
|  |  |  | if(msNode1 != null){ | 
|---|
|  |  |  | //大部分时间msNode1是null | 
|---|
|  |  |  | List<JSONObject> list = new ArrayList<>() ; | 
|---|
|  |  |  | list.add(msNode1) ; | 
|---|
|  |  |  | JSONObject msNode ; | 
|---|
|  |  |  | do{ | 
|---|
|  |  |  | msNode = (JSONObject)msQueue.pop() ; | 
|---|
|  |  |  | if(msNode != null){ | 
|---|
|  |  |  | list.add(msNode) ; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | }while (msNode != null); | 
|---|
|  |  |  | this.notifyMs(list) ; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | return null; | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  |  | 
|---|
|  |  |  | //////////////////////////////////////////////////// | 
|---|
|  |  |  | // | 
|---|
|  |  |  | // 消息通知通知工作线程执行完成后回调的方法, | 
|---|
|  |  |  | // 也就是上面execute方法执行完成返回或抛出异常后,执行下面三个方法 | 
|---|
|  |  |  | // | 
|---|
|  |  |  | //////////////////////////////////////////////////// | 
|---|
|  |  |  | @Override | 
|---|
|  |  |  | public void call(Object obj) { | 
|---|
|  |  |  | //线程工作执行完了,obj = Boolean(true) | 
|---|
|  |  |  | } | 
|---|
|  |  |  | @Override | 
|---|
|  |  |  | public void call(Object... objs) { | 
|---|
|  |  |  | } | 
|---|
|  |  |  | @Override | 
|---|
|  |  |  | public void exception(Exception e) { | 
|---|
|  |  |  | log.error("消息通知伺服线程发生异常", e); | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  |  | 
|---|
|  |  |  | /** | 
|---|
|  |  |  | * 把消息通知出去 | 
|---|
|  |  |  | * @param list | 
|---|
|  |  |  | */ | 
|---|
|  |  |  | private void notifyMs(List<JSONObject> list){ | 
|---|
|  |  |  | try { | 
|---|
|  |  |  | if(msReceivers.size() > 0){ | 
|---|
|  |  |  | ThreadPool.Pool pool = TreadPoolFactory.getThreadPoolLong() ; | 
|---|
|  |  |  | pool.putJob(new ThreadPool.Job() { | 
|---|
|  |  |  | public void execute() { | 
|---|
|  |  |  | Iterator<String> it = msReceivers.keySet().iterator() ; | 
|---|
|  |  |  | while (it.hasNext()){ | 
|---|
|  |  |  | doNotifyMs(it.next(), list) ; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | @Override | 
|---|
|  |  |  | public void destroy(){ | 
|---|
|  |  |  | } | 
|---|
|  |  |  | @Override | 
|---|
|  |  |  | public boolean isDestroy(){ | 
|---|
|  |  |  | return false ; | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | }); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } catch (Exception e) { | 
|---|
|  |  |  | log.error("在RtuDataNode内发生异常", e); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | private void doNotifyMs(String receiverWebUrl, List<JSONObject> list){ | 
|---|
|  |  |  | WebRequestDeal deal = SpringContextUtil.getBean(WebRequestDeal.class) ; | 
|---|
|  |  |  | deal.deal(receiverWebUrl, list); | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | } | 
|---|