package com.easysocket.connection.iowork; 
 | 
  
 | 
import com.easysocket.config.EasySocketOptions; 
 | 
import com.easysocket.interfaces.conn.IConnectionManager; 
 | 
import com.easysocket.interfaces.conn.ISocketActionDispatch; 
 | 
import com.easysocket.interfaces.io.IWriter; 
 | 
import com.easysocket.utils.HexUtil; 
 | 
import com.easysocket.utils.LogUtil; 
 | 
  
 | 
import java.io.IOException; 
 | 
import java.io.OutputStream; 
 | 
import java.nio.ByteBuffer; 
 | 
import java.nio.charset.Charset; 
 | 
import java.util.Arrays; 
 | 
import java.util.concurrent.LinkedBlockingDeque; 
 | 
  
 | 
/** 
 | 
 * Author:Alex 
 | 
 * Date:2019/6/1 
 | 
 * Note: 
 | 
 */ 
 | 
public class EasyWriter implements IWriter<EasySocketOptions> { 
 | 
  
 | 
    /** 
 | 
     * 输出流 
 | 
     */ 
 | 
    private OutputStream outputStream; 
 | 
  
 | 
    /** 
 | 
     * 连接管理器 
 | 
     */ 
 | 
    private IConnectionManager connectionManager; 
 | 
    /** 
 | 
     * socket参数 
 | 
     */ 
 | 
    private EasySocketOptions socketOptions; 
 | 
    /** 
 | 
     * 行为分发 
 | 
     */ 
 | 
    private ISocketActionDispatch actionDispatch; 
 | 
    /** 
 | 
     * 写数据线程 
 | 
     */ 
 | 
    private Thread writerThread; 
 | 
    /** 
 | 
     * 是否停止写数据 
 | 
     */ 
 | 
    private boolean isStop; 
 | 
    /** 
 | 
     * 待写入数据 
 | 
     */ 
 | 
    private LinkedBlockingDeque<byte[]> packetsToSend = new LinkedBlockingDeque<>(); 
 | 
  
 | 
    public EasyWriter(IConnectionManager connectionManager, ISocketActionDispatch actionDispatch) { 
 | 
        this.connectionManager = connectionManager; 
 | 
        socketOptions = connectionManager.getOptions(); 
 | 
        this.actionDispatch = actionDispatch; 
 | 
    } 
 | 
  
 | 
    @Override 
 | 
    public void openWriter() { 
 | 
        outputStream = connectionManager.getOutStream(); 
 | 
        if (writerThread == null) { 
 | 
            isStop = false; 
 | 
            writerThread = new Thread(writerTask, "writer thread"); 
 | 
            writerThread.start(); 
 | 
        } 
 | 
    } 
 | 
  
 | 
    @Override 
 | 
    public void setOption(EasySocketOptions socketOptions) { 
 | 
        this.socketOptions = socketOptions; 
 | 
    } 
 | 
  
 | 
    /** 
 | 
     * 写任务 
 | 
     */ 
 | 
    private Runnable writerTask = new Runnable() { 
 | 
        @Override 
 | 
        public void run() { 
 | 
            // 循环写数据 
 | 
            while (!isStop) { 
 | 
                try { 
 | 
                    byte[] sender = packetsToSend.take(); 
 | 
                    write(sender); 
 | 
                } catch (InterruptedException | IOException e) { 
 | 
                    e.printStackTrace(); 
 | 
                } 
 | 
            } 
 | 
        } 
 | 
    }; 
 | 
  
 | 
    @Override 
 | 
    public void write(byte[] sendBytes) throws IOException { 
 | 
        if (sendBytes != null) { 
 | 
            LogUtil.d("EasyWriter--Socket发送数据String-->" + HexUtil.bytesToHex(sendBytes)); 
 | 
            LogUtil.d("EasyWriter--Socket发送数据byte[]-->" + Arrays.toString(sendBytes)); 
 | 
            int packageSize = socketOptions.getMaxWriteBytes(); // 每次可以发送的最大数据 
 | 
            int remainingCount = sendBytes.length; 
 | 
            ByteBuffer writeBuf = ByteBuffer.allocate(packageSize); 
 | 
            writeBuf.order(socketOptions.getReadOrder()); 
 | 
            int index = 0; 
 | 
            // 如果发送的数据大于单次可发送的最大数据,则分多次发送 
 | 
            while (remainingCount > 0) { 
 | 
                int realWriteLength = Math.min(packageSize, remainingCount); 
 | 
                writeBuf.clear(); // 清空缓存 
 | 
                writeBuf.rewind(); // 将position位置移到0 
 | 
                writeBuf.put(sendBytes, index, realWriteLength); 
 | 
                writeBuf.flip(); 
 | 
                byte[] writeArr = new byte[realWriteLength]; 
 | 
                writeBuf.get(writeArr); 
 | 
                outputStream.write(writeArr); 
 | 
                outputStream.flush(); // 强制写入缓存中残留数据 
 | 
                index += realWriteLength; 
 | 
                remainingCount -= realWriteLength; 
 | 
            } 
 | 
        } 
 | 
    } 
 | 
  
 | 
    @Override 
 | 
    public void offer(byte[] sender) { 
 | 
        if (!isStop) 
 | 
            packetsToSend.offer(sender); 
 | 
    } 
 | 
  
 | 
    @Override 
 | 
    public void closeWriter() { 
 | 
        try { 
 | 
            if (outputStream != null) 
 | 
                outputStream.close(); 
 | 
            shutDownThread(); 
 | 
        } catch (IOException | InterruptedException e) { 
 | 
            e.printStackTrace(); 
 | 
        } finally { 
 | 
            outputStream = null; 
 | 
        } 
 | 
    } 
 | 
  
 | 
    private void shutDownThread() throws InterruptedException { 
 | 
        if (writerThread != null && writerThread.isAlive() && !writerThread.isInterrupted()) { 
 | 
            try { 
 | 
                isStop = true; 
 | 
                writerThread.interrupt(); 
 | 
                writerThread.join(); 
 | 
                writerThread = null; 
 | 
            } catch (InterruptedException e) { 
 | 
                e.printStackTrace(); 
 | 
            } 
 | 
        } 
 | 
    } 
 | 
} 
 |