package com.easysocket.connection.dispatcher;

import com.easysocket.callback.SuperCallBack;
import com.easysocket.config.EasySocketOptions;
import com.easysocket.entity.OriginReadData;
import com.easysocket.entity.SocketAddress;
import com.easysocket.entity.basemsg.SuperCallbackSender;
import com.easysocket.exception.RequestTimeOutException;
import com.easysocket.interfaces.conn.IConnectionManager;
import com.easysocket.interfaces.conn.SocketActionListener;
import com.easysocket.utils.LogUtil;
import com.easysocket.utils.Utils;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Author:Alex
 * Date:2019/6/4
 * Note: dispatcher for callback (request/response) messages.
 *
 * <p>Every callback message that is sent registers a {@link SuperCallBack} here under its
 * callback id. When a response carrying that id arrives, the callback's {@code onSuccess} is
 * invoked; if the request timeout elapses first, {@code onError} is invoked with a
 * {@link RequestTimeOutException}. A callback is removed from the registry before it is
 * invoked, so it receives at most one of the two notifications.
 */
public class CallbackResponseDispatcher {

    /**
     * Listener instances of the callback messages that are still pending, keyed by callback id,
     * so the matching listener can be found when a response comes back.
     *
     * <p>The map is touched by the timeout worker as well as by the sending and receiving code
     * paths, hence the synchronized wrapper (which, unlike ConcurrentHashMap, keeps HashMap's
     * tolerance for a null key).
     */
    private final Map<String, SuperCallBack> callbacks =
            Collections.synchronizedMap(new HashMap<String, SuperCallBack>());

    /**
     * Requests awaiting timeout detection. This is a delay queue: an element can only be taken
     * once its delay has expired.
     */
    private final DelayQueue<timeoutItem> timeoutQueue = new DelayQueue<>();

    /**
     * Executor that owns the timeout-detection worker thread.
     */
    private ExecutorService timeoutExecutor;

    /**
     * Connection manager this dispatcher is attached to.
     */
    IConnectionManager connectionManager;

    private EasySocketOptions socketOptions;

    /**
     * Creates the dispatcher, subscribes it to the connection's socket actions and starts the
     * timeout-detection worker.
     *
     * @param connectionManager connection whose responses are dispatched; its options are read
     *                          once here via {@code getOptions()}
     */
    public CallbackResponseDispatcher(IConnectionManager connectionManager) {
        this.connectionManager = connectionManager;
        socketOptions = connectionManager.getOptions();
        // Register the response listener.
        connectionManager.subscribeSocketAction(socketActionListener);
        // Start the timeout-detection worker.
        engineThread();
    }

    /**
     * Replaces the socket options used by this dispatcher.
     *
     * @param socketOptions the new options
     */
    public void setSocketOptions(EasySocketOptions socketOptions) {
        this.socketOptions = socketOptions;
    }

    /**
     * Starts the timeout-detection worker if no executor exists or the previous one was shut
     * down. Calling it while a worker is already active does nothing.
     */
    public void engineThread() {
        try {
            if (timeoutExecutor == null || timeoutExecutor.isShutdown()) {
                timeoutExecutor = Executors.newSingleThreadExecutor();
                timeoutExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        // Iterate instead of re-invoking run(): recursion would add one stack
                        // frame per expired request and eventually overflow the stack.
                        while (!Thread.currentThread().isInterrupted()) {
                            timeoutItem expired;
                            try {
                                // Blocks until some element's delay has elapsed.
                                expired = timeoutQueue.take();
                            } catch (InterruptedException e) {
                                // shutdownNow() interrupts this worker: keep the flag set and
                                // stop, rather than swallowing the interrupt.
                                Thread.currentThread().interrupt();
                                return;
                            }
                            // A null result means the request was already answered.
                            SuperCallBack callBack = callbacks.remove(expired.callbackId);
                            if (callBack != null) {
                                callBack.onError(new RequestTimeOutException("request timeout"));
                            }
                        }
                    }
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Stops the timeout-detection worker. {@link #engineThread()} can start a new one later.
     */
    public void shutdownThread() {
        if (timeoutExecutor != null && !timeoutExecutor.isShutdown()) {
            // shutdownNow() also interrupts the running worker, which is blocked in take();
            // a plain shutdown() would leave it waiting.
            timeoutExecutor.shutdownNow();
            timeoutExecutor = null;
        }
    }

    /**
     * Socket action listener; only the response notification is overridden.
     */
    private SocketActionListener socketActionListener = new SocketActionListener() {
        @Override
        public void onSocketResponse(SocketAddress socketAddress, OriginReadData originReadData) {
            if (callbacks.isEmpty()) {
                return;
            }
            if (socketOptions.getCallbackIDFactory() == null) {
                return;
            }
            // Extract the callback id from the received data.
            String callbackID = socketOptions.getCallbackIDFactory().getCallbackID(originReadData);
            if (callbackID == null) {
                return;
            }
            // Claim the callback with a single remove() before invoking it, so the timeout
            // worker cannot fire onError for the same request concurrently.
            SuperCallBack callBack = callbacks.remove(callbackID);
            if (callBack != null) {
                callBack.onSuccess(originReadData);
                LogUtil.d("移除的callbackId-->" + callbackID);
            }
        }
    };

    /**
     * Registers the listener of a callback message that is being sent and schedules its
     * timeout check.
     *
     * @param superCallBack listener to register under its own callback id
     */
    public void addSocketCallback(SuperCallBack superCallBack) {
        callbacks.put(superCallBack.getCallbackId(), superCallBack);
        // Fall back to the default options when none have been set.
        long delayTime = socketOptions == null
                ? EasySocketOptions.getDefaultOptions().getRequestTimeout()
                : socketOptions.getRequestTimeout();
        timeoutQueue.add(new timeoutItem(superCallBack.getCallbackId(), delayTime, TimeUnit.MILLISECONDS));
    }

    /**
     * Element of the timeout delay queue.
     */
    class timeoutItem implements Delayed {

        String callbackId; // callback id of the request being tracked
        long executeTime;  // wall-clock time (ms since epoch) at which the request times out

        /**
         * @param callbackId id of the callback to time out
         * @param delayTime  timeout length; values &lt;= 0 expire immediately
         * @param timeUnit   unit of {@code delayTime}
         */
        public timeoutItem(String callbackId, long delayTime, TimeUnit timeUnit) {
            this.callbackId = callbackId;
            this.executeTime = System.currentTimeMillis()
                    + (delayTime > 0 ? timeUnit.toMillis(delayTime) : 0);
        }

        /**
         * Returns the remaining delay expressed in the requested unit. DelayQueue asks for
         * nanoseconds, so the millisecond difference has to be converted.
         */
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        /**
         * Orders elements by remaining delay. Long.compare avoids the sign errors that casting
         * a long difference to int can produce.
         */
        @Override
        public int compareTo(Delayed o) {
            return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    /**
     * Ensures the sender's callback id does not collide with a request that is still pending.
     * If the same message is sent more than once, each send needs its own callback id;
     * otherwise responses could not be matched to the right listener.
     *
     * @param callbackSender the message about to be sent; asked to generate a new callback id
     *                       when its current one is already registered
     */
    public void checkCallbackSender(SuperCallbackSender callbackSender) {
        Utils.checkNotNull(socketOptions.getCallbackIDFactory(), "要想实现EasySocket的回调功能,CallbackIdFactory不能为null," +
                "请实现一个CallbackIdFactory并在初始化的时候通过EasySocketOptions的setCallbackIdFactory进行配置");
        String callbackId = callbackSender.getCallbackId();
        // The id is already in use by a pending request: generate a fresh one.
        if (callbacks.containsKey(callbackId)) {
            callbackSender.generateCallbackId();
        }
    }
}