左晓为主开发手持机充值管理机
zuoxiao
2024-03-22 e8232424de65da0254ce9637e19af9a4a6527964
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
package com.easysocket.connection.iowork;
 
import com.easysocket.config.EasySocketOptions;
import com.easysocket.connection.action.IOAction;
import com.easysocket.connection.action.SocketStatus;
import com.easysocket.entity.OriginReadData;
import com.easysocket.exception.ReadRecoverableExeption;
import com.easysocket.exception.ReadUnrecoverableException;
import com.easysocket.interfaces.config.IMessageProtocol;
import com.easysocket.interfaces.conn.IConnectionManager;
import com.easysocket.interfaces.conn.ISocketActionDispatch;
import com.easysocket.interfaces.io.IReader;
import com.easysocket.utils.HexUtil;
import com.easysocket.utils.LogUtil;
import com.tencent.bugly.crashreport.CrashReport;
 
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
 
/**
 * Author:Alex
 * Date:2019/6/1
 * Note:
 */
public class EasyReader implements IReader<EasySocketOptions> {
    /**
     * 输入流
     */
    private InputStream inputStream;
    /**
     * 读取原始数据的缓存空间
     */
    private ByteBuffer originBuf;
    /**
     * socket行为分发器
     */
    private ISocketActionDispatch actionDispatch;
    /**
     * 连接器
     */
    private IConnectionManager connectionManager;
    /**
     * 连接参数
     */
    private EasySocketOptions socketOptions;
 
    /**
     * 读数据时,余留数据的缓存
     */
    private ByteBuffer remainingBuf;
    /**
     * 读数据线程
     */
    private Thread readerThread;
    /**
     * 是否停止线程
     */
    private boolean stopThread;
 
    public EasyReader(IConnectionManager connectionManager, ISocketActionDispatch actionDispatch) {
        this.actionDispatch = actionDispatch;
        this.connectionManager = connectionManager;
        socketOptions = connectionManager.getOptions();
    }
 
    @Override
    public void read() throws IOException, ReadRecoverableExeption, ReadUnrecoverableException {
        OriginReadData originalData = new OriginReadData();
        IMessageProtocol messageProtocol = socketOptions.getMessageProtocol();
        // 消息协议为null,则直接读原始消息,不建议这样使用,因为会发生黏包、分包的问题
        if (messageProtocol == null) {
            readOriginDataFromSteam(originalData);
            return;
        }
 
        // 定义了消息协议
        int headerLength = messageProtocol.getHeaderLength(); // 包头长度
        ByteBuffer headBuf = ByteBuffer.allocate(headerLength); // 包头数据的buffer
        headBuf.order(socketOptions.getReadOrder());
 
        /*1、读 header=====>>>*/
        if (remainingBuf != null) { // 有余留
            // flip方法:一般从Buffer读数据前调用,将limit设置为当前position,将position设置为0,在读数据时,limit代表可读数据的有效长度
            remainingBuf.flip();
            // 读余留数据的长度
            int length = Math.min(remainingBuf.remaining(), headerLength);
            // 读入余留数据
            headBuf.put(remainingBuf.array(), 0, length);
 
            if (length < headerLength) { // 余留数据小于一个header
                // there are no data left
                remainingBuf = null;
                // 从stream中读剩下的header数据
                readHeaderFromSteam(headBuf, headerLength - length);
            } else {
                // 移动开始读数据的指针
                remainingBuf.position(headerLength);
            }
        } else { // 无余留
            // 从stream读取一个完整的 header
            readHeaderFromSteam(headBuf, headBuf.capacity());
        }
 
        // 保存header
        originalData.setHeaderData(headBuf.array());
 
        /*2、读 body=====>>>*/
        int bodyLength = messageProtocol.getBodyLength(originalData.getHeaderData(), socketOptions.getReadOrder());
        if (bodyLength > 0) {
            if (bodyLength > socketOptions.getMaxResponseDataMb() * 1024 * 1024) {
                throw new ReadUnrecoverableException("服务器返回的单次数据超过了规定的最大值,可能你的Socket消息协议不对,一般消息格式" +
                        "为:Header+Body,其中Header保存消息长度和类型等,Body保存消息内容,请规范好你的协议");
            }
            // 分配空间
            ByteBuffer bodyBuf = ByteBuffer.allocate(bodyLength);
            bodyBuf.order(socketOptions.getReadOrder());
 
            // 有余留
            if (remainingBuf != null) {
                int bodyStartPosition = remainingBuf.position();
 
                int length = Math.min(remainingBuf.remaining(), bodyLength);
                // 读length大小的余留数据
                bodyBuf.put(remainingBuf.array(), bodyStartPosition, length);
                // 移动position位置
                remainingBuf.position(bodyStartPosition + length);
 
                // 读的余留数据刚好等于一个body
                if (length == bodyLength) {
                    if (remainingBuf.remaining() > 0) { // 余留数据未读完
                        ByteBuffer temp = ByteBuffer.allocate(remainingBuf.remaining());
                        temp.order(socketOptions.getReadOrder());
                        temp.put(remainingBuf.array(), remainingBuf.position(), remainingBuf.remaining());
                        remainingBuf = temp;
                    } else { // there are no data left
                        remainingBuf = null;
                    }
 
                    // 保存body
                    originalData.setBodyData(bodyBuf.array());
 
                    LogUtil.d("Socket收到数据-->" +HexUtil.bytesToHex(originalData.getBodyBytes()) );
                    // 分发数据
                    actionDispatch.dispatchAction(IOAction.ACTION_READ_COMPLETE, originalData);
 
                    /*return读取结束*/
                    return;
 
                } else { // there are no data left in buffer and some data pieces in channel
                    remainingBuf = null;
                }
            }
            // 无余留,则从stream中读
            readBodyFromStream(bodyBuf);
            // 保存body到originalData
            originalData.setBodyData(bodyBuf.array());
 
        } else if (bodyLength == 0) { // 没有body数据
            originalData.setBodyData(new byte[0]);
            if (remainingBuf != null) {
                // the body is empty so header remaining buf need set null
                if (remainingBuf.hasRemaining()) {
                    ByteBuffer temp = ByteBuffer.allocate(remainingBuf.remaining());
                    temp.order(socketOptions.getReadOrder());
                    temp.put(remainingBuf.array(), remainingBuf.position(), remainingBuf.remaining());
                    remainingBuf = temp;
                } else {
                    remainingBuf = null;
                }
            }
        } else if (bodyLength < 0) {
            throw new ReadUnrecoverableException("数据body的长度不能小于0");
        }
 
        LogUtil.d("Socket收到数据-->" + HexUtil.bytesToHex(originalData.getBodyBytes()));
        // 分发
        actionDispatch.dispatchAction(IOAction.ACTION_READ_COMPLETE, originalData);
 
    }
 
 
    /**
     * 读数据任务
     */
    private Runnable readerTask = new Runnable() {
        @Override
        public void run() {
            try {
                while (!stopThread) {
                    read();
                }
            } catch (ReadUnrecoverableException unrecoverableException) {
                // 读异常
                unrecoverableException.printStackTrace();
                // 停止线程
                stopThread = true;
                release();
                LogUtil.i("reader停止线程");
                CrashReport.postCatchedException(unrecoverableException);
            } catch (ReadRecoverableExeption readRecoverableExeption) {
                readRecoverableExeption.printStackTrace();
                // 重连
                LogUtil.d("--->重连 ReadRecoverableExeption");
                connectionManager.disconnect(true);
            } catch (IOException e) {
                e.printStackTrace();
                // 重连
                LogUtil.d("--->重连 IOException");
                if (connectionManager.getConnectionStatus() != SocketStatus.SOCKET_DISCONNECTING) {
                    connectionManager.disconnect(true);
                }
            }
        }
    };
 
 
    private void readHeaderFromSteam(ByteBuffer headBuf, int readLength) throws ReadRecoverableExeption, IOException {
        for (int i = 0; i < readLength; i++) {
            byte[] bytes = new byte[1];
            // 从输入流中读数据,无数据时会阻塞
            int value = inputStream.read(bytes);
            // -1代表读到了文件的末尾,一般是因为服务器断开了连接
            if (value == -1) {
                throw new ReadRecoverableExeption("读数据失败,可能是因为socket跟服务器断开了连接");
            }
            headBuf.put(bytes);
        }
    }
 
    private void readOriginDataFromSteam(OriginReadData readData) throws ReadRecoverableExeption, IOException {
        // 用 全局originBuf避免重复创建字节数组
        int len = inputStream.read(originBuf.array());
        // no more data
        if (len == -1) {
            throw new ReadRecoverableExeption("读数据失败,可能因为socket跟服务器断开了连接");
        }
        // bytes复制
        byte[] data = new byte[len];
        originBuf.get(data, 0, len);
        readData.setBodyData(data);
        LogUtil.d("Socket收到数据-->" + HexUtil.bytesToHex(readData.getBodyBytes()));
        // 分发数据
        actionDispatch.dispatchAction(IOAction.ACTION_READ_COMPLETE, readData);
        // 相当于把指针重新指向positon=0
        originBuf.clear();
    }
 
    private void readBodyFromStream(ByteBuffer byteBuffer) throws ReadRecoverableExeption, IOException {
        // while循环直到byteBuffer装满数据
        while (byteBuffer.hasRemaining()) {
            byte[] bufArray = new byte[socketOptions.getMaxReadBytes()]; // 从服务器单次读取的最大值
            int len = inputStream.read(bufArray);
            if (len == -1) { // no more data
                throw new ReadRecoverableExeption("读数据失败,可能是因为socket跟服务器断开了连接");
            }
            int remaining = byteBuffer.remaining();
            if (len > remaining) { // 从stream读的数据超过byteBuffer的剩余空间
                byteBuffer.put(bufArray, 0, remaining);
                // 将多余的数据保存到remainingBuf中缓存,等下一次读取
                remainingBuf = ByteBuffer.allocate(len - remaining);
                remainingBuf.order(socketOptions.getReadOrder());
                remainingBuf.put(bufArray, remaining, len - remaining);
            } else { // 从stream读的数据小于或等于byteBuffer的剩余空间
                byteBuffer.put(bufArray, 0, len);
            }
        }
    }
 
    @Override
    public void openReader() {
        init();
        if (readerThread == null || !readerThread.isAlive()) {
            readerThread = new Thread(readerTask, "reader thread");
            stopThread = false;
            readerThread.start();
        }
    }
 
    @Override
    public void closeReader() {
        try {
            // 关闭线程释放资源
            shutDownThread();
            release();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
 
    // 释放资源
    private void release() {
        LogUtil.i("reader线程释放资源");
        if (originBuf != null) {
            originBuf = null;
        }
        if (remainingBuf != null) {
            remainingBuf = null;
        }
        if (readerThread != null && !readerThread.isAlive()) {
            readerThread = null;
        }
 
        try {
            if (inputStream != null)
                inputStream.close();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            inputStream = null;
        }
    }
 
    // 初始化
    private void init() {
        inputStream = connectionManager.getInputStream();
        // 没有定义消息协议
        if (socketOptions.getMessageProtocol() == null) {
            originBuf = ByteBuffer.allocate(1024 * 4);
        }
    }
 
    @Override
    public void setOption(EasySocketOptions socketOptions) {
        this.socketOptions = socketOptions;
    }
 
    // 关闭读数据线程
    private void shutDownThread() throws InterruptedException {
        if (readerThread != null && readerThread.isAlive() && !readerThread.isInterrupted()) {
            try {
                stopThread = true;
                readerThread.interrupt();
                readerThread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}