左晓为主开发手持机充值管理机
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package com.easysocket.connection.iowork;
 
import com.easysocket.config.EasySocketOptions;
import com.easysocket.interfaces.conn.IConnectionManager;
import com.easysocket.interfaces.conn.ISocketActionDispatch;
import com.easysocket.interfaces.io.IWriter;
import com.easysocket.utils.HexUtil;
import com.easysocket.utils.LogUtil;
 
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingDeque;
 
/**
 * Author:Alex
 * Date:2019/6/1
 * Note:
 */
public class EasyWriter implements IWriter<EasySocketOptions> {
 
    /**
     * 输出流
     */
    private OutputStream outputStream;
 
    /**
     * 连接管理器
     */
    private IConnectionManager connectionManager;
    /**
     * socket参数
     */
    private EasySocketOptions socketOptions;
    /**
     * 行为分发
     */
    private ISocketActionDispatch actionDispatch;
    /**
     * 写数据线程
     */
    private Thread writerThread;
    /**
     * 是否停止写数据
     */
    private boolean isStop;
    /**
     * 待写入数据
     */
    private LinkedBlockingDeque<byte[]> packetsToSend = new LinkedBlockingDeque<>();
 
    public EasyWriter(IConnectionManager connectionManager, ISocketActionDispatch actionDispatch) {
        this.connectionManager = connectionManager;
        socketOptions = connectionManager.getOptions();
        this.actionDispatch = actionDispatch;
    }
 
    @Override
    public void openWriter() {
        outputStream = connectionManager.getOutStream();
        if (writerThread == null) {
            isStop = false;
            writerThread = new Thread(writerTask, "writer thread");
            writerThread.start();
        }
    }
 
    @Override
    public void setOption(EasySocketOptions socketOptions) {
        this.socketOptions = socketOptions;
    }
 
    /**
     * 写任务
     */
    private Runnable writerTask = new Runnable() {
        @Override
        public void run() {
            // 循环写数据
            while (!isStop) {
                try {
                    byte[] sender = packetsToSend.take();
                    write(sender);
                } catch (InterruptedException | IOException e) {
                    e.printStackTrace();
                }
            }
        }
    };
 
    @Override
    public void write(byte[] sendBytes) throws IOException {
        if (sendBytes != null) {
            LogUtil.d("EasyWriter--Socket发送数据String-->" + HexUtil.bytesToHex(sendBytes));
            LogUtil.d("EasyWriter--Socket发送数据byte[]-->" + Arrays.toString(sendBytes));
            int packageSize = socketOptions.getMaxWriteBytes(); // 每次可以发送的最大数据
            int remainingCount = sendBytes.length;
            ByteBuffer writeBuf = ByteBuffer.allocate(packageSize);
            writeBuf.order(socketOptions.getReadOrder());
            int index = 0;
            // 如果发送的数据大于单次可发送的最大数据,则分多次发送
            while (remainingCount > 0) {
                int realWriteLength = Math.min(packageSize, remainingCount);
                writeBuf.clear(); // 清空缓存
                writeBuf.rewind(); // 将position位置移到0
                writeBuf.put(sendBytes, index, realWriteLength);
                writeBuf.flip();
                byte[] writeArr = new byte[realWriteLength];
                writeBuf.get(writeArr);
                outputStream.write(writeArr);
                outputStream.flush(); // 强制写入缓存中残留数据
                index += realWriteLength;
                remainingCount -= realWriteLength;
            }
        }
    }
 
    @Override
    public void offer(byte[] sender) {
        if (!isStop)
            packetsToSend.offer(sender);
    }
 
    @Override
    public void closeWriter() {
        try {
            if (outputStream != null)
                outputStream.close();
            shutDownThread();
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        } finally {
            outputStream = null;
        }
    }
 
    private void shutDownThread() throws InterruptedException {
        if (writerThread != null && writerThread.isAlive() && !writerThread.isInterrupted()) {
            try {
                isStop = true;
                writerThread.interrupt();
                writerThread.join();
                writerThread = null;
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}