package com.easysocket.connection.iowork; import com.easysocket.config.EasySocketOptions; import com.easysocket.connection.action.IOAction; import com.easysocket.connection.action.SocketStatus; import com.easysocket.entity.OriginReadData; import com.easysocket.exception.ReadRecoverableExeption; import com.easysocket.exception.ReadUnrecoverableException; import com.easysocket.interfaces.config.IMessageProtocol; import com.easysocket.interfaces.conn.IConnectionManager; import com.easysocket.interfaces.conn.ISocketActionDispatch; import com.easysocket.interfaces.io.IReader; import com.easysocket.utils.HexUtil; import com.easysocket.utils.LogUtil; import com.tencent.bugly.crashreport.CrashReport; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; /** * Author:Alex * Date:2019/6/1 * Note: */ public class EasyReader implements IReader { /** * 输入流 */ private InputStream inputStream; /** * 读取原始数据的缓存空间 */ private ByteBuffer originBuf; /** * socket行为分发器 */ private ISocketActionDispatch actionDispatch; /** * 连接器 */ private IConnectionManager connectionManager; /** * 连接参数 */ private EasySocketOptions socketOptions; /** * 读数据时,余留数据的缓存 */ private ByteBuffer remainingBuf; /** * 读数据线程 */ private Thread readerThread; /** * 是否停止线程 */ private boolean stopThread; public EasyReader(IConnectionManager connectionManager, ISocketActionDispatch actionDispatch) { this.actionDispatch = actionDispatch; this.connectionManager = connectionManager; socketOptions = connectionManager.getOptions(); } @Override public void read() throws IOException, ReadRecoverableExeption, ReadUnrecoverableException { OriginReadData originalData = new OriginReadData(); IMessageProtocol messageProtocol = socketOptions.getMessageProtocol(); // 消息协议为null,则直接读原始消息,不建议这样使用,因为会发生黏包、分包的问题 if (messageProtocol == null) { readOriginDataFromSteam(originalData); return; } // 定义了消息协议 int headerLength = messageProtocol.getHeaderLength(); // 包头长度 ByteBuffer headBuf = ByteBuffer.allocate(headerLength); // 包头数据的buffer headBuf.order(socketOptions.getReadOrder()); /*1、读 header=====>>>*/ if (remainingBuf != null) { // 有余留 // flip方法:一般从Buffer读数据前调用,将limit设置为当前position,将position设置为0,在读数据时,limit代表可读数据的有效长度 remainingBuf.flip(); // 读余留数据的长度 int length = Math.min(remainingBuf.remaining(), headerLength); // 读入余留数据 headBuf.put(remainingBuf.array(), 0, length); if (length < headerLength) { // 余留数据小于一个header // there are no data left remainingBuf = null; // 从stream中读剩下的header数据 readHeaderFromSteam(headBuf, headerLength - length); } else { // 移动开始读数据的指针 remainingBuf.position(headerLength); } } else { // 无余留 // 从stream读取一个完整的 header readHeaderFromSteam(headBuf, headBuf.capacity()); } // 保存header originalData.setHeaderData(headBuf.array()); /*2、读 body=====>>>*/ int bodyLength = messageProtocol.getBodyLength(originalData.getHeaderData(), socketOptions.getReadOrder()); if (bodyLength > 0) { if (bodyLength > socketOptions.getMaxResponseDataMb() * 1024 * 1024) { throw new ReadUnrecoverableException("服务器返回的单次数据超过了规定的最大值,可能你的Socket消息协议不对,一般消息格式" + "为:Header+Body,其中Header保存消息长度和类型等,Body保存消息内容,请规范好你的协议"); } // 分配空间 ByteBuffer bodyBuf = ByteBuffer.allocate(bodyLength); bodyBuf.order(socketOptions.getReadOrder()); // 有余留 if (remainingBuf != null) { int bodyStartPosition = remainingBuf.position(); int length = Math.min(remainingBuf.remaining(), bodyLength); // 读length大小的余留数据 bodyBuf.put(remainingBuf.array(), bodyStartPosition, length); // 移动position位置 remainingBuf.position(bodyStartPosition + length); // 读的余留数据刚好等于一个body if (length == bodyLength) { if (remainingBuf.remaining() > 0) { // 余留数据未读完 ByteBuffer temp = ByteBuffer.allocate(remainingBuf.remaining()); temp.order(socketOptions.getReadOrder()); temp.put(remainingBuf.array(), remainingBuf.position(), remainingBuf.remaining()); remainingBuf = temp; } else { // there are no data left remainingBuf = null; } // 保存body originalData.setBodyData(bodyBuf.array()); LogUtil.d("Socket收到数据-->" +HexUtil.bytesToHex(originalData.getBodyBytes()) ); // 分发数据 actionDispatch.dispatchAction(IOAction.ACTION_READ_COMPLETE, originalData); /*return读取结束*/ return; } else { // there are no data left in buffer and some data pieces in channel remainingBuf = null; } } // 无余留,则从stream中读 readBodyFromStream(bodyBuf); // 保存body到originalData originalData.setBodyData(bodyBuf.array()); } else if (bodyLength == 0) { // 没有body数据 originalData.setBodyData(new byte[0]); if (remainingBuf != null) { // the body is empty so header remaining buf need set null if (remainingBuf.hasRemaining()) { ByteBuffer temp = ByteBuffer.allocate(remainingBuf.remaining()); temp.order(socketOptions.getReadOrder()); temp.put(remainingBuf.array(), remainingBuf.position(), remainingBuf.remaining()); remainingBuf = temp; } else { remainingBuf = null; } } } else if (bodyLength < 0) { throw new ReadUnrecoverableException("数据body的长度不能小于0"); } LogUtil.d("Socket收到数据-->" + HexUtil.bytesToHex(originalData.getBodyBytes())); // 分发 actionDispatch.dispatchAction(IOAction.ACTION_READ_COMPLETE, originalData); } /** * 读数据任务 */ private Runnable readerTask = new Runnable() { @Override public void run() { try { while (!stopThread) { read(); } } catch (ReadUnrecoverableException unrecoverableException) { // 读异常 unrecoverableException.printStackTrace(); // 停止线程 stopThread = true; release(); LogUtil.i("reader停止线程"); CrashReport.postCatchedException(unrecoverableException); } catch (ReadRecoverableExeption readRecoverableExeption) { readRecoverableExeption.printStackTrace(); // 重连 LogUtil.d("--->重连 ReadRecoverableExeption"); connectionManager.disconnect(true); } catch (IOException e) { e.printStackTrace(); // 重连 LogUtil.d("--->重连 IOException"); if (connectionManager.getConnectionStatus() != SocketStatus.SOCKET_DISCONNECTING) { connectionManager.disconnect(true); } } } }; private void readHeaderFromSteam(ByteBuffer headBuf, int readLength) throws ReadRecoverableExeption, IOException { for (int i = 0; i < readLength; i++) { byte[] bytes = new byte[1]; // 从输入流中读数据,无数据时会阻塞 int value = inputStream.read(bytes); // -1代表读到了文件的末尾,一般是因为服务器断开了连接 if (value == -1) { throw new ReadRecoverableExeption("读数据失败,可能是因为socket跟服务器断开了连接"); } headBuf.put(bytes); } } private void readOriginDataFromSteam(OriginReadData readData) throws ReadRecoverableExeption, IOException { // 用 全局originBuf避免重复创建字节数组 int len = inputStream.read(originBuf.array()); // no more data if (len == -1) { throw new ReadRecoverableExeption("读数据失败,可能因为socket跟服务器断开了连接"); } // bytes复制 byte[] data = new byte[len]; originBuf.get(data, 0, len); readData.setBodyData(data); LogUtil.d("Socket收到数据-->" + HexUtil.bytesToHex(readData.getBodyBytes())); // 分发数据 actionDispatch.dispatchAction(IOAction.ACTION_READ_COMPLETE, readData); // 相当于把指针重新指向positon=0 originBuf.clear(); } private void readBodyFromStream(ByteBuffer byteBuffer) throws ReadRecoverableExeption, IOException { // while循环直到byteBuffer装满数据 while (byteBuffer.hasRemaining()) { byte[] bufArray = new byte[socketOptions.getMaxReadBytes()]; // 从服务器单次读取的最大值 int len = inputStream.read(bufArray); if (len == -1) { // no more data throw new ReadRecoverableExeption("读数据失败,可能是因为socket跟服务器断开了连接"); } int remaining = byteBuffer.remaining(); if (len > remaining) { // 从stream读的数据超过byteBuffer的剩余空间 byteBuffer.put(bufArray, 0, remaining); // 将多余的数据保存到remainingBuf中缓存,等下一次读取 remainingBuf = ByteBuffer.allocate(len - remaining); remainingBuf.order(socketOptions.getReadOrder()); remainingBuf.put(bufArray, remaining, len - remaining); } else { // 从stream读的数据小于或等于byteBuffer的剩余空间 byteBuffer.put(bufArray, 0, len); } } } @Override public void openReader() { init(); if (readerThread == null || !readerThread.isAlive()) { readerThread = new Thread(readerTask, "reader thread"); stopThread = false; readerThread.start(); } } @Override public void closeReader() { try { // 关闭线程释放资源 shutDownThread(); release(); } catch (InterruptedException e) { e.printStackTrace(); } } // 释放资源 private void release() { LogUtil.i("reader线程释放资源"); if (originBuf != null) { originBuf = null; } if (remainingBuf != null) { remainingBuf = null; } if (readerThread != null && !readerThread.isAlive()) { readerThread = null; } try { if (inputStream != null) inputStream.close(); } catch (IOException e) { e.printStackTrace(); } finally { inputStream = null; } } // 初始化 private void init() { inputStream = connectionManager.getInputStream(); // 没有定义消息协议 if (socketOptions.getMessageProtocol() == null) { originBuf = ByteBuffer.allocate(1024 * 4); } } @Override public void setOption(EasySocketOptions socketOptions) { this.socketOptions = socketOptions; } // 关闭读数据线程 private void shutDownThread() throws InterruptedException { if (readerThread != null && readerThread.isAlive() && !readerThread.isInterrupted()) { try { stopThread = true; readerThread.interrupt(); readerThread.join(); } catch (InterruptedException e) { e.printStackTrace(); } } } }