管灌系统巡查员智能手机App
zuoxiao
2025-01-18 b3e7f379e72111f55a28c302804702ee7c66bfa2
app/src/main/java/com/dayu/pipirrapp/net/MqttManager.java
@@ -1,10 +1,18 @@
package com.dayu.pipirrapp.net;
import android.content.Context;
import android.net.ConnectivityManager;
import android.content.pm.PackageInfo;
import android.content.pm.PackageManager;
import android.util.Log;
import androidx.lifecycle.LifecycleOwner;
import androidx.lifecycle.Observer;
import com.dayu.pipirrapp.MyApplication;
import com.dayu.pipirrapp.utils.CommonKeyName;
import com.dayu.pipirrapp.utils.MyJsonParser;
import com.dayu.pipirrapp.utils.MyLog;
import com.dayu.pipirrapp.utils.NetUtils;
import com.jeremyliao.liveeventbus.LiveEventBus;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
@@ -15,6 +23,8 @@
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -27,17 +37,28 @@
 */
public class MqttManager {
    // NOTE(review): MQTT_BROKER_URL, CLIENT_ID and isHasNet are each declared twice below.
    // This looks like removed and added diff lines shown together; confirm which declaration
    // of each pair belongs in the real file.
    private static final String MQTT_BROKER_URL = "tcp://115.236.153.170:30764"; // change this to your broker address
    private static final String CLIENT_ID = "mqttx_54052fa0";
    //    private static final String MQTT_BROKER_URL = "tcp://115.236.153.170:30764"; // change this to your broker address
    private static final String MQTT_BROKER_URL = "tcp://192.168.10.52:1883";
    // Non-final variant: the constructor overwrites this value with a generated id.
    private String CLIENT_ID = "mqttx_a7a9fe73";
    private static final String TOPIC = "workOrder"; // topic to subscribe to
    // Assigned in the constructor from Context.CONNECTIVITY_SERVICE.
    private ConnectivityManager connectivityManager;
    private MqttClient mqttClient;
    private MqttConnectOptions connectOptions;
    // Network-availability flag; updated by the network-state observer registered in the
    // constructor and read by connect()/reconnect().
    boolean isHasNet = false;
    boolean isHasNet = true;
    // Whether a connection has succeeded at least once; if not, reconnect once the network is back.
    boolean isConnet = false;
    // Single-thread executor that runs reconnect tasks; disconnect() shuts it down.
    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
    /**
     * Builds the MQTT client and its connect options, installs the client callback, and
     * observes network-state events so a connection can be retried when the network returns.
     *
     * NOTE(review): two constructor signatures appear back to back below and a hunk marker
     * sits inside the body. This looks like a diff rendering with removed and added lines
     * interleaved and some lines elided; confirm against the real file.
     *
     * @param context        used to read the package name and PackageInfo, and to obtain the
     *                       ConnectivityManager system service
     * @param lifecycleOwner owner passed to the LiveEventBus network-state observer
     */
    public MqttManager(Context context) {
    public MqttManager(Context context, LifecycleOwner lifecycleOwner) {
        try {
            PackageManager manager = context.getPackageManager();
            PackageInfo info = null;
            try {
                info = manager.getPackageInfo(context.getPackageName(), 0);
            } catch (PackageManager.NameNotFoundException e) {
                // The app's own package could not be looked up; rethrown unchecked.
                throw new RuntimeException(e);
            }
            // Client id = package name + random UUID without dashes + "_" + info.
            // NOTE(review): `info` is the PackageInfo object itself, so its toString() is what
            // gets appended. Presumably a version field was intended; confirm.
            // NOTE(review): the id is random per instance while cleanSession is set to false
            // below, so a stored broker session presumably cannot be resumed; confirm intent.
            CLIENT_ID = context.getPackageName() + UUID.randomUUID().toString().replace("-", "") + "_" + info;
            mqttClient = new MqttClient(MQTT_BROKER_URL, CLIENT_ID, new MemoryPersistence());
            connectOptions = new MqttConnectOptions();
            connectOptions.setUserName("mqtt_yjy");
@@ -45,28 +66,52 @@
            connectOptions.setCleanSession(false);
            connectOptions.setKeepAliveInterval(60); // keep-alive interval
            connectOptions.setAutomaticReconnect(true);  // enable automatic reconnect
            connectivityManager = (ConnectivityManager) context.getSystemService(Context.CONNECTIVITY_SERVICE);
            // Earlier observeForever-based network listener, left commented out.
//            LiveEventBus.get(CommonKeyName.NetworkCallback).observeForever(new Observer<Object>() {
//                @Override
//                public void onChanged(Object o) {
//                    switch ((int) o) {
//                        case NetUtils.Available:
//                            MyLog.i("MqttManager>>>Available");
//                            isHasNet = true;
//                            reconnect();
//                            break;
//                        case NetUtils.Lost:
//                            MyLog.i("MqttManager>>>Lost");
//                            isHasNet = false;
//                            try {
//                                mqttClient.disconnect();
//                            } catch (MqttException e) {
//                                e.printStackTrace();
//                            }
//                            break;
//                    }
//                }
//            });
            connectOptions.setConnectionTimeout(30);// connection timeout, in seconds
            mqttClient.setCallback(new MqttCallback() {
                @Override
                public void connectionLost(Throwable cause) {
                    Log.d("MqttManager", "连接丢失:" + cause.getMessage());
                    // Connection was lost: schedule a reconnect attempt.
                    reconnect();
                }
                @Override
                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    Log.d("MqttManager", "messageArrived收到的消:" + new String(message.getPayload()));
                    // Handle the received message (only logged here).
                }
                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                    // Handle message delivery completion (only logged here).
                    try {
                        Log.d("MqttManager", "发送完成:" + new String(token.getMessage().toString()));
                    } catch (MqttException e) {
                        e.printStackTrace();
                    }
                }
            });
            // React to network-state events posted on the NetworkCallback channel.
            LiveEventBus.get(CommonKeyName.NetworkCallback).observe(lifecycleOwner, new Observer<Object>() {
                @Override
                public void onChanged(Object o) {
                    // Payloads that are not Integer state codes are ignored.
                    if (o instanceof Integer) {
                        switch ((int) o) {
                            case NetUtils.Available:
                                MyLog.i("MqttManager>>>Available");
                                isHasNet = true;
                                // Reconnect only while isConnet is still false
                                // (subscribeToTopic() sets it to true).
                                if (!isConnet) {
                                    reconnect();
                                }
                                break;
                            case NetUtils.Lost:
                                MyLog.i("MqttManager>>>Lost");
                                isHasNet = false;
//                                disconnect();
                                break;
                        }
                    }
                }
            });
        } catch (MqttException e) {
            e.printStackTrace();
        }
@@ -74,16 +119,19 @@
    // Connect to the MQTT broker.
    // The connection attempt runs on an executor thread rather than on the caller's thread.
    // NOTE(review): the tail of this method is not visible in this view, and the two
    // consecutive success log lines suggest removed and added diff lines shown together.
    public void connect() {
        // NOTE(review): this local shadows the executorService field and is not shut down in
        // the visible lines; confirm whether this line is still present in the real file.
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Log.d("MqttManager", "connect开始连接isHasNet:" + isHasNet);
                    if (isHasNet) {
                        Log.d("MqttManager", "connect开始连接");
                        mqttClient.connect(connectOptions);
                        // Subscribe only once the client reports it is connected.
                        if (mqttClient.isConnected()) {
                            Log.d("MqttManager", "Connected to MQTT broker");
                            Log.d("MqttManager", "connect连接成功");
                            subscribeToTopic();
                        } else {
                            Log.d("MqttManager", "connect连接失败");
                        }
                    } else {
                        // No network: hand over to reconnect(), which itself only attempts
                        // a connection while isHasNet is true.
                        reconnect();
@@ -100,36 +148,18 @@
    // Subscribe to the topic.
    // Marks the manager as having connected (isConnet = true) and registers a message
    // listener on TOPIC.
    // NOTE(review): the braces below do not balance. The setCallback(...) anonymous class is
    // never closed before the JSON-handling lines, and the method's closing brace is not
    // visible. This looks like removed and added diff lines shown together; confirm against
    // the real file.
    private void subscribeToTopic() {
        try {
            // Set before subscribe() is called, so it is true even if subscribe() then throws.
            isConnet = true;
            mqttClient.subscribe(TOPIC, (topic, message) -> {
                // Handling logic when a message is received.
                Log.d("MqttManager", "Received message:" + new String(message.getPayload()));
                // Handling logic for a message received on a background thread.
                Log.d("MqttManager", "subscribe收到消息:" + new String(message.getPayload()));
                // Forward the data received from MQ.
                LiveEventBus.get(CommonKeyName.locationData).post(message.getPayload());
            });
            mqttClient.setCallback(new MqttCallback() {
                @Override
                public void connectionLost(Throwable cause) {
                    Log.d("MqttManager", "connectionLost" + cause.getMessage());
                    reconnect();
                    // Handle the lost connection; a reconnect can be attempted.
                }
                @Override
                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    Log.d("MqttManager", "Received messageArrived:" + new String(message.getPayload()));
                    // Handle the received message.
                }
                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                    // Handle message delivery completion.
                    try {
                        Log.d("MqttManager", "deliveryComplete:" + new String(token.getMessage().toString()));
                    } catch (MqttException e) {
                        e.printStackTrace();
                    }
                // Parse the payload as JSON into a map.
                HashMap<String, Object> data = MyJsonParser.getMapFromJson(new String(message.getPayload()));
                // Check whether the message is addressed to the current user.
                // NOTE(review): data.get("inspectorId") is dereferenced without a null check,
                // so a payload without that key throws NullPointerException here. Whether
                // getMapFromJson can itself return null is not visible; verify.
                if (data.get("inspectorId").equals(MyApplication.myApplication.userId)) {
                    LiveEventBus.get(CommonKeyName.MQTTData).post(data.get("workOrderId"));
                }
            });
        } catch (MqttException e) {
            e.printStackTrace();
        }
@@ -147,44 +177,44 @@
    // 断开连接
    public void disconnect() {
        MyLog.d("MqttManager>>>>关闭连接");
        try {
            if (mqttClient != null && mqttClient.isConnected()) {
                mqttClient.disconnect();
                System.out.println("Disconnected from MQTT broker");
                MyLog.d("MqttManager>>>>关闭连接成功");
            }
        } catch (MqttException e) {
            e.printStackTrace();
        } finally {
            executorService.shutdown();
        }
    }
    // Automatic reconnect.
    // Submits a task to executorService that keeps trying to connect while the client is
    // disconnected and isHasNet is true, and subscribes to the topic once connected.
    // NOTE(review): two implementations are mixed together below: a synchronous
    // try/if/while loop and an executor-based loop with backoff. This looks like removed and
    // added diff lines shown together, and the braces do not balance as shown; confirm
    // against the real file.
    private void reconnect() {
        try {
            if (isHasNet) {
                while (!mqttClient.isConnected() && isHasNet) {
                    Log.d("MqttManager", "Attempting to reconnect...");
                    mqttClient.connect(connectOptions);  // retry the connection
        executorService.execute(() -> {
            int reconnectAttempts = 0;
            while (!mqttClient.isConnected() && isHasNet) {
                try {
                    MyLog.d("MqttManager>>>>开始重连");
                    mqttClient.connect(connectOptions);
                    if (mqttClient.isConnected()) {
                        Log.d("MqttManager", "Connected to MQTT broker");
                        MyLog.d("MqttManager>>>连接成功");
                        subscribeToTopic();
                        break;
                    }
                    Thread.sleep(5000);     // retry every 5 seconds
                    // Exponential backoff: 1s, 2s, 4s, ... capped at 30s.
                    // NOTE(review): reconnectAttempts has no upper bound, so for large values
                    // (1L << reconnectAttempts) * 1000 overflows long.
                    long backoff = Math.min((1L << reconnectAttempts) * 1000, 30000);
                    Thread.sleep(backoff);
                    reconnectAttempts++;
                } catch (MqttException e) {
                    MyLog.e("MqttManager>>>Reconnection failed" + e.getMessage());
                    // A limit on the number of reconnect attempts could be added here.
                    // NOTE(review): no sleep runs on this path, so when connect() throws the
                    // loop retries immediately without any backoff.
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt status
                    return;
                }
                Log.d("MqttManager", "Reconnected to MQTT broker! isHasNet=true");
            } else {
                Log.d("MqttManager", "isHasNet is false");
//                Thread.sleep(5000);
//                reconnect();
            }
        } catch (MqttException | InterruptedException e) {
//            try {
//                Thread.sleep(5000);
//            } catch (InterruptedException ex) {
//                e.printStackTrace();
//            }// retry every 5 seconds
//            reconnect();
        }
        });
    }
}