package com.dy.rtuMw.server.msCenter;
|
|
import com.alibaba.fastjson2.JSONObject;
|
import com.dy.common.queue.Queue;
|
import com.dy.common.springUtil.SpringContextUtil;
|
import com.dy.common.threadPool.ThreadPool;
|
import com.dy.common.threadPool.TreadPoolFactory;
|
import com.dy.common.util.Callback;
|
import com.dy.common.util.TimerTaskJob;
|
import com.dy.rtuMw.web.webRequest.WebRequestDeal;
|
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.Logger;
|
|
import java.util.*;
|
|
/**
|
* @Author: liurunyu
|
* @Date: 2025/2/12 14:12
|
* @Description
|
*/
|
public class MsCenterManager extends TimerTaskJob implements Callback {
|
|
private static final Logger log = LogManager.getLogger(MsCenterManager.class.getName());
|
|
private static final MsCenterManager INSTANCE = new MsCenterManager();
|
//消息缓存队列,线程安全的
|
private static final Queue msQueue = new Queue("MsCenterQueue") ;
|
|
private static final Map<String, String> msReceivers = new HashMap<>() ;
|
|
|
private MsCenterManager(){
|
}
|
|
protected static MsCenterManager getInstance() {
|
return MsCenterManager.INSTANCE;
|
}
|
|
/**
|
* 初始化配置信息
|
*/
|
protected void initOption(MsCenterConfigVo configVo) {
|
}
|
/**
|
* 注册消息接收器
|
* @param webUrl 接收者web http post url
|
*/
|
protected void registerMsReceiver(String webUrl){
|
if(!msReceivers.containsKey(webUrl)){
|
msReceivers.put(webUrl, webUrl) ;
|
}
|
}
|
|
protected void pushMs(MsObj msNode){
|
try {
|
msQueue.pushTail(msNode);
|
}catch (Exception e){
|
log.error("消息中心队列存入消息时发生异常", e);
|
}
|
}
|
|
@Override
|
public Object execute() throws Exception {
|
if(msQueue.size() > 0){
|
JSONObject msNode1 = (JSONObject)msQueue.pop() ;
|
if(msNode1 != null){
|
//大部分时间msNode1是null
|
List<JSONObject> list = new ArrayList<>() ;
|
list.add(msNode1) ;
|
JSONObject msNode ;
|
do{
|
msNode = (JSONObject)msQueue.pop() ;
|
if(msNode != null){
|
list.add(msNode) ;
|
}
|
}while (msNode != null);
|
this.notifyMs(list) ;
|
}
|
}
|
return null;
|
}
|
|
|
////////////////////////////////////////////////////
|
//
|
// 消息通知通知工作线程执行完成后回调的方法,
|
// 也就是上面execute方法执行完成返回或抛出异常后,执行下面三个方法
|
//
|
////////////////////////////////////////////////////
|
@Override
|
public void call(Object obj) {
|
//线程工作执行完了,obj = Boolean(true)
|
}
|
@Override
|
public void call(Object... objs) {
|
}
|
@Override
|
public void exception(Exception e) {
|
log.error("消息通知伺服线程发生异常", e);
|
}
|
|
|
/**
|
* 把消息通知出去
|
* @param list
|
*/
|
private void notifyMs(List<JSONObject> list){
|
try {
|
if(msReceivers.size() > 0){
|
ThreadPool.Pool pool = TreadPoolFactory.getThreadPoolLong() ;
|
pool.putJob(new ThreadPool.Job() {
|
public void execute() {
|
Iterator<String> it = msReceivers.keySet().iterator() ;
|
while (it.hasNext()){
|
doNotifyMs(it.next(), list) ;
|
}
|
}
|
@Override
|
public void destroy(){
|
}
|
@Override
|
public boolean isDestroy(){
|
return false ;
|
}
|
|
});
|
}
|
} catch (Exception e) {
|
log.error("在RtuDataNode内发生异常", e);
|
}
|
}
|
|
private void doNotifyMs(String receiverWebUrl, List<JSONObject> list){
|
WebRequestDeal deal = SpringContextUtil.getBean(WebRequestDeal.class) ;
|
deal.deal(receiverWebUrl, list);
|
}
|
|
}
|