package com.easysocket.connection.heartbeat;
|
|
import com.easysocket.config.EasySocketOptions;
|
import com.easysocket.entity.OriginReadData;
|
import com.easysocket.entity.SocketAddress;
|
import com.easysocket.interfaces.config.IOptions;
|
import com.easysocket.interfaces.conn.IConnectionManager;
|
import com.easysocket.interfaces.conn.IHeartManager;
|
import com.easysocket.interfaces.conn.ISocketActionDispatch;
|
import com.easysocket.interfaces.conn.SocketActionListener;
|
|
import java.util.concurrent.Executors;
|
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
/**
|
* Author:Alex
|
* Date:2019/12/8
|
* Note:心跳包检测管理器
|
*/
|
public class HeartManager extends SocketActionListener implements IOptions, IHeartManager {
|
|
/**
|
* 连接器
|
*/
|
private IConnectionManager connectionManager;
|
/**
|
* 连接参数
|
*/
|
private EasySocketOptions socketOptions;
|
/**
|
* 客户端心跳包
|
*/
|
private byte[] clientHeart;
|
/**
|
* 心跳包发送线程
|
*/
|
private ScheduledExecutorService heartExecutor;
|
/**
|
* 记录心跳的失联次数
|
*/
|
private AtomicInteger loseTimes = new AtomicInteger(-1);
|
/**
|
* 心跳频率
|
*/
|
private long freq;
|
/**
|
* 是否激活了心跳
|
*/
|
private boolean isActivate;
|
|
|
/**
|
* 心跳包接收监听
|
*/
|
private HeartbeatListener heartbeatListener;
|
|
|
public HeartManager(IConnectionManager iConnectionManager, ISocketActionDispatch actionDispatch) {
|
this.connectionManager = iConnectionManager;
|
socketOptions = iConnectionManager.getOptions();
|
actionDispatch.subscribe(this); // 注册监听
|
}
|
|
/**
|
* 心跳发送任务
|
*/
|
private final Runnable beatTask = new Runnable() {
|
@Override
|
public void run() {
|
// 心跳丢失次数判断,心跳包丢失了一定的次数则会进行socket的断开重连
|
if (socketOptions.getMaxHeartbeatLoseTimes() != -1
|
&& loseTimes.incrementAndGet() >= socketOptions.getMaxHeartbeatLoseTimes()) {
|
// 断开重连
|
connectionManager.disconnect(true);
|
resetLoseTimes();
|
} else { // 发送心跳包
|
connectionManager.upBytes(clientHeart);
|
}
|
}
|
};
|
|
|
@Override
|
public void startHeartbeat(byte[] clientHeart, HeartbeatListener listener) {
|
this.clientHeart = clientHeart;
|
this.heartbeatListener = listener;
|
isActivate = true;
|
openThread();
|
}
|
|
|
// 启动心跳线程
|
private void openThread() {
|
freq = socketOptions.getHeartbeatFreq(); // 心跳频率
|
// 启动线程发送心跳
|
if (heartExecutor == null || heartExecutor.isShutdown()) {
|
heartExecutor = Executors.newSingleThreadScheduledExecutor();
|
heartExecutor.scheduleWithFixedDelay(beatTask, 0, freq, TimeUnit.MILLISECONDS);
|
}
|
}
|
|
/**
|
* 停止心跳发送
|
*/
|
@Override
|
public void stopHeartbeat() {
|
isActivate = false;
|
closeThread();
|
}
|
|
// 停止心跳线程
|
private void closeThread() {
|
if (heartExecutor != null && !heartExecutor.isShutdown()) {
|
heartExecutor.shutdownNow();
|
heartExecutor = null;
|
resetLoseTimes(); // 重置
|
}
|
}
|
|
@Override
|
public void onReceiveHeartBeat() {
|
resetLoseTimes();
|
}
|
|
|
private void resetLoseTimes() {
|
loseTimes.set(-1);
|
}
|
|
@Override
|
public void onSocketConnSuccess(SocketAddress socketAddress) {
|
if (isActivate) {
|
openThread();
|
}
|
}
|
|
@Override
|
public void onSocketConnFail(SocketAddress socketAddress, boolean isNeedReconnect) {
|
// 如果不需要重连,则停止心跳频率线程
|
if (!isNeedReconnect) {
|
closeThread();
|
}
|
}
|
|
@Override
|
public void onSocketDisconnect(SocketAddress socketAddress, boolean isNeedReconnect) {
|
// 如果不需要重连,则停止心跳检测
|
if (!isNeedReconnect) {
|
closeThread();
|
}
|
}
|
|
@Override
|
public void onSocketResponse(SocketAddress socketAddress, OriginReadData originReadData) {
|
if (heartbeatListener != null && heartbeatListener.isServerHeartbeat(originReadData)) {
|
// 收到服务器心跳
|
onReceiveHeartBeat();
|
}
|
}
|
|
@Override
|
public Object setOptions(EasySocketOptions socketOptions) {
|
this.socketOptions = socketOptions;
|
freq = socketOptions.getHeartbeatFreq();
|
freq = freq < 1000 ? 1000 : freq; // 不能小于一秒
|
return this;
|
}
|
|
@Override
|
public EasySocketOptions getOptions() {
|
return socketOptions;
|
}
|
|
public interface HeartbeatListener {
|
// 是否为服务器心跳
|
boolean isServerHeartbeat(OriginReadData orginReadData);
|
}
|
|
}
|