package com.dayu.pipirrapp.net;
|
|
import android.content.Context;
|
import android.net.ConnectivityManager;
|
import android.util.Log;
|
|
import com.dayu.pipirrapp.utils.CommonKeyName;
|
import com.jeremyliao.liveeventbus.LiveEventBus;
|
|
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
|
import org.eclipse.paho.client.mqttv3.MqttCallback;
|
import org.eclipse.paho.client.mqttv3.MqttClient;
|
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
import org.eclipse.paho.client.mqttv3.MqttException;
|
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
|
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.Executors;
|
|
/**
|
* MQTT相关设置
|
*
|
* @author zuoxiao
|
* @version 1.0
|
* @since 2024-11-26
|
*/
|
public class MqttManager {
|
|
private static final String MQTT_BROKER_URL = "tcp://115.236.153.170:30764"; // 修改为你的 broker 地址
|
private static final String CLIENT_ID = "mqttx_54052fa0";
|
private static final String TOPIC = "workOrder"; // 订阅的主题
|
private ConnectivityManager connectivityManager;
|
|
private MqttClient mqttClient;
|
private MqttConnectOptions connectOptions;
|
boolean isHasNet = false;
|
|
public MqttManager(Context context) {
|
try {
|
mqttClient = new MqttClient(MQTT_BROKER_URL, CLIENT_ID, new MemoryPersistence());
|
connectOptions = new MqttConnectOptions();
|
connectOptions.setUserName("mqtt_yjy");
|
connectOptions.setPassword("yjy".toCharArray());
|
connectOptions.setCleanSession(false);
|
connectOptions.setKeepAliveInterval(60); // 设置保持连接的时间
|
connectOptions.setAutomaticReconnect(true); // 启用自动重连
|
connectivityManager = (ConnectivityManager) context.getSystemService(Context.CONNECTIVITY_SERVICE);
|
// LiveEventBus.get(CommonKeyName.NetworkCallback).observeForever(new Observer<Object>() {
|
// @Override
|
// public void onChanged(Object o) {
|
// switch ((int) o) {
|
// case NetUtils.Available:
|
// MyLog.i("MqttManager>>>Available");
|
// isHasNet = true;
|
// reconnect();
|
// break;
|
// case NetUtils.Lost:
|
// MyLog.i("MqttManager>>>Lost");
|
// isHasNet = false;
|
// try {
|
// mqttClient.disconnect();
|
// } catch (MqttException e) {
|
// e.printStackTrace();
|
// }
|
// break;
|
// }
|
// }
|
// });
|
} catch (MqttException e) {
|
e.printStackTrace();
|
}
|
}
|
|
// 连接到 MQTT broker
|
public void connect() {
|
ExecutorService executorService = Executors.newSingleThreadExecutor();
|
executorService.execute(new Runnable() {
|
@Override
|
public void run() {
|
try {
|
if (isHasNet) {
|
mqttClient.connect(connectOptions);
|
if (mqttClient.isConnected()) {
|
Log.d("MqttManager", "Connected to MQTT broker");
|
subscribeToTopic();
|
}
|
} else {
|
reconnect();
|
}
|
|
} catch (MqttException e) {
|
reconnect();
|
Log.e("MqttManager", "Error connecting to MQTT broker", e);
|
}
|
}
|
});
|
}
|
|
// 订阅主题
|
private void subscribeToTopic() {
|
try {
|
mqttClient.subscribe(TOPIC, (topic, message) -> {
|
// 收到消息时的处理逻辑
|
Log.d("MqttManager", "Received message:" + new String(message.getPayload()));
|
//传递MQ收到的信息
|
LiveEventBus.get(CommonKeyName.locationData).post(message.getPayload());
|
});
|
mqttClient.setCallback(new MqttCallback() {
|
@Override
|
public void connectionLost(Throwable cause) {
|
Log.d("MqttManager", "connectionLost" + cause.getMessage());
|
reconnect();
|
// 处理连接丢失,可以尝试重新连接
|
}
|
|
@Override
|
public void messageArrived(String topic, MqttMessage message) throws Exception {
|
Log.d("MqttManager", "Received messageArrived:" + new String(message.getPayload()));
|
// 处理收到的消息
|
}
|
|
@Override
|
public void deliveryComplete(IMqttDeliveryToken token) {
|
// 处理消息发送完成
|
try {
|
Log.d("MqttManager", "deliveryComplete:" + new String(token.getMessage().toString()));
|
} catch (MqttException e) {
|
e.printStackTrace();
|
}
|
}
|
});
|
} catch (MqttException e) {
|
e.printStackTrace();
|
}
|
}
|
|
// 发布消息
|
public void publishMessage(String message) {
|
try {
|
MqttMessage mqttMessage = new MqttMessage(message.getBytes());
|
mqttClient.publish(TOPIC, mqttMessage);
|
} catch (MqttException e) {
|
e.printStackTrace();
|
}
|
}
|
|
// 断开连接
|
public void disconnect() {
|
try {
|
if (mqttClient != null && mqttClient.isConnected()) {
|
mqttClient.disconnect();
|
System.out.println("Disconnected from MQTT broker");
|
}
|
} catch (MqttException e) {
|
e.printStackTrace();
|
}
|
}
|
|
// 自动重连方法
|
private void reconnect() {
|
try {
|
if (isHasNet) {
|
while (!mqttClient.isConnected() && isHasNet) {
|
Log.d("MqttManager", "Attempting to reconnect...");
|
mqttClient.connect(connectOptions); // 重试连接
|
if (mqttClient.isConnected()) {
|
Log.d("MqttManager", "Connected to MQTT broker");
|
subscribeToTopic();
|
}
|
Thread.sleep(5000); // 每 5 秒重试一次
|
}
|
Log.d("MqttManager", "Reconnected to MQTT broker! isHasNet=true");
|
} else {
|
Log.d("MqttManager", "isHasNet is false");
|
// Thread.sleep(5000);
|
// reconnect();
|
}
|
|
} catch (MqttException | InterruptedException e) {
|
// try {
|
// Thread.sleep(5000);
|
// } catch (InterruptedException ex) {
|
// e.printStackTrace();
|
// }// 每 5 秒重试一次
|
// reconnect();
|
|
}
|
}
|
}
|