package com.easysocket.connection.dispatcher;
|
|
import com.easysocket.callback.SuperCallBack;
|
import com.easysocket.config.EasySocketOptions;
|
import com.easysocket.entity.OriginReadData;
|
import com.easysocket.entity.SocketAddress;
|
import com.easysocket.entity.basemsg.SuperCallbackSender;
|
import com.easysocket.exception.RequestTimeOutException;
|
import com.easysocket.interfaces.conn.IConnectionManager;
|
import com.easysocket.interfaces.conn.SocketActionListener;
|
import com.easysocket.utils.LogUtil;
|
import com.easysocket.utils.Utils;
|
|
import java.util.HashMap;
|
import java.util.Map;
|
import java.util.concurrent.DelayQueue;
|
import java.util.concurrent.Delayed;
|
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.Executors;
|
import java.util.concurrent.TimeUnit;
|
|
/**
|
* Author:Alex
|
* Date:2019/6/4
|
* Note:回调消息分发器
|
*/
|
public class CallbackResponseDispatcher {
|
/**
|
* 保存发送的每个回调消息的监听实例,key为回调标识callbackId,这样回调消息有反馈的时候,就可以找到并调用
|
* 对应的监听对象
|
*/
|
private Map<String, SuperCallBack> callbacks = new HashMap<>();
|
/**
|
* 保存需要进行超时检测的请求,这是一个延时队列,元素超时的时候会被取出来
|
*/
|
private DelayQueue<timeoutItem> timeoutQueue = new DelayQueue<>();
|
/**
|
* 超时检测的线程管理器
|
*/
|
private ExecutorService timeoutExecutor;
|
|
/**
|
* 连接管理
|
*/
|
IConnectionManager connectionManager;
|
|
private EasySocketOptions socketOptions;
|
|
|
public CallbackResponseDispatcher(IConnectionManager connectionManager) {
|
this.connectionManager = connectionManager;
|
socketOptions = connectionManager.getOptions();
|
// 注册监听
|
connectionManager.subscribeSocketAction(socketActionListener);
|
// 开始超时检测线程
|
engineThread();
|
}
|
|
/**
|
* 设置socketoptions
|
*
|
* @param socketOptions
|
*/
|
public void setSocketOptions(EasySocketOptions socketOptions) {
|
this.socketOptions = socketOptions;
|
}
|
|
/**
|
* 超时检测线程
|
*/
|
public void engineThread() {
|
try {
|
if (timeoutExecutor == null || timeoutExecutor.isShutdown()) {
|
timeoutExecutor = Executors.newSingleThreadExecutor();
|
timeoutExecutor.execute(new Runnable() {
|
@Override
|
public void run() {
|
try {
|
// 只有超时的元素才会被取出,没有的话会被等待
|
timeoutItem item = timeoutQueue.take();
|
if (item != null) {
|
SuperCallBack callBack = callbacks.remove(item.callbackId);
|
if (callBack != null)
|
callBack.onError(new RequestTimeOutException("request timeout"));
|
}
|
} catch (InterruptedException e) {
|
e.printStackTrace();
|
}
|
// 继续循环
|
if (timeoutExecutor != null && !timeoutExecutor.isShutdown()) {
|
run();
|
}
|
}
|
});
|
}
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
}
|
|
/**
|
* 关闭线程
|
*/
|
public void shutdownThread() {
|
if (timeoutExecutor != null && !timeoutExecutor.isShutdown()) {
|
// shutdown和shutdownNow的主要区别是前者中断未执行的线程,后者中断所有线程
|
timeoutExecutor.shutdownNow();
|
timeoutExecutor = null;
|
}
|
|
}
|
|
/**
|
* socket行为监听,重写反馈消息的回调方法
|
*/
|
private SocketActionListener socketActionListener = new SocketActionListener() {
|
@Override
|
public void onSocketResponse(SocketAddress socketAddress, OriginReadData originReadData) {
|
if (callbacks.size() == 0) return;
|
if (socketOptions.getCallbackIDFactory() == null) return;
|
// 获取回调ID
|
String callbackID = socketOptions.getCallbackIDFactory().getCallbackID(originReadData);
|
if (callbackID != null) {
|
// 获取callbackID对应的callback
|
SuperCallBack callBack = callbacks.get(callbackID);
|
if (callBack != null) {
|
// 回调
|
callBack.onSuccess(originReadData);
|
callbacks.remove(callbackID); // 移除完成任务的callback
|
LogUtil.d("移除的callbackId-->" + callbackID);
|
}
|
}
|
}
|
};
|
|
|
/**
|
* 每发一条回调消息都要在这里添加监听对象
|
*
|
* @param superCallBack
|
*/
|
public void addSocketCallback(SuperCallBack superCallBack) {
|
callbacks.put(superCallBack.getCallbackId(), superCallBack);
|
// 放入延时队列
|
long delayTime = socketOptions == null ?
|
EasySocketOptions.getDefaultOptions().getRequestTimeout() : socketOptions.getRequestTimeout();
|
timeoutQueue.add(new timeoutItem(superCallBack.getCallbackId(), delayTime, TimeUnit.MILLISECONDS));
|
}
|
|
/**
|
* 延时队列的item
|
*/
|
class timeoutItem implements Delayed {
|
|
String callbackId; // 当前callback的callbackId
|
long executeTime; // 触发时间
|
|
public timeoutItem(String callbackId, long delayTime, TimeUnit timeUnit) {
|
this.callbackId = callbackId;
|
this.executeTime = System.currentTimeMillis() + (delayTime > 0 ? timeUnit.toMillis(delayTime) : 0);
|
}
|
|
@Override
|
public long getDelay(TimeUnit unit) {
|
return executeTime - System.currentTimeMillis();
|
}
|
|
@Override
|
public int compareTo(Delayed o) {
|
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
|
}
|
}
|
|
/**
|
* 同一个消息发送多次,callbackId是不能一样的,所以这里要先check一下,否则服务端反馈的时候,客户端接收就会乱套
|
*
|
* @param callbackSender
|
* @return
|
*/
|
public void checkCallbackSender(SuperCallbackSender callbackSender) {
|
|
Utils.checkNotNull(socketOptions.getCallbackIDFactory(), "要想实现EasySocket的回调功能,CallbackIdFactory不能为null," +
|
"请实现一个CallbackIdFactory并在初始化的时候通过EasySocketOptions的setCallbackIdFactory进行配置");
|
String callbackId = callbackSender.getCallbackId();
|
// 同一个消息发送两次以上,callbackId是不能一样的,否则服务端反馈的时候,客户端接收就会乱套
|
if (callbacks.containsKey(callbackId)) {
|
callbackSender.generateCallbackId();
|
}
|
}
|
|
}
|