package com.easysocket.connection.heartbeat;

import com.easysocket.config.EasySocketOptions;
import com.easysocket.entity.OriginReadData;
import com.easysocket.entity.SocketAddress;
import com.easysocket.interfaces.config.IOptions;
import com.easysocket.interfaces.conn.IConnectionManager;
import com.easysocket.interfaces.conn.IHeartManager;
import com.easysocket.interfaces.conn.ISocketActionDispatch;
import com.easysocket.interfaces.conn.SocketActionListener;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Author:Alex
 * Date:2019/12/8
 * Note: heartbeat detection manager.
 *
 * <p>Once activated through {@link #startHeartbeat(byte[], HeartbeatListener)}, a
 * single-threaded scheduler runs the beat task with a fixed delay. Every run increments a
 * counter that is reset whenever the registered {@link HeartbeatListener} recognises an
 * incoming message as a server heartbeat. When the counter reaches
 * {@code getMaxHeartbeatLoseTimes()} the connection manager is asked to disconnect and
 * reconnect; otherwise the client heartbeat bytes are sent.
 *
 * <p>NOTE(review): the scheduler is started and stopped without synchronization, and it
 * is not visible from this file which threads deliver the socket callbacks - confirm the
 * threading model before relying on concurrent start/stop.
 */
public class HeartManager extends SocketActionListener implements IOptions, IHeartManager {

    /**
     * Lower bound, in milliseconds, applied to the configured heartbeat frequency.
     */
    private static final long MIN_HEARTBEAT_FREQ_MS = 1000L;

    /**
     * Connection manager used to send heartbeats and to trigger a disconnect.
     */
    private IConnectionManager connectionManager;

    /**
     * Connection options.
     */
    private EasySocketOptions socketOptions;

    /**
     * Client heartbeat packet.
     */
    private byte[] clientHeart;

    /**
     * Executor that runs the heartbeat sending task.
     */
    private ScheduledExecutorService heartExecutor;

    /**
     * Number of lost heartbeats; -1 means "freshly reset", so the first beat after a
     * reset brings it to 0.
     */
    private AtomicInteger loseTimes = new AtomicInteger(-1);

    /**
     * Effective heartbeat frequency in milliseconds (never below
     * {@link #MIN_HEARTBEAT_FREQ_MS}).
     */
    private long freq;

    /**
     * Whether the heartbeat has been activated.
     */
    private boolean isActivate;

    /**
     * Listener that decides whether a received message is a server heartbeat.
     */
    private HeartbeatListener heartbeatListener;

    /**
     * Creates the manager, reads the options from the connection manager and subscribes
     * this instance to socket actions.
     *
     * @param iConnectionManager connection manager to send heartbeats through
     * @param actionDispatch     dispatcher this manager registers itself with
     */
    public HeartManager(IConnectionManager iConnectionManager, ISocketActionDispatch actionDispatch) {
        this.connectionManager = iConnectionManager;
        socketOptions = iConnectionManager.getOptions();
        actionDispatch.subscribe(this); // register as listener
    }

    /**
     * Heartbeat sending task.
     */
    private final Runnable beatTask = new Runnable() {
        @Override
        public void run() {
            // Lost-heartbeat check: a maximum of -1 disables the check (and, because of
            // short-circuiting, the counter is then not incremented at all).
            if (socketOptions.getMaxHeartbeatLoseTimes() != -1
                    && loseTimes.incrementAndGet() >= socketOptions.getMaxHeartbeatLoseTimes()) {
                // Too many heartbeats lost: disconnect and reconnect.
                connectionManager.disconnect(true);
                resetLoseTimes();
            } else {
                // Send the heartbeat packet.
                connectionManager.upBytes(clientHeart);
            }
        }
    };

    /**
     * Activates the heartbeat and starts the sending thread.
     *
     * @param clientHeart heartbeat bytes sent on every beat
     * @param listener    listener used to recognise server heartbeats; when {@code null},
     *                    incoming messages never reset the lost-heartbeat counter
     */
    @Override
    public void startHeartbeat(byte[] clientHeart, HeartbeatListener listener) {
        this.clientHeart = clientHeart;
        this.heartbeatListener = listener;
        isActivate = true;
        openThread();
    }

    /**
     * Starts the heartbeat thread unless one is already running.
     */
    private void openThread() {
        // Apply the same lower bound as setOptions(). Using the raw configured value
        // here would discard that bound, and a non-positive delay would make
        // scheduleWithFixedDelay throw after the executor had already been stored,
        // leaving a task-less executor that blocks every later start attempt.
        freq = effectiveFreq(socketOptions.getHeartbeatFreq());
        if (heartExecutor == null || heartExecutor.isShutdown()) {
            heartExecutor = Executors.newSingleThreadScheduledExecutor();
            heartExecutor.scheduleWithFixedDelay(beatTask, 0, freq, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Returns the heartbeat frequency to use for a configured value.
     *
     * @param configuredFreq frequency in milliseconds as found in the options
     * @return {@code configuredFreq}, raised to {@link #MIN_HEARTBEAT_FREQ_MS} if smaller
     */
    private static long effectiveFreq(long configuredFreq) {
        return Math.max(configuredFreq, MIN_HEARTBEAT_FREQ_MS);
    }

    /**
     * Stops sending heartbeats and deactivates the heartbeat, so that a later successful
     * connection does not restart it.
     */
    @Override
    public void stopHeartbeat() {
        isActivate = false;
        closeThread();
    }

    /**
     * Stops the heartbeat thread if it is running and resets the lost-heartbeat counter.
     */
    private void closeThread() {
        if (heartExecutor != null && !heartExecutor.isShutdown()) {
            heartExecutor.shutdownNow();
            heartExecutor = null;
            resetLoseTimes(); // reset
        }
    }

    /**
     * Called when a server heartbeat has been received; resets the lost-heartbeat counter.
     */
    @Override
    public void onReceiveHeartBeat() {
        resetLoseTimes();
    }

    /**
     * Puts the lost-heartbeat counter back to its initial value of -1.
     */
    private void resetLoseTimes() {
        loseTimes.set(-1);
    }

    /**
     * Restarts the heartbeat thread after a successful connection, but only if the
     * heartbeat is currently activated.
     */
    @Override
    public void onSocketConnSuccess(SocketAddress socketAddress) {
        if (isActivate) {
            openThread();
        }
    }

    /**
     * Stops the heartbeat thread when the connection failed and no reconnect is needed.
     */
    @Override
    public void onSocketConnFail(SocketAddress socketAddress, boolean isNeedReconnect) {
        if (!isNeedReconnect) {
            closeThread();
        }
    }

    /**
     * Stops the heartbeat thread when the socket disconnected and no reconnect is needed.
     */
    @Override
    public void onSocketDisconnect(SocketAddress socketAddress, boolean isNeedReconnect) {
        if (!isNeedReconnect) {
            closeThread();
        }
    }

    /**
     * Treats the received data as a server heartbeat when a listener is registered and
     * it identifies the data as one.
     */
    @Override
    public void onSocketResponse(SocketAddress socketAddress, OriginReadData originReadData) {
        if (heartbeatListener != null && heartbeatListener.isServerHeartbeat(originReadData)) {
            // Server heartbeat received.
            onReceiveHeartBeat();
        }
    }

    /**
     * Replaces the options and recomputes the effective heartbeat frequency.
     *
     * @param socketOptions new options
     * @return this manager
     */
    @Override
    public Object setOptions(EasySocketOptions socketOptions) {
        this.socketOptions = socketOptions;
        freq = effectiveFreq(socketOptions.getHeartbeatFreq()); // not below one second
        return this;
    }

    /**
     * @return the options currently in use
     */
    @Override
    public EasySocketOptions getOptions() {
        return socketOptions;
    }

    /**
     * Callback used to recognise server heartbeats among received messages.
     */
    public interface HeartbeatListener {
        /**
         * @param orginReadData data received from the socket
         * @return whether the data is a server heartbeat
         */
        boolean isServerHeartbeat(OriginReadData orginReadData);
    }
}