zhubaomin
2025-04-07 1a2b07f01ba4616fd9e894dddf474b56d020158c
pipIrr-platform/pipIrr-mw/pipIrr-mw-rtu/src/main/java/com/dy/rtuMw/server/msCenter/MsCenterManager.java
New file
@@ -0,0 +1,138 @@
package com.dy.rtuMw.server.msCenter;
import com.alibaba.fastjson2.JSONObject;
import com.dy.common.queue.Queue;
import com.dy.common.springUtil.SpringContextUtil;
import com.dy.common.threadPool.ThreadPool;
import com.dy.common.threadPool.TreadPoolFactory;
import com.dy.common.util.Callback;
import com.dy.common.util.TimerTaskJob;
import com.dy.rtuMw.web.webRequest.WebRequestDeal;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.*;
/**
 * @Author: liurunyu
 * @Date: 2025/2/12 14:12
 * @Description
 */
public class MsCenterManager extends TimerTaskJob implements Callback {
    private static final Logger log = LogManager.getLogger(MsCenterManager.class.getName());
    private static final MsCenterManager INSTANCE = new MsCenterManager();
    //消息缓存队列,线程安全的
    private static final Queue msQueue = new Queue("MsCenterQueue") ;
    private static final Map<String, String> msReceivers = new HashMap<>() ;
    private MsCenterManager(){
    }
    protected static MsCenterManager getInstance() {
        return MsCenterManager.INSTANCE;
    }
    /**
     *  初始化配置信息
     */
    protected void initOption(MsCenterConfigVo configVo) {
    }
    /**
     * 注册消息接收器
     * @param webUrl 接收者web http post url
     */
    protected void registerMsReceiver(String webUrl){
        if(!msReceivers.containsKey(webUrl)){
            msReceivers.put(webUrl, webUrl) ;
        }
    }
    protected void pushMs(MsObj msNode){
        try {
            msQueue.pushTail(msNode);
        }catch (Exception e){
            log.error("消息中心队列存入消息时发生异常", e);
        }
    }
    @Override
    public Object execute() throws Exception {
        if(msQueue.size() > 0){
            JSONObject msNode1 = (JSONObject)msQueue.pop() ;
            if(msNode1 != null){
                //大部分时间msNode1是null
                List<JSONObject> list = new ArrayList<>() ;
                list.add(msNode1) ;
                JSONObject msNode ;
                do{
                    msNode = (JSONObject)msQueue.pop() ;
                    if(msNode != null){
                        list.add(msNode) ;
                    }
                }while (msNode != null);
                this.notifyMs(list) ;
            }
        }
        return null;
    }
    ////////////////////////////////////////////////////
    //
    // 消息通知通知工作线程执行完成后回调的方法,
    // 也就是上面execute方法执行完成返回或抛出异常后,执行下面三个方法
    //
    ////////////////////////////////////////////////////
    @Override
    public void call(Object obj) {
        //线程工作执行完了,obj = Boolean(true)
    }
    @Override
    public void call(Object... objs) {
    }
    @Override
    public void exception(Exception e) {
        log.error("消息通知伺服线程发生异常", e);
    }
    /**
     * 把消息通知出去
     * @param list
     */
    private void notifyMs(List<JSONObject> list){
        try {
            if(msReceivers.size() > 0){
                ThreadPool.Pool pool = TreadPoolFactory.getThreadPoolLong() ;
                pool.putJob(new ThreadPool.Job() {
                    public void execute() {
                        Iterator<String> it = msReceivers.keySet().iterator() ;
                        while (it.hasNext()){
                            doNotifyMs(it.next(), list) ;
                        }
                    }
                    @Override
                    public void destroy(){
                    }
                    @Override
                    public boolean isDestroy(){
                        return false ;
                    }
                });
            }
        } catch (Exception e) {
            log.error("在RtuDataNode内发生异常", e);
        }
    }
    private void doNotifyMs(String receiverWebUrl, List<JSONObject> list){
        WebRequestDeal deal = SpringContextUtil.getBean(WebRequestDeal.class) ;
        deal.deal(receiverWebUrl, list);
    }
}