New file |
| | |
| | | package com.dy.rtuMw.server.msCenter; |
| | | |
| | | import com.alibaba.fastjson2.JSONObject; |
| | | import com.dy.common.queue.Queue; |
| | | import com.dy.common.springUtil.SpringContextUtil; |
| | | import com.dy.common.threadPool.ThreadPool; |
| | | import com.dy.common.threadPool.TreadPoolFactory; |
| | | import com.dy.common.util.Callback; |
| | | import com.dy.common.util.TimerTaskJob; |
| | | import com.dy.rtuMw.web.webRequest.WebRequestDeal; |
| | | import org.apache.logging.log4j.LogManager; |
| | | import org.apache.logging.log4j.Logger; |
| | | |
| | | import java.util.*; |
| | | |
| | | /** |
| | | * @Author: liurunyu |
| | | * @Date: 2025/2/12 14:12 |
| | | * @Description |
| | | */ |
| | | public class MsCenterManager extends TimerTaskJob implements Callback { |
| | | |
| | | private static final Logger log = LogManager.getLogger(MsCenterManager.class.getName()); |
| | | |
| | | private static final MsCenterManager INSTANCE = new MsCenterManager(); |
| | | //消息缓存队列,线程安全的 |
| | | private static final Queue msQueue = new Queue("MsCenterQueue") ; |
| | | |
| | | private static final Map<String, String> msReceivers = new HashMap<>() ; |
| | | |
| | | |
| | | private MsCenterManager(){ |
| | | } |
| | | |
| | | protected static MsCenterManager getInstance() { |
| | | return MsCenterManager.INSTANCE; |
| | | } |
| | | |
| | | /** |
| | | * 初始化配置信息 |
| | | */ |
| | | protected void initOption(MsCenterConfigVo configVo) { |
| | | } |
| | | /** |
| | | * 注册消息接收器 |
| | | * @param webUrl 接收者web http post url |
| | | */ |
| | | protected void registerMsReceiver(String webUrl){ |
| | | if(!msReceivers.containsKey(webUrl)){ |
| | | msReceivers.put(webUrl, webUrl) ; |
| | | } |
| | | } |
| | | |
| | | protected void pushMs(MsObj msNode){ |
| | | try { |
| | | msQueue.pushTail(msNode); |
| | | }catch (Exception e){ |
| | | log.error("消息中心队列存入消息时发生异常", e); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public Object execute() throws Exception { |
| | | if(msQueue.size() > 0){ |
| | | JSONObject msNode1 = (JSONObject)msQueue.pop() ; |
| | | if(msNode1 != null){ |
| | | //大部分时间msNode1是null |
| | | List<JSONObject> list = new ArrayList<>() ; |
| | | list.add(msNode1) ; |
| | | JSONObject msNode ; |
| | | do{ |
| | | msNode = (JSONObject)msQueue.pop() ; |
| | | if(msNode != null){ |
| | | list.add(msNode) ; |
| | | } |
| | | }while (msNode != null); |
| | | this.notifyMs(list) ; |
| | | } |
| | | } |
| | | return null; |
| | | } |
| | | |
| | | |
| | | //////////////////////////////////////////////////// |
| | | // |
| | | // 消息通知通知工作线程执行完成后回调的方法, |
| | | // 也就是上面execute方法执行完成返回或抛出异常后,执行下面三个方法 |
| | | // |
| | | //////////////////////////////////////////////////// |
| | | @Override |
| | | public void call(Object obj) { |
| | | //线程工作执行完了,obj = Boolean(true) |
| | | } |
| | | @Override |
| | | public void call(Object... objs) { |
| | | } |
| | | @Override |
| | | public void exception(Exception e) { |
| | | log.error("消息通知伺服线程发生异常", e); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * 把消息通知出去 |
| | | * @param list |
| | | */ |
| | | private void notifyMs(List<JSONObject> list){ |
| | | try { |
| | | if(msReceivers.size() > 0){ |
| | | ThreadPool.Pool pool = TreadPoolFactory.getThreadPoolLong() ; |
| | | pool.putJob(new ThreadPool.Job() { |
| | | public void execute() { |
| | | Iterator<String> it = msReceivers.keySet().iterator() ; |
| | | while (it.hasNext()){ |
| | | doNotifyMs(it.next(), list) ; |
| | | } |
| | | } |
| | | @Override |
| | | public void destroy(){ |
| | | } |
| | | @Override |
| | | public boolean isDestroy(){ |
| | | return false ; |
| | | } |
| | | |
| | | }); |
| | | } |
| | | } catch (Exception e) { |
| | | log.error("在RtuDataNode内发生异常", e); |
| | | } |
| | | } |
| | | |
| | | private void doNotifyMs(String receiverWebUrl, List<JSONObject> list){ |
| | | WebRequestDeal deal = SpringContextUtil.getBean(WebRequestDeal.class) ; |
| | | deal.deal(receiverWebUrl, list); |
| | | } |
| | | |
| | | } |