package com.easysocket.connection.iowork; import com.easysocket.config.EasySocketOptions; import com.easysocket.interfaces.conn.IConnectionManager; import com.easysocket.interfaces.conn.ISocketActionDispatch; import com.easysocket.interfaces.io.IWriter; import com.easysocket.utils.HexUtil; import com.easysocket.utils.LogUtil; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.Arrays; import java.util.concurrent.LinkedBlockingDeque; /** * Author:Alex * Date:2019/6/1 * Note: */ public class EasyWriter implements IWriter { /** * 输出流 */ private OutputStream outputStream; /** * 连接管理器 */ private IConnectionManager connectionManager; /** * socket参数 */ private EasySocketOptions socketOptions; /** * 行为分发 */ private ISocketActionDispatch actionDispatch; /** * 写数据线程 */ private Thread writerThread; /** * 是否停止写数据 */ private boolean isStop; /** * 待写入数据 */ private LinkedBlockingDeque packetsToSend = new LinkedBlockingDeque<>(); public EasyWriter(IConnectionManager connectionManager, ISocketActionDispatch actionDispatch) { this.connectionManager = connectionManager; socketOptions = connectionManager.getOptions(); this.actionDispatch = actionDispatch; } @Override public void openWriter() { outputStream = connectionManager.getOutStream(); if (writerThread == null) { isStop = false; writerThread = new Thread(writerTask, "writer thread"); writerThread.start(); } } @Override public void setOption(EasySocketOptions socketOptions) { this.socketOptions = socketOptions; } /** * 写任务 */ private Runnable writerTask = new Runnable() { @Override public void run() { // 循环写数据 while (!isStop) { try { byte[] sender = packetsToSend.take(); write(sender); } catch (InterruptedException | IOException e) { e.printStackTrace(); } } } }; @Override public void write(byte[] sendBytes) throws IOException { if (sendBytes != null) { LogUtil.d("EasyWriter--Socket发送数据String-->" + HexUtil.bytesToHex(sendBytes)); LogUtil.d("EasyWriter--Socket发送数据byte[]-->" + Arrays.toString(sendBytes)); int packageSize = socketOptions.getMaxWriteBytes(); // 每次可以发送的最大数据 int remainingCount = sendBytes.length; ByteBuffer writeBuf = ByteBuffer.allocate(packageSize); writeBuf.order(socketOptions.getReadOrder()); int index = 0; // 如果发送的数据大于单次可发送的最大数据,则分多次发送 while (remainingCount > 0) { int realWriteLength = Math.min(packageSize, remainingCount); writeBuf.clear(); // 清空缓存 writeBuf.rewind(); // 将position位置移到0 writeBuf.put(sendBytes, index, realWriteLength); writeBuf.flip(); byte[] writeArr = new byte[realWriteLength]; writeBuf.get(writeArr); outputStream.write(writeArr); outputStream.flush(); // 强制写入缓存中残留数据 index += realWriteLength; remainingCount -= realWriteLength; } } } @Override public void offer(byte[] sender) { if (!isStop) packetsToSend.offer(sender); } @Override public void closeWriter() { try { if (outputStream != null) outputStream.close(); shutDownThread(); } catch (IOException | InterruptedException e) { e.printStackTrace(); } finally { outputStream = null; } } private void shutDownThread() throws InterruptedException { if (writerThread != null && writerThread.isAlive() && !writerThread.isInterrupted()) { try { isStop = true; writerThread.interrupt(); writerThread.join(); writerThread = null; } catch (InterruptedException e) { e.printStackTrace(); } } } }