package com.dy.rtuMw.server.msCenter; 
 | 
  
 | 
import com.alibaba.fastjson2.JSONObject; 
 | 
import com.dy.common.queue.Queue; 
 | 
import com.dy.common.springUtil.SpringContextUtil; 
 | 
import com.dy.common.threadPool.ThreadPool; 
 | 
import com.dy.common.threadPool.TreadPoolFactory; 
 | 
import com.dy.common.util.Callback; 
 | 
import com.dy.common.util.TimerTaskJob; 
 | 
import com.dy.rtuMw.web.webRequest.WebRequestDeal; 
 | 
import org.apache.logging.log4j.LogManager; 
 | 
import org.apache.logging.log4j.Logger; 
 | 
  
 | 
import java.util.*; 
 | 
  
 | 
/** 
 | 
 * @Author: liurunyu 
 | 
 * @Date: 2025/2/12 14:12 
 | 
 * @Description 
 | 
 */ 
 | 
public class MsCenterManager extends TimerTaskJob implements Callback { 
 | 
  
 | 
    private static final Logger log = LogManager.getLogger(MsCenterManager.class.getName()); 
 | 
  
 | 
    private static final MsCenterManager INSTANCE = new MsCenterManager(); 
 | 
    //消息缓存队列,线程安全的 
 | 
    private static final Queue msQueue = new Queue("MsCenterQueue") ; 
 | 
  
 | 
    private static final Map<String, String> msReceivers = new HashMap<>() ; 
 | 
  
 | 
  
 | 
    private MsCenterManager(){ 
 | 
    } 
 | 
  
 | 
    protected static MsCenterManager getInstance() { 
 | 
        return MsCenterManager.INSTANCE; 
 | 
    } 
 | 
  
 | 
    /** 
 | 
     *  初始化配置信息 
 | 
     */ 
 | 
    protected void initOption(MsCenterConfigVo configVo) { 
 | 
    } 
 | 
    /** 
 | 
     * 注册消息接收器 
 | 
     * @param webUrl 接收者web http post url 
 | 
     */ 
 | 
    protected void registerMsReceiver(String webUrl){ 
 | 
        if(!msReceivers.containsKey(webUrl)){ 
 | 
            msReceivers.put(webUrl, webUrl) ; 
 | 
        } 
 | 
    } 
 | 
  
 | 
    protected void pushMs(MsObj msNode){ 
 | 
        try { 
 | 
            msQueue.pushTail(msNode); 
 | 
        }catch (Exception e){ 
 | 
            log.error("消息中心队列存入消息时发生异常", e); 
 | 
        } 
 | 
    } 
 | 
  
 | 
    @Override 
 | 
    public Object execute() throws Exception { 
 | 
        if(msQueue.size() > 0){ 
 | 
            JSONObject msNode1 = (JSONObject)msQueue.pop() ; 
 | 
            if(msNode1 != null){ 
 | 
                //大部分时间msNode1是null 
 | 
                List<JSONObject> list = new ArrayList<>() ; 
 | 
                list.add(msNode1) ; 
 | 
                JSONObject msNode ; 
 | 
                do{ 
 | 
                    msNode = (JSONObject)msQueue.pop() ; 
 | 
                    if(msNode != null){ 
 | 
                        list.add(msNode) ; 
 | 
                    } 
 | 
                }while (msNode != null); 
 | 
                this.notifyMs(list) ; 
 | 
            } 
 | 
        } 
 | 
        return null; 
 | 
    } 
 | 
  
 | 
  
 | 
    //////////////////////////////////////////////////// 
 | 
    // 
 | 
    // 消息通知通知工作线程执行完成后回调的方法, 
 | 
    // 也就是上面execute方法执行完成返回或抛出异常后,执行下面三个方法 
 | 
    // 
 | 
    //////////////////////////////////////////////////// 
 | 
    @Override 
 | 
    public void call(Object obj) { 
 | 
        //线程工作执行完了,obj = Boolean(true) 
 | 
    } 
 | 
    @Override 
 | 
    public void call(Object... objs) { 
 | 
    } 
 | 
    @Override 
 | 
    public void exception(Exception e) { 
 | 
        log.error("消息通知伺服线程发生异常", e); 
 | 
    } 
 | 
  
 | 
  
 | 
    /** 
 | 
     * 把消息通知出去 
 | 
     * @param list 
 | 
     */ 
 | 
    private void notifyMs(List<JSONObject> list){ 
 | 
        try { 
 | 
            if(msReceivers.size() > 0){ 
 | 
                ThreadPool.Pool pool = TreadPoolFactory.getThreadPoolLong() ; 
 | 
                pool.putJob(new ThreadPool.Job() { 
 | 
                    public void execute() { 
 | 
                        Iterator<String> it = msReceivers.keySet().iterator() ; 
 | 
                        while (it.hasNext()){ 
 | 
                            doNotifyMs(it.next(), list) ; 
 | 
                        } 
 | 
                    } 
 | 
                    @Override 
 | 
                    public void destroy(){ 
 | 
                    } 
 | 
                    @Override 
 | 
                    public boolean isDestroy(){ 
 | 
                        return false ; 
 | 
                    } 
 | 
  
 | 
                }); 
 | 
            } 
 | 
        } catch (Exception e) { 
 | 
            log.error("在RtuDataNode内发生异常", e); 
 | 
        } 
 | 
    } 
 | 
  
 | 
    private void doNotifyMs(String receiverWebUrl, List<JSONObject> list){ 
 | 
        WebRequestDeal deal = SpringContextUtil.getBean(WebRequestDeal.class) ; 
 | 
        deal.deal(receiverWebUrl, list); 
 | 
    } 
 | 
  
 | 
} 
 |