管灌系统巡查员智能手机App
zuoxiao
2025-01-18 b3e7f379e72111f55a28c302804702ee7c66bfa2
app/src/main/java/com/dayu/pipirrapp/net/MqttManager.java
@@ -1,12 +1,16 @@
package com.dayu.pipirrapp.net;
import android.content.Context;
import android.content.pm.PackageInfo;
import android.content.pm.PackageManager;
import android.util.Log;
import androidx.lifecycle.LifecycleOwner;
import androidx.lifecycle.Observer;
import com.dayu.pipirrapp.MyApplication;
import com.dayu.pipirrapp.utils.CommonKeyName;
import com.dayu.pipirrapp.utils.MyJsonParser;
import com.dayu.pipirrapp.utils.MyLog;
import com.dayu.pipirrapp.utils.NetUtils;
import com.jeremyliao.liveeventbus.LiveEventBus;
@@ -19,6 +23,8 @@
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -31,18 +37,28 @@
 */
public class MqttManager {
// NOTE(review): MQTT_BROKER_URL, CLIENT_ID and executorService are each declared twice in this span;
// this looks like the before/after lines of a patch shown together — confirm which copy is live.
//    private static final String MQTT_BROKER_URL = "tcp://115.236.153.170:30764"; // change this to your broker address
private static final String MQTT_BROKER_URL = "tcp://192.168.10.52:1883";
    private static final String CLIENT_ID = "mqttx_a7a9fe73";
    //    private static final String MQTT_BROKER_URL = "tcp://115.236.153.170:30764"; // change this to your broker address
    // Broker URI passed to the MqttClient constructor.
    private static final String MQTT_BROKER_URL = "tcp://192.168.10.52:1883";
    // Initial client id; the constructor assigns a new value before the MqttClient is created.
    private String CLIENT_ID = "mqttx_a7a9fe73";
    private static final String TOPIC = "workOrder"; // topic to subscribe to
    // Paho client and the options used for every connect() call in this class.
    private MqttClient mqttClient;
    private MqttConnectOptions connectOptions;
    // Network availability flag: set by the network-state observer (Available -> true, Lost -> false)
    // and checked before connecting and on every iteration of the reconnect loop.
    // NOTE(review): written in the observer callback but read inside a task run on executorService,
    // and it is not volatile — confirm cross-thread visibility is acceptable.
    boolean isHasNet = true;
    private ExecutorService executorService = Executors.newSingleThreadExecutor();
    // Whether a connection has succeeded at least once (set in subscribeToTopic());
    // if it has not, the observer triggers a reconnect when the network becomes available.
    boolean isConnet = false;
    // Single-threaded executor on which the reconnect loop is run.
    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
    public MqttManager(Context context, LifecycleOwner lifecycleOwner) {
        try {
            PackageManager manager = context.getPackageManager();
            PackageInfo info = null;
            try {
                info = manager.getPackageInfo(context.getPackageName(), 0);
            } catch (PackageManager.NameNotFoundException e) {
                throw new RuntimeException(e);
            }
            CLIENT_ID = context.getPackageName() + UUID.randomUUID().toString().replace("-", "") + "_" + info;
            mqttClient = new MqttClient(MQTT_BROKER_URL, CLIENT_ID, new MemoryPersistence());
            connectOptions = new MqttConnectOptions();
            connectOptions.setUserName("mqtt_yjy");
@@ -50,6 +66,31 @@
            connectOptions.setCleanSession(false);
            connectOptions.setKeepAliveInterval(60); // 设置保持连接的时间
            connectOptions.setAutomaticReconnect(true);  // 启用自动重连
            connectOptions.setConnectionTimeout(30);// 设置连接超时时间,单位为秒
            mqttClient.setCallback(new MqttCallback() {
                @Override
                public void connectionLost(Throwable cause) {
                    Log.d("MqttManager", "连接丢失:" + cause.getMessage());
                    // 处理连接丢失,可以尝试重新连接
                    reconnect();
                }
                @Override
                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    Log.d("MqttManager", "messageArrived收到的消:" + new String(message.getPayload()));
                    // 处理收到的消息
                }
                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                    // 处理消息发送完成
                    try {
                        Log.d("MqttManager", "发送完成:" + new String(token.getMessage().toString()));
                    } catch (MqttException e) {
                        e.printStackTrace();
                    }
                }
            });
            LiveEventBus.get(CommonKeyName.NetworkCallback).observe(lifecycleOwner, new Observer<Object>() {
                @Override
                public void onChanged(Object o) {
@@ -58,12 +99,14 @@
                            case NetUtils.Available:
                                MyLog.i("MqttManager>>>Available");
                                isHasNet = true;
                                reconnect();
                                if (!isConnet) {
                                    reconnect();
                                }
                                break;
                            case NetUtils.Lost:
                                MyLog.i("MqttManager>>>Lost");
                                isHasNet = false;
                                disconnect();
//                                disconnect();
                                break;
                        }
                    }
@@ -83,30 +126,6 @@
                    Log.d("MqttManager", "connect开始连接isHasNet:" + isHasNet);
                    if (isHasNet) {
                        Log.d("MqttManager", "connect开始连接");
                        mqttClient.setCallback(new MqttCallback() {
                            @Override
                            public void connectionLost(Throwable cause) {
                                Log.d("MqttManager", "连接丢失:" + cause.getMessage());
                                // 处理连接丢失,可以尝试重新连接
                                reconnect();
                            }
                            @Override
                            public void messageArrived(String topic, MqttMessage message) throws Exception {
                                Log.d("MqttManager", "收到的消:" + new String(message.getPayload()));
                                // 处理收到的消息
                            }
                            @Override
                            public void deliveryComplete(IMqttDeliveryToken token) {
                                // 处理消息发送完成
                                try {
                                    Log.d("MqttManager", "发送完成:" + new String(token.getMessage().toString()));
                                } catch (MqttException e) {
                                    e.printStackTrace();
                                }
                            }
                        });
                        mqttClient.connect(connectOptions);
                        if (mqttClient.isConnected()) {
                            Log.d("MqttManager", "connect连接成功");
@@ -129,11 +148,16 @@
    // 订阅主题
    private void subscribeToTopic() {
        try {
            isConnet = true;
            mqttClient.subscribe(TOPIC, (topic, message) -> {
                // 收到消息时的处理逻辑
                Log.d("MqttManager", "收到消息:" + new String(message.getPayload()));
                // 在子线程收到消息时的处理逻辑
                Log.d("MqttManager", "subscribe收到消息:" + new String(message.getPayload()));
                //传递MQ收到的信息
                LiveEventBus.get(CommonKeyName.locationData).post(message.getPayload());
                HashMap<String, Object> data = MyJsonParser.getMapFromJson(new String(message.getPayload()));
                //判断是否是当前用户
                if (data.get("inspectorId").equals(MyApplication.myApplication.userId)) {
                    LiveEventBus.get(CommonKeyName.MQTTData).post(data.get("workOrderId"));
                }
            });
        } catch (MqttException e) {
@@ -161,27 +185,36 @@
            }
        } catch (MqttException e) {
            e.printStackTrace();
        } finally {
            executorService.shutdown();
        }
    }
    // 自动重连方法
    private void reconnect() {
//        executorService.execute(() -> {
//            try {
//                MyLog.d("MqttManager>>>>开始重连+isHasNet:" + isHasNet + ">>>mqttClient.isConnected():" + mqttClient.isConnected());
//                while (!mqttClient.isConnected() && isHasNet) {
//                    MyLog.d("MqttManager>>>>开始连接");
//                    mqttClient.connect(connectOptions);
//                    if (mqttClient.isConnected()) {
//                        MyLog.d("MqttManager>>>连接成功");
//                        subscribeToTopic();
//                        break;
//                    }
//                    Thread.sleep(5000);  // 每5秒重试一次
//                }
//            } catch (MqttException | InterruptedException e) {
//                MyLog.e("MqttManager>>>Reconnection failed" + e.toString());
//            }
//        });
        executorService.execute(() -> {
            int reconnectAttempts = 0;
            while (!mqttClient.isConnected() && isHasNet) {
                try {
                    MyLog.d("MqttManager>>>>开始重连");
                    mqttClient.connect(connectOptions);
                    if (mqttClient.isConnected()) {
                        MyLog.d("MqttManager>>>连接成功");
                        subscribeToTopic();
                        break;
                    }
                    // 指数退避
                    long backoff = Math.min((1L << reconnectAttempts) * 1000, 30000);
                    Thread.sleep(backoff);
                    reconnectAttempts++;
                } catch (MqttException e) {
                    MyLog.e("MqttManager>>>Reconnection failed" + e.getMessage());
                    // 可以在这里增加重连尝试次数的限制
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // 保留中断状态
                    return;
                }
            }
        });
    }
}