zuoxiao
2024-03-04 4d2d9239d8915a030bb84cb2147774470b04bf27
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
package com.easysocket.connection.dispatcher;
 
import com.easysocket.callback.SuperCallBack;
import com.easysocket.config.EasySocketOptions;
import com.easysocket.entity.OriginReadData;
import com.easysocket.entity.SocketAddress;
import com.easysocket.entity.basemsg.SuperCallbackSender;
import com.easysocket.exception.RequestTimeOutException;
import com.easysocket.interfaces.conn.IConnectionManager;
import com.easysocket.interfaces.conn.SocketActionListener;
import com.easysocket.utils.LogUtil;
import com.easysocket.utils.Utils;
 
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
 
/**
 * Author:Alex
 * Date:2019/6/4
 * Note:回调消息分发器
 */
public class CallbackResponseDispatcher {
    /**
     * 保存发送的每个回调消息的监听实例,key为回调标识callbackId,这样回调消息有反馈的时候,就可以找到并调用
     * 对应的监听对象
     */
    private Map<String, SuperCallBack> callbacks = new HashMap<>();
    /**
     * 保存需要进行超时检测的请求,这是一个延时队列,元素超时的时候会被取出来
     */
    private DelayQueue<timeoutItem> timeoutQueue = new DelayQueue<>();
    /**
     * 超时检测的线程管理器
     */
    private ExecutorService timeoutExecutor;
 
    /**
     * 连接管理
     */
    IConnectionManager connectionManager;
 
    private EasySocketOptions socketOptions;
 
 
    public CallbackResponseDispatcher(IConnectionManager connectionManager) {
        this.connectionManager = connectionManager;
        socketOptions = connectionManager.getOptions();
        // 注册监听
        connectionManager.subscribeSocketAction(socketActionListener);
        // 开始超时检测线程
        engineThread();
    }
 
    /**
     * 设置socketoptions
     *
     * @param socketOptions
     */
    public void setSocketOptions(EasySocketOptions socketOptions) {
        this.socketOptions = socketOptions;
    }
 
    /**
     * 超时检测线程
     */
    public void engineThread() {
        try {
            if (timeoutExecutor == null || timeoutExecutor.isShutdown()) {
                timeoutExecutor = Executors.newSingleThreadExecutor();
                timeoutExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            // 只有超时的元素才会被取出,没有的话会被等待
                            timeoutItem item = timeoutQueue.take();
                            if (item != null) {
                                SuperCallBack callBack = callbacks.remove(item.callbackId);
                                if (callBack != null)
                                    callBack.onError(new RequestTimeOutException("request timeout"));
                            }
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        // 继续循环
                        if (timeoutExecutor != null && !timeoutExecutor.isShutdown()) {
                            run();
                        }
                    }
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
 
    /**
     * 关闭线程
     */
    public void shutdownThread() {
        if (timeoutExecutor != null && !timeoutExecutor.isShutdown()) {
            // shutdown和shutdownNow的主要区别是前者中断未执行的线程,后者中断所有线程
            timeoutExecutor.shutdownNow();
            timeoutExecutor = null;
        }
 
    }
 
    /**
     * socket行为监听,重写反馈消息的回调方法
     */
    private SocketActionListener socketActionListener = new SocketActionListener() {
        @Override
        public void onSocketResponse(SocketAddress socketAddress, OriginReadData originReadData) {
            if (callbacks.size() == 0) return;
            if (socketOptions.getCallbackIDFactory() == null) return;
            // 获取回调ID
            String callbackID = socketOptions.getCallbackIDFactory().getCallbackID(originReadData);
            if (callbackID != null) {
                // 获取callbackID对应的callback
                SuperCallBack callBack = callbacks.get(callbackID);
                if (callBack != null) {
                    // 回调
                    callBack.onSuccess(originReadData);
                    callbacks.remove(callbackID); // 移除完成任务的callback
                    LogUtil.d("移除的callbackId-->" + callbackID);
                }
            }
        }
    };
 
 
    /**
     * 每发一条回调消息都要在这里添加监听对象
     *
     * @param superCallBack
     */
    public void addSocketCallback(SuperCallBack superCallBack) {
        callbacks.put(superCallBack.getCallbackId(), superCallBack);
        // 放入延时队列
        long delayTime = socketOptions == null ?
                EasySocketOptions.getDefaultOptions().getRequestTimeout() : socketOptions.getRequestTimeout();
        timeoutQueue.add(new timeoutItem(superCallBack.getCallbackId(), delayTime, TimeUnit.MILLISECONDS));
    }
 
    /**
     * 延时队列的item
     */
    class timeoutItem implements Delayed {
 
        String callbackId; // 当前callback的callbackId
        long executeTime; // 触发时间
 
        public timeoutItem(String callbackId, long delayTime, TimeUnit timeUnit) {
            this.callbackId = callbackId;
            this.executeTime = System.currentTimeMillis() + (delayTime > 0 ? timeUnit.toMillis(delayTime) : 0);
        }
 
        @Override
        public long getDelay(TimeUnit unit) {
            return executeTime - System.currentTimeMillis();
        }
 
        @Override
        public int compareTo(Delayed o) {
            return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
        }
    }
 
    /**
     * 同一个消息发送多次,callbackId是不能一样的,所以这里要先check一下,否则服务端反馈的时候,客户端接收就会乱套
     *
     * @param callbackSender
     * @return
     */
    public void checkCallbackSender(SuperCallbackSender callbackSender) {
 
        Utils.checkNotNull(socketOptions.getCallbackIDFactory(), "要想实现EasySocket的回调功能,CallbackIdFactory不能为null," +
                "请实现一个CallbackIdFactory并在初始化的时候通过EasySocketOptions的setCallbackIdFactory进行配置");
        String callbackId = callbackSender.getCallbackId();
        // 同一个消息发送两次以上,callbackId是不能一样的,否则服务端反馈的时候,客户端接收就会乱套
        if (callbacks.containsKey(callbackId)) {
            callbackSender.generateCallbackId();
        }
    }
 
}