package com.dayu.pipirrapp.net;
|
|
import android.content.Context;
|
import android.net.ConnectivityManager;
|
import android.util.Log;
|
|
import androidx.lifecycle.LifecycleOwner;
|
import androidx.lifecycle.Observer;
|
|
import com.dayu.pipirrapp.utils.CommonKeyName;
|
import com.dayu.pipirrapp.utils.MyLog;
|
import com.dayu.pipirrapp.utils.NetUtils;
|
import com.jeremyliao.liveeventbus.LiveEventBus;
|
|
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
|
import org.eclipse.paho.client.mqttv3.MqttCallback;
|
import org.eclipse.paho.client.mqttv3.MqttClient;
|
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
import org.eclipse.paho.client.mqttv3.MqttException;
|
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
|
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.Executors;
|
|
/**
|
* MQTT相关设置
|
*
|
* @author zuoxiao
|
* @version 1.0
|
* @since 2024-11-26
|
*/
|
public class MqttManager {
|
|
private static final String MQTT_BROKER_URL = "tcp://115.236.153.170:30764"; // 修改为你的 broker 地址
|
private static final String CLIENT_ID = "mqttx_f62ef124";
|
private static final String TOPIC = "workOrder"; // 订阅的主题
|
|
private MqttClient mqttClient;
|
private MqttConnectOptions connectOptions;
|
boolean isHasNet = true;
|
private ExecutorService executorService = Executors.newSingleThreadExecutor();
|
|
public MqttManager(Context context, LifecycleOwner lifecycleOwner) {
|
try {
|
mqttClient = new MqttClient(MQTT_BROKER_URL, CLIENT_ID, new MemoryPersistence());
|
connectOptions = new MqttConnectOptions();
|
connectOptions.setUserName("mqtt_yjy");
|
connectOptions.setPassword("yjy".toCharArray());
|
connectOptions.setCleanSession(false);
|
connectOptions.setKeepAliveInterval(60); // 设置保持连接的时间
|
connectOptions.setAutomaticReconnect(true); // 启用自动重连
|
LiveEventBus.get(CommonKeyName.NetworkCallback).observe(lifecycleOwner, new Observer<Object>() {
|
@Override
|
public void onChanged(Object o) {
|
if (o instanceof Integer) {
|
switch ((int) o) {
|
case NetUtils.Available:
|
MyLog.i("MqttManager>>>Available");
|
isHasNet = true;
|
reconnect();
|
break;
|
case NetUtils.Lost:
|
MyLog.i("MqttManager>>>Lost");
|
isHasNet = false;
|
disconnect();
|
break;
|
}
|
}
|
}
|
});
|
} catch (MqttException e) {
|
e.printStackTrace();
|
}
|
}
|
|
// 连接到 MQTT broker
|
public void connect() {
|
executorService.execute(new Runnable() {
|
@Override
|
public void run() {
|
try {
|
Log.d("MqttManager", "connect开始连接isHasNet:" + isHasNet);
|
if (isHasNet) {
|
Log.d("MqttManager", "connect开始连接");
|
mqttClient.setCallback(new MqttCallback() {
|
@Override
|
public void connectionLost(Throwable cause) {
|
Log.d("MqttManager", "连接丢失:" + cause.getMessage());
|
// 处理连接丢失,可以尝试重新连接
|
reconnect();
|
}
|
|
@Override
|
public void messageArrived(String topic, MqttMessage message) throws Exception {
|
Log.d("MqttManager", "收到的消:" + new String(message.getPayload()));
|
// 处理收到的消息
|
}
|
|
@Override
|
public void deliveryComplete(IMqttDeliveryToken token) {
|
// 处理消息发送完成
|
try {
|
Log.d("MqttManager", "发送完成:" + new String(token.getMessage().toString()));
|
} catch (MqttException e) {
|
e.printStackTrace();
|
}
|
}
|
});
|
mqttClient.connect(connectOptions);
|
if (mqttClient.isConnected()) {
|
Log.d("MqttManager", "connect连接成功");
|
subscribeToTopic();
|
} else {
|
Log.d("MqttManager", "connect连接失败");
|
}
|
} else {
|
reconnect();
|
}
|
|
} catch (MqttException e) {
|
reconnect();
|
Log.e("MqttManager", "Error connecting to MQTT broker", e);
|
}
|
}
|
});
|
}
|
|
// 订阅主题
|
private void subscribeToTopic() {
|
try {
|
mqttClient.subscribe(TOPIC, (topic, message) -> {
|
// 收到消息时的处理逻辑
|
Log.d("MqttManager", "收到消息:" + new String(message.getPayload()));
|
//传递MQ收到的信息
|
LiveEventBus.get(CommonKeyName.locationData).post(message.getPayload());
|
});
|
|
} catch (MqttException e) {
|
e.printStackTrace();
|
}
|
}
|
|
// 发布消息
|
public void publishMessage(String message) {
|
try {
|
MqttMessage mqttMessage = new MqttMessage(message.getBytes());
|
mqttClient.publish(TOPIC, mqttMessage);
|
} catch (MqttException e) {
|
e.printStackTrace();
|
}
|
}
|
|
// 断开连接
|
public void disconnect() {
|
MyLog.d("MqttManager>>>>关闭连接");
|
try {
|
if (mqttClient != null && mqttClient.isConnected()) {
|
mqttClient.disconnect();
|
MyLog.d("MqttManager>>>>关闭连接成功");
|
}
|
} catch (MqttException e) {
|
e.printStackTrace();
|
}
|
}
|
|
// 自动重连方法
|
private void reconnect() {
|
// executorService.execute(() -> {
|
// try {
|
// MyLog.d("MqttManager>>>>开始重连+isHasNet:" + isHasNet + ">>>mqttClient.isConnected():" + mqttClient.isConnected());
|
// while (!mqttClient.isConnected() && isHasNet) {
|
// MyLog.d("MqttManager>>>>开始连接");
|
// mqttClient.connect(connectOptions);
|
// if (mqttClient.isConnected()) {
|
// MyLog.d("MqttManager>>>连接成功");
|
// subscribeToTopic();
|
// break;
|
// }
|
// Thread.sleep(5000); // 每5秒重试一次
|
// }
|
// } catch (MqttException | InterruptedException e) {
|
// MyLog.e("MqttManager>>>Reconnection failed" + e.toString());
|
// }
|
// });
|
}
|
}
|