package com.dayu.pipirrapp.net;
|
|
import android.content.Context;
|
import android.net.ConnectivityManager;
|
import android.net.Network;
|
import android.net.NetworkCapabilities;
|
import android.net.NetworkRequest;
|
import android.util.Log;
|
|
import com.dayu.pipirrapp.utils.CommonKeyName;
|
import com.jeremyliao.liveeventbus.LiveEventBus;
|
|
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
|
import org.eclipse.paho.client.mqttv3.MqttCallback;
|
import org.eclipse.paho.client.mqttv3.MqttClient;
|
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
import org.eclipse.paho.client.mqttv3.MqttException;
|
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
|
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.Executors;
|
|
/**
|
* MQTT相关设置
|
*
|
* @author zuoxiao
|
* @version 1.0
|
* @since 2024-11-26
|
*/
|
public class MqttManager {
|
|
private static final String MQTT_BROKER_URL = "tcp://115.236.153.170:30764"; // 修改为你的 broker 地址
|
private static final String CLIENT_ID = "mqttx_54052fa0";
|
private static final String TOPIC = "workOrder"; // 订阅的主题
|
private ConnectivityManager connectivityManager;
|
|
private MqttClient mqttClient;
|
private MqttConnectOptions connectOptions;
|
boolean isHasNet = false;
|
|
public MqttManager(Context context) {
|
try {
|
mqttClient = new MqttClient(MQTT_BROKER_URL, CLIENT_ID, new MemoryPersistence());
|
connectOptions = new MqttConnectOptions();
|
connectOptions.setUserName("mqtt_yjy");
|
connectOptions.setPassword("yjy".toCharArray());
|
connectOptions.setCleanSession(false);
|
connectOptions.setKeepAliveInterval(60); // 设置保持连接的时间
|
connectOptions.setAutomaticReconnect(true); // 启用自动重连
|
connectivityManager = (ConnectivityManager) context.getSystemService(Context.CONNECTIVITY_SERVICE);
|
checkNetwork();
|
} catch (MqttException e) {
|
e.printStackTrace();
|
}
|
}
|
|
// 连接到 MQTT broker
|
public void connect() {
|
ExecutorService executorService = Executors.newSingleThreadExecutor();
|
executorService.execute(new Runnable() {
|
@Override
|
public void run() {
|
try {
|
if (isHasNet) {
|
mqttClient.connect(connectOptions);
|
if (mqttClient.isConnected()) {
|
Log.d("MqttManager", "Connected to MQTT broker");
|
subscribeToTopic();
|
}
|
} else {
|
reconnect();
|
}
|
|
} catch (MqttException e) {
|
reconnect();
|
Log.e("MqttManager", "Error connecting to MQTT broker", e);
|
}
|
}
|
});
|
}
|
|
// 订阅主题
|
private void subscribeToTopic() {
|
try {
|
mqttClient.subscribe(TOPIC, (topic, message) -> {
|
// 收到消息时的处理逻辑
|
Log.d("MqttManager", "Received message:" + new String(message.getPayload()));
|
//传递MQ收到的信息
|
LiveEventBus.get(CommonKeyName.locationData).post(message.getPayload());
|
});
|
mqttClient.setCallback(new MqttCallback() {
|
@Override
|
public void connectionLost(Throwable cause) {
|
Log.d("MqttManager", "connectionLost" + cause.getMessage());
|
reconnect();
|
// 处理连接丢失,可以尝试重新连接
|
}
|
|
@Override
|
public void messageArrived(String topic, MqttMessage message) throws Exception {
|
Log.d("MqttManager", "Received messageArrived:" + new String(message.getPayload()));
|
// 处理收到的消息
|
}
|
|
@Override
|
public void deliveryComplete(IMqttDeliveryToken token) {
|
// 处理消息发送完成
|
try {
|
Log.d("MqttManager", "deliveryComplete:" + new String(token.getMessage().toString()));
|
} catch (MqttException e) {
|
e.printStackTrace();
|
}
|
}
|
});
|
} catch (MqttException e) {
|
e.printStackTrace();
|
}
|
}
|
|
// 发布消息
|
public void publishMessage(String message) {
|
try {
|
MqttMessage mqttMessage = new MqttMessage(message.getBytes());
|
mqttClient.publish(TOPIC, mqttMessage);
|
} catch (MqttException e) {
|
e.printStackTrace();
|
}
|
}
|
|
// 断开连接
|
public void disconnect() {
|
try {
|
if (mqttClient != null && mqttClient.isConnected()) {
|
mqttClient.disconnect();
|
System.out.println("Disconnected from MQTT broker");
|
}
|
} catch (MqttException e) {
|
e.printStackTrace();
|
}
|
}
|
|
// 自动重连方法
|
private void reconnect() {
|
try {
|
if (isHasNet) {
|
while (!mqttClient.isConnected() && isHasNet) {
|
Log.d("MqttManager", "Attempting to reconnect...");
|
mqttClient.connect(connectOptions); // 重试连接
|
if (mqttClient.isConnected()) {
|
Log.d("MqttManager", "Connected to MQTT broker");
|
subscribeToTopic();
|
}
|
Thread.sleep(5000); // 每 5 秒重试一次
|
}
|
Log.d("MqttManager", "Reconnected to MQTT broker! isHasNet=true");
|
} else {
|
Log.d("MqttManager", "isHasNet is false");
|
Thread.sleep(5000);
|
reconnect();
|
}
|
|
} catch (MqttException | InterruptedException e) {
|
try {
|
Thread.sleep(5000);
|
} catch (InterruptedException ex) {
|
e.printStackTrace();
|
}// 每 5 秒重试一次
|
reconnect();
|
|
}
|
}
|
|
public void checkNetwork() {
|
NetworkRequest request = new NetworkRequest.Builder().addCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET) // 必须具备互联网能力
|
.build();
|
|
connectivityManager.registerNetworkCallback(request, new ConnectivityManager.NetworkCallback() {
|
@Override
|
public void onAvailable(Network network) {
|
super.onAvailable(network);
|
// 网络可用时的处理逻辑
|
Log.d("MqttManager", "Network is available.");
|
isHasNet = true;
|
reconnect();
|
}
|
|
@Override
|
public void onLost(Network network) {
|
super.onLost(network);
|
// 网络丢失时的处理逻辑
|
Log.d("MqttManager", "Network is lost.");
|
isHasNet = false;
|
try {
|
mqttClient.disconnect();
|
} catch (MqttException e) {
|
e.printStackTrace();
|
}
|
}
|
});
|
}
|
}
|