zhubaomin
2025-04-07 1a2b07f01ba4616fd9e894dddf474b56d020158c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package com.dy.rtuMw.server.msCenter;
 
import com.alibaba.fastjson2.JSONObject;
import com.dy.common.queue.Queue;
import com.dy.common.springUtil.SpringContextUtil;
import com.dy.common.threadPool.ThreadPool;
import com.dy.common.threadPool.TreadPoolFactory;
import com.dy.common.util.Callback;
import com.dy.common.util.TimerTaskJob;
import com.dy.rtuMw.web.webRequest.WebRequestDeal;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
 
import java.util.*;
 
/**
 * @Author: liurunyu
 * @Date: 2025/2/12 14:12
 * @Description
 */
public class MsCenterManager extends TimerTaskJob implements Callback {
 
    private static final Logger log = LogManager.getLogger(MsCenterManager.class.getName());
 
    private static final MsCenterManager INSTANCE = new MsCenterManager();
    //消息缓存队列,线程安全的
    private static final Queue msQueue = new Queue("MsCenterQueue") ;
 
    private static final Map<String, String> msReceivers = new HashMap<>() ;
 
 
    private MsCenterManager(){
    }
 
    protected static MsCenterManager getInstance() {
        return MsCenterManager.INSTANCE;
    }
 
    /**
     *  初始化配置信息
     */
    protected void initOption(MsCenterConfigVo configVo) {
    }
    /**
     * 注册消息接收器
     * @param webUrl 接收者web http post url
     */
    protected void registerMsReceiver(String webUrl){
        if(!msReceivers.containsKey(webUrl)){
            msReceivers.put(webUrl, webUrl) ;
        }
    }
 
    protected void pushMs(MsObj msNode){
        try {
            msQueue.pushTail(msNode);
        }catch (Exception e){
            log.error("消息中心队列存入消息时发生异常", e);
        }
    }
 
    @Override
    public Object execute() throws Exception {
        if(msQueue.size() > 0){
            JSONObject msNode1 = (JSONObject)msQueue.pop() ;
            if(msNode1 != null){
                //大部分时间msNode1是null
                List<JSONObject> list = new ArrayList<>() ;
                list.add(msNode1) ;
                JSONObject msNode ;
                do{
                    msNode = (JSONObject)msQueue.pop() ;
                    if(msNode != null){
                        list.add(msNode) ;
                    }
                }while (msNode != null);
                this.notifyMs(list) ;
            }
        }
        return null;
    }
 
 
    ////////////////////////////////////////////////////
    //
    // 消息通知通知工作线程执行完成后回调的方法,
    // 也就是上面execute方法执行完成返回或抛出异常后,执行下面三个方法
    //
    ////////////////////////////////////////////////////
    @Override
    public void call(Object obj) {
        //线程工作执行完了,obj = Boolean(true)
    }
    @Override
    public void call(Object... objs) {
    }
    @Override
    public void exception(Exception e) {
        log.error("消息通知伺服线程发生异常", e);
    }
 
 
    /**
     * 把消息通知出去
     * @param list
     */
    private void notifyMs(List<JSONObject> list){
        try {
            if(msReceivers.size() > 0){
                ThreadPool.Pool pool = TreadPoolFactory.getThreadPoolLong() ;
                pool.putJob(new ThreadPool.Job() {
                    public void execute() {
                        Iterator<String> it = msReceivers.keySet().iterator() ;
                        while (it.hasNext()){
                            doNotifyMs(it.next(), list) ;
                        }
                    }
                    @Override
                    public void destroy(){
                    }
                    @Override
                    public boolean isDestroy(){
                        return false ;
                    }
 
                });
            }
        } catch (Exception e) {
            log.error("在RtuDataNode内发生异常", e);
        }
    }
 
    private void doNotifyMs(String receiverWebUrl, List<JSONObject> list){
        WebRequestDeal deal = SpringContextUtil.getBean(WebRequestDeal.class) ;
        deal.deal(receiverWebUrl, list);
    }
 
}