zuoxiao
2024-03-04 4d2d9239d8915a030bb84cb2147774470b04bf27
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
package com.easysocket.connection.connect;
 
import android.os.Handler;
 
import com.easysocket.EasySocket;
import com.easysocket.callback.SuperCallBack;
import com.easysocket.config.EasySocketOptions;
import com.easysocket.connection.action.SocketAction;
import com.easysocket.connection.action.SocketStatus;
import com.easysocket.connection.dispatcher.CallbackResponseDispatcher;
import com.easysocket.connection.dispatcher.SocketActionDispatcher;
import com.easysocket.connection.heartbeat.HeartManager;
import com.easysocket.connection.iowork.IOManager;
import com.easysocket.connection.reconnect.AbsReconnection;
import com.easysocket.entity.SocketAddress;
import com.easysocket.entity.basemsg.SuperCallbackSender;
import com.easysocket.exception.NotNullException;
import com.easysocket.interfaces.config.IConnectionSwitchListener;
import com.easysocket.interfaces.conn.IConnectionManager;
import com.easysocket.interfaces.conn.ISocketActionListener;
import com.easysocket.utils.LogUtil;
import com.easysocket.utils.Utils;
 
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
 
/**
 * Author:Alex
 * Date:2019/5/29
 * Note:socket连接的超类
 */
public abstract class SuperConnection implements IConnectionManager {
 
    /**
     * 连接状态,初始值为断开连接
     */
    protected final AtomicInteger connectionStatus = new AtomicInteger(SocketStatus.SOCKET_DISCONNECTED);
    /**
     * 连接线程
     */
    private ExecutorService connExecutor;
    /**
     * socket地址信息
     */
    protected SocketAddress socketAddress;
    /**
     * socket行为分发器
     */
    private SocketActionDispatcher actionDispatcher;
    /**
     * 重连管理器
     */
    private AbsReconnection reconnection;
    /**
     * io管理器
     */
    private IOManager ioManager;
    /**
     * 心跳管理器
     */
    private HeartManager heartManager;
    /**
     * 配置信息
     */
    protected EasySocketOptions socketOptions;
    /**
     * socket回调消息的分发器
     */
    private CallbackResponseDispatcher callbackResponseDispatcher;
    /**
     * 连接切换的监听
     */
    private IConnectionSwitchListener connectionSwitchListener;
 
    public SuperConnection(SocketAddress socketAddress) {
        this.socketAddress = socketAddress;
        actionDispatcher = new SocketActionDispatcher(this, socketAddress);
    }
 
    @Override
    public void subscribeSocketAction(ISocketActionListener iSocketActionListener) {
        actionDispatcher.subscribe(iSocketActionListener);
    }
 
    @Override
    public void unSubscribeSocketAction(ISocketActionListener iSocketActionListener) {
        actionDispatcher.unsubscribe(iSocketActionListener);
    }
 
    @Override
    public synchronized IConnectionManager setOptions(EasySocketOptions socketOptions) {
        if (socketOptions == null) return this;
 
        this.socketOptions = socketOptions;
 
        if (ioManager != null)
            ioManager.setOptions(socketOptions);
 
        if (heartManager != null)
            heartManager.setOptions(socketOptions);
 
        if (callbackResponseDispatcher != null)
            callbackResponseDispatcher.setSocketOptions(socketOptions);
 
        // 更改了重连器
        if (reconnection != null && !reconnection.equals(socketOptions.getReconnectionManager())) {
            reconnection.detach();
            reconnection = socketOptions.getReconnectionManager();
            reconnection.attach(this);
        }
        return this;
    }
 
    @Override
    public EasySocketOptions getOptions() {
        return socketOptions;
    }
 
    @Override
    public synchronized void connect() {
 
        if (connectionStatus.get() == SocketStatus.SOCKET_DISCONNECTING) {
            new Handler().postDelayed(new Runnable() {
                @Override
                public void run() {
                    LogUtil.d("---> socket>>connect>>正在断开连接,延时一秒執行重连");
                    connect();
                }
            }, 1000); // 延时1秒
            return;
        }
        LogUtil.d("---> socket开始连接");
        if (socketAddress.getIp() == null) {
            throw new NotNullException("请检查是否设置了IP地址");
        }
        // 正在连接
        connectionStatus.set(SocketStatus.SOCKET_CONNECTING);
 
        // 心跳管理器
        if (heartManager == null) {
            heartManager = new HeartManager(this, actionDispatcher);
        }
 
        // 重连管理器
        if (reconnection != null) {
            reconnection.detach();
        }
        reconnection = socketOptions.getReconnectionManager();
        if (reconnection != null) {
            reconnection.attach(this);
        }
 
        // 开启分发消息线程
        if (actionDispatcher != null) {
            actionDispatcher.startDispatchThread();
        }
        // 开启连接线程
        if (connExecutor == null || connExecutor.isShutdown()) {
            // 核心线程数为0,非核心线程数可以有Integer.MAX_VALUE个,存活时间为60秒,适合于在不断进行连接的情况下,避免重复创建和销毁线程
            connExecutor = Executors.newCachedThreadPool();
            // 执行连接任务
        }
        connExecutor.execute(connTask);
    }
 
    @Override
    public synchronized void disconnect(boolean isNeedReconnect) {
        // 判断当前socket的连接状态
        if (connectionStatus.get() == SocketStatus.SOCKET_DISCONNECTED) {
            return;
        }
        // 正在重连中
        if (isNeedReconnect && reconnection.isReconning()) {
            return;
        }
        // 正在断开连接
        connectionStatus.set(SocketStatus.SOCKET_DISCONNECTING);
 
        // 开启断开连接线程
        String info = socketAddress.getIp() + " : " + socketAddress.getPort();
        Thread disconnThread = new DisconnectThread(isNeedReconnect, "disconn thread:" + info);
        disconnThread.setDaemon(true);
        disconnThread.start();
    }
 
    /**
     * 断开连接线程
     */
    private class DisconnectThread extends Thread {
        boolean isNeedReconnect; // 当前连接的断开是否需要自动重连
 
        public DisconnectThread(boolean isNeedReconnect, String name) {
            super(name);
            this.isNeedReconnect = isNeedReconnect;
        }
 
        @Override
        public void run() {
            try {
                // 关闭io线程
                if (ioManager != null)
                    ioManager.closeIO();
                // 关闭回调分发器线程
                if (callbackResponseDispatcher != null)
                    callbackResponseDispatcher.shutdownThread();
                // 关闭连接线程
                if (connExecutor != null && !connExecutor.isShutdown()) {
                    connExecutor.shutdown();
                    connExecutor = null;
                }
                // 关闭连接
                closeConnection();
                actionDispatcher.dispatchAction(SocketAction.ACTION_DISCONNECTION, new Boolean(isNeedReconnect));
                if (!isNeedReconnect){
                    heartManager.stopHeartbeat();
                }
                LogUtil.d("---> 关闭socket连接");
                connectionStatus.set(SocketStatus.SOCKET_DISCONNECTED);
            } catch (IOException e) {
                // 断开连接发生异常
                e.printStackTrace();
            }
        }
    }
 
    // 连接任务
    private Runnable connTask = new Runnable() {
        @Override
        public void run() {
            try {
                openConnection();
            } catch (Exception e) {
                // 连接异常
                e.printStackTrace();
                LogUtil.d("---> socket连接失败");
                connectionStatus.set(SocketStatus.SOCKET_DISCONNECTED);
                // 第二个参数指需要重连
                actionDispatcher.dispatchAction(SocketAction.ACTION_CONN_FAIL, new Boolean(true));
 
            }
        }
    };
 
    /**
     * 连接成功
     */
    protected void onConnectionOpened() {
        LogUtil.d("---> socket连接成功");
        openSocketManager();
        connectionStatus.set(SocketStatus.SOCKET_CONNECTED);
        actionDispatcher.dispatchAction(SocketAction.ACTION_CONN_SUCCESS);
    }
 
    // 开启socket相关管理器
    private void openSocketManager() {
        if (callbackResponseDispatcher == null)
            callbackResponseDispatcher = new CallbackResponseDispatcher(this);
        if (ioManager == null) {
            ioManager = new IOManager(this, actionDispatcher);
        }
        ioManager.startIO();
 
        // 启动相关线程
        callbackResponseDispatcher.engineThread();
        ioManager.startIO();
    }
 
    // 切换了主机IP和端口
    @Override
    public synchronized void switchHost(SocketAddress socketAddress) {
        if (socketAddress != null) {
            SocketAddress oldAddress = this.socketAddress;
            this.socketAddress = socketAddress;
 
            if (actionDispatcher != null)
                actionDispatcher.setSocketAddress(socketAddress);
            // 切换主机
            if (connectionSwitchListener != null) {
                connectionSwitchListener.onSwitchConnectionInfo(this, oldAddress, socketAddress);
            }
        }
 
    }
 
    public void setOnConnectionSwitchListener(IConnectionSwitchListener listener) {
        connectionSwitchListener = listener;
    }
 
    @Override
    public boolean isConnectViable() {
        // 当前socket是否处于可连接状态
        return Utils.isNetConnected(EasySocket.getInstance().getContext()) && connectionStatus.get() == SocketStatus.SOCKET_DISCONNECTED;
    }
 
    @Override
    public int getConnectionStatus() {
        return connectionStatus.get();
    }
 
    /**
     * 打开连接
     *
     * @throws IOException
     */
    protected abstract void openConnection() throws Exception;
 
    /**
     * 关闭连接
     *
     * @throws IOException
     */
    protected abstract void closeConnection() throws IOException;
 
    /**
     * 发送bytes数据
     *
     * @param bytes
     * @return
     */
    private IConnectionManager sendBytes(byte[] bytes) {
        if (ioManager == null || connectionStatus.get() != SocketStatus.SOCKET_CONNECTED) {
            LogUtil.w("sendBytes错误-----ioManager为null或者connectionStatus状态不为已连接");
            return this;
        }
        ioManager.sendBytes(bytes);
        return this;
    }
 
    @Override
    public void onCallBack(SuperCallBack callBack) {
        callbackResponseDispatcher.addSocketCallback(callBack);
    }
 
 
    @Override
    public synchronized IConnectionManager upBytes(byte[] bytes) {
        sendBytes(bytes);
        return this;
    }
 
    @Override
    public synchronized IConnectionManager upCallbackMessage(SuperCallbackSender sender) {
        callbackResponseDispatcher.checkCallbackSender(sender);
        // 发送
        sendBytes(sender.pack());
        return this;
    }
 
 
    @Override
    public HeartManager getHeartManager() {
        return heartManager;
    }
}