package com.dayu.pipirrapp.net;
|
|
import android.content.Context;
|
import android.util.Log;
|
|
import androidx.lifecycle.LifecycleOwner;
|
import androidx.lifecycle.Observer;
|
|
import com.dayu.pipirrapp.MyApplication;
|
import com.dayu.pipirrapp.utils.CommonKeyName;
|
import com.dayu.pipirrapp.utils.MyJsonParser;
|
import com.dayu.pipirrapp.utils.MyLog;
|
import com.dayu.pipirrapp.utils.NetUtils;
|
import com.jeremyliao.liveeventbus.LiveEventBus;
|
|
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
|
import org.eclipse.paho.client.mqttv3.MqttCallback;
|
import org.eclipse.paho.client.mqttv3.MqttClient;
|
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
import org.eclipse.paho.client.mqttv3.MqttException;
|
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
|
import java.util.HashMap;
|
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.Executors;
|
|
/**
|
* MQTT相关设置
|
*
|
* @author zuoxiao
|
* @version 1.0
|
* @since 2024-11-26
|
*/
|
public class MqttManager {
|
|
// private static final String MQTT_BROKER_URL = "tcp://115.236.153.170:30764"; // 修改为你的 broker 地址
|
private static final String MQTT_BROKER_URL = "tcp://192.168.10.52:1883";
|
private static final String CLIENT_ID = "mqttx_a7a9fe73";
|
private static final String TOPIC = "workOrder"; // 订阅的主题
|
|
private MqttClient mqttClient;
|
private MqttConnectOptions connectOptions;
|
boolean isHasNet = true;
|
//是否连接成功过一次,没有的话联网后重连
|
boolean isConnet = false;
|
private ExecutorService executorService = Executors.newSingleThreadExecutor();
|
|
public MqttManager(Context context, LifecycleOwner lifecycleOwner) {
|
try {
|
mqttClient = new MqttClient(MQTT_BROKER_URL, CLIENT_ID, new MemoryPersistence());
|
connectOptions = new MqttConnectOptions();
|
connectOptions.setUserName("mqtt_yjy");
|
connectOptions.setPassword("yjy".toCharArray());
|
connectOptions.setCleanSession(false);
|
connectOptions.setKeepAliveInterval(60); // 设置保持连接的时间
|
connectOptions.setAutomaticReconnect(true); // 启用自动重连
|
connectOptions.setConnectionTimeout(30);// 设置连接超时时间,单位为秒
|
mqttClient.setCallback(new MqttCallback() {
|
@Override
|
public void connectionLost(Throwable cause) {
|
Log.d("MqttManager", "连接丢失:" + cause.getMessage());
|
// 处理连接丢失,可以尝试重新连接
|
reconnect();
|
}
|
|
@Override
|
public void messageArrived(String topic, MqttMessage message) throws Exception {
|
Log.d("MqttManager", "messageArrived收到的消:" + new String(message.getPayload()));
|
// 处理收到的消息
|
}
|
|
@Override
|
public void deliveryComplete(IMqttDeliveryToken token) {
|
// 处理消息发送完成
|
try {
|
Log.d("MqttManager", "发送完成:" + new String(token.getMessage().toString()));
|
} catch (MqttException e) {
|
e.printStackTrace();
|
}
|
}
|
});
|
LiveEventBus.get(CommonKeyName.NetworkCallback).observe(lifecycleOwner, new Observer<Object>() {
|
@Override
|
public void onChanged(Object o) {
|
if (o instanceof Integer) {
|
switch ((int) o) {
|
case NetUtils.Available:
|
MyLog.i("MqttManager>>>Available");
|
isHasNet = true;
|
if (!isConnet) {
|
reconnect();
|
}
|
break;
|
case NetUtils.Lost:
|
MyLog.i("MqttManager>>>Lost");
|
isHasNet = false;
|
// disconnect();
|
break;
|
}
|
}
|
}
|
});
|
} catch (MqttException e) {
|
e.printStackTrace();
|
}
|
}
|
|
// 连接到 MQTT broker
|
public void connect() {
|
executorService.execute(new Runnable() {
|
@Override
|
public void run() {
|
try {
|
Log.d("MqttManager", "connect开始连接isHasNet:" + isHasNet);
|
if (isHasNet) {
|
Log.d("MqttManager", "connect开始连接");
|
mqttClient.connect(connectOptions);
|
if (mqttClient.isConnected()) {
|
Log.d("MqttManager", "connect连接成功");
|
subscribeToTopic();
|
} else {
|
Log.d("MqttManager", "connect连接失败");
|
}
|
} else {
|
reconnect();
|
}
|
|
} catch (MqttException e) {
|
reconnect();
|
Log.e("MqttManager", "Error connecting to MQTT broker", e);
|
}
|
}
|
});
|
}
|
|
// 订阅主题
|
private void subscribeToTopic() {
|
try {
|
isConnet = true;
|
mqttClient.subscribe(TOPIC, (topic, message) -> {
|
// 在子线程收到消息时的处理逻辑
|
Log.d("MqttManager", "subscribe收到消息:" + new String(message.getPayload()));
|
//传递MQ收到的信息
|
HashMap<String, Object> data= MyJsonParser.getMapFromJson(new String(message.getPayload()));
|
//判断是否是当前用户
|
if (data.get("inspectorId").equals(MyApplication.myApplication.userId)){
|
LiveEventBus.get(CommonKeyName.MQTTData).post(message.getPayload());
|
}
|
});
|
|
} catch (MqttException e) {
|
e.printStackTrace();
|
}
|
}
|
|
// 发布消息
|
public void publishMessage(String message) {
|
try {
|
MqttMessage mqttMessage = new MqttMessage(message.getBytes());
|
mqttClient.publish(TOPIC, mqttMessage);
|
} catch (MqttException e) {
|
e.printStackTrace();
|
}
|
}
|
|
// 断开连接
|
public void disconnect() {
|
MyLog.d("MqttManager>>>>关闭连接");
|
try {
|
if (mqttClient != null && mqttClient.isConnected()) {
|
mqttClient.disconnect();
|
MyLog.d("MqttManager>>>>关闭连接成功");
|
}
|
} catch (MqttException e) {
|
e.printStackTrace();
|
} finally {
|
executorService.shutdown();
|
}
|
}
|
|
// 自动重连方法
|
private void reconnect() {
|
executorService.execute(() -> {
|
int reconnectAttempts = 0;
|
while (!mqttClient.isConnected() && isHasNet) {
|
try {
|
MyLog.d("MqttManager>>>>开始重连");
|
mqttClient.connect(connectOptions);
|
if (mqttClient.isConnected()) {
|
MyLog.d("MqttManager>>>连接成功");
|
subscribeToTopic();
|
break;
|
}
|
// 指数退避
|
long backoff = Math.min((1L << reconnectAttempts) * 1000, 30000);
|
Thread.sleep(backoff);
|
reconnectAttempts++;
|
} catch (MqttException e) {
|
MyLog.e("MqttManager>>>Reconnection failed" + e.getMessage());
|
// 可以在这里增加重连尝试次数的限制
|
} catch (InterruptedException e) {
|
Thread.currentThread().interrupt(); // 保留中断状态
|
return;
|
}
|
}
|
});
|
}
|
}
|