管灌系统巡查员智能手机App
zuoxiao
2025-01-18 b3e7f379e72111f55a28c302804702ee7c66bfa2
app/src/main/java/com/dayu/pipirrapp/net/MqttManager.java
@@ -1,13 +1,18 @@
package com.dayu.pipirrapp.net;
import android.content.Context;
import android.net.ConnectivityManager;
import android.net.Network;
import android.net.NetworkCapabilities;
import android.net.NetworkRequest;
import android.content.pm.PackageInfo;
import android.content.pm.PackageManager;
import android.util.Log;
import androidx.lifecycle.LifecycleOwner;
import androidx.lifecycle.Observer;
import com.dayu.pipirrapp.MyApplication;
import com.dayu.pipirrapp.utils.CommonKeyName;
import com.dayu.pipirrapp.utils.MyJsonParser;
import com.dayu.pipirrapp.utils.MyLog;
import com.dayu.pipirrapp.utils.NetUtils;
import com.jeremyliao.liveeventbus.LiveEventBus;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
@@ -18,6 +23,8 @@
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -30,17 +37,28 @@
 */
public class MqttManager {
    // NOTE(review): MQTT_BROKER_URL, CLIENT_ID and isHasNet are each declared twice in this
    // span (it looks like the old and new lines of a diff side by side) — confirm which
    // declaration is the current one.
    private static final String MQTT_BROKER_URL = "tcp://115.236.153.170:30764"; // change this to your broker address
    private static final String CLIENT_ID = "mqttx_54052fa0";
    //    private static final String MQTT_BROKER_URL = "tcp://115.236.153.170:30764"; // change this to your broker address
    private static final String MQTT_BROKER_URL = "tcp://192.168.10.52:1883";
    // Reassigned in the constructor (package name + random UUID + "_" + info) before the client is created.
    private String CLIENT_ID = "mqttx_a7a9fe73";
    private static final String TOPIC = "workOrder"; // topic to subscribe to
    private ConnectivityManager connectivityManager;
    private MqttClient mqttClient;
    private MqttConnectOptions connectOptions;
    // Network-availability flag: set to true on NetUtils.Available and false on NetUtils.Lost by the
    // NetworkCallback observer, and checked before connecting / while reconnecting.
    // NOTE(review): it is read inside tasks submitted to executorService and is not volatile —
    // confirm that cross-thread visibility is acceptable here.
    boolean isHasNet = false;
    boolean isHasNet = true;
    // Whether a connection has succeeded at least once; if it has not, reconnect once the network is available.
    boolean isConnet = false;
    // Single-thread executor that runs the reconnect loop; shut down in disconnect().
    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
    public MqttManager(Context context) {
    public MqttManager(Context context, LifecycleOwner lifecycleOwner) {
        try {
            PackageManager manager = context.getPackageManager();
            PackageInfo info = null;
            try {
                info = manager.getPackageInfo(context.getPackageName(), 0);
            } catch (PackageManager.NameNotFoundException e) {
                throw new RuntimeException(e);
            }
            CLIENT_ID = context.getPackageName() + UUID.randomUUID().toString().replace("-", "") + "_" + info;
            mqttClient = new MqttClient(MQTT_BROKER_URL, CLIENT_ID, new MemoryPersistence());
            connectOptions = new MqttConnectOptions();
            connectOptions.setUserName("mqtt_yjy");
@@ -48,8 +66,52 @@
            connectOptions.setCleanSession(false);
            connectOptions.setKeepAliveInterval(60); // 设置保持连接的时间
            connectOptions.setAutomaticReconnect(true);  // 启用自动重连
            connectivityManager = (ConnectivityManager) context.getSystemService(Context.CONNECTIVITY_SERVICE);
            checkNetwork();
            connectOptions.setConnectionTimeout(30);// 设置连接超时时间,单位为秒
            mqttClient.setCallback(new MqttCallback() {
                @Override
                public void connectionLost(Throwable cause) {
                    Log.d("MqttManager", "连接丢失:" + cause.getMessage());
                    // 处理连接丢失,可以尝试重新连接
                    reconnect();
                }
                @Override
                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    Log.d("MqttManager", "messageArrived收到的消:" + new String(message.getPayload()));
                    // 处理收到的消息
                }
                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                    // 处理消息发送完成
                    try {
                        Log.d("MqttManager", "发送完成:" + new String(token.getMessage().toString()));
                    } catch (MqttException e) {
                        e.printStackTrace();
                    }
                }
            });
            LiveEventBus.get(CommonKeyName.NetworkCallback).observe(lifecycleOwner, new Observer<Object>() {
                @Override
                public void onChanged(Object o) {
                    if (o instanceof Integer) {
                        switch ((int) o) {
                            case NetUtils.Available:
                                MyLog.i("MqttManager>>>Available");
                                isHasNet = true;
                                if (!isConnet) {
                                    reconnect();
                                }
                                break;
                            case NetUtils.Lost:
                                MyLog.i("MqttManager>>>Lost");
                                isHasNet = false;
//                                disconnect();
                                break;
                        }
                    }
                }
            });
        } catch (MqttException e) {
            e.printStackTrace();
        }
@@ -57,16 +119,19 @@
    // 连接到 MQTT broker
    public void connect() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Log.d("MqttManager", "connect开始连接isHasNet:" + isHasNet);
                    if (isHasNet) {
                        Log.d("MqttManager", "connect开始连接");
                        mqttClient.connect(connectOptions);
                        if (mqttClient.isConnected()) {
                            Log.d("MqttManager", "Connected to MQTT broker");
                            Log.d("MqttManager", "connect连接成功");
                            subscribeToTopic();
                        } else {
                            Log.d("MqttManager", "connect连接失败");
                        }
                    } else {
                        reconnect();
@@ -83,36 +148,18 @@
    // 订阅主题
    private void subscribeToTopic() {
        try {
            isConnet = true;
            mqttClient.subscribe(TOPIC, (topic, message) -> {
                // 收到消息时的处理逻辑
                Log.d("MqttManager", "Received message:" + new String(message.getPayload()));
                // 在子线程收到消息时的处理逻辑
                Log.d("MqttManager", "subscribe收到消息:" + new String(message.getPayload()));
                //传递MQ收到的信息
                LiveEventBus.get(CommonKeyName.locationData).post(message.getPayload());
            });
            mqttClient.setCallback(new MqttCallback() {
                @Override
                public void connectionLost(Throwable cause) {
                    Log.d("MqttManager", "connectionLost" + cause.getMessage());
                    reconnect();
                    // 处理连接丢失,可以尝试重新连接
                }
                @Override
                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    Log.d("MqttManager", "Received messageArrived:" + new String(message.getPayload()));
                    // 处理收到的消息
                }
                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                    // 处理消息发送完成
                    try {
                        Log.d("MqttManager", "deliveryComplete:" + new String(token.getMessage().toString()));
                    } catch (MqttException e) {
                        e.printStackTrace();
                    }
                HashMap<String, Object> data = MyJsonParser.getMapFromJson(new String(message.getPayload()));
                //判断是否是当前用户
                if (data.get("inspectorId").equals(MyApplication.myApplication.userId)) {
                    LiveEventBus.get(CommonKeyName.MQTTData).post(data.get("workOrderId"));
                }
            });
        } catch (MqttException e) {
            e.printStackTrace();
        }
@@ -130,71 +177,42 @@
    /**
     * Disconnects from the MQTT broker and shuts down the executor.
     *
     * <p>{@code mqttClient.disconnect()} is only called when the client is non-null and
     * currently connected. An {@code MqttException} raised while disconnecting is printed
     * via {@code printStackTrace()} and not rethrown.
     *
     * <p>NOTE(review): {@code executorService.shutdown()} runs unconditionally in the
     * {@code finally} block, and a shut-down executor rejects new tasks — presumably this
     * instance must not be asked to reconnect after this call; confirm against callers.
     */
    public void disconnect() {
        MyLog.d("MqttManager>>>>关闭连接");
        try {
            // Skip the disconnect entirely if the client was never created or is already offline.
            if (mqttClient != null && mqttClient.isConnected()) {
                mqttClient.disconnect();
                // NOTE(review): the same event is reported twice (stdout and MyLog) — confirm whether both are intended.
                System.out.println("Disconnected from MQTT broker");
                MyLog.d("MqttManager>>>>关闭连接成功");
            }
        } catch (MqttException e) {
            e.printStackTrace();
        } finally {
            // Runs whether or not the disconnect succeeded.
            executorService.shutdown();
        }
    }
    // 自动重连方法
    private void reconnect() {
        try {
            if (isHasNet) {
                while (!mqttClient.isConnected() && isHasNet) {
                    Log.d("MqttManager", "Attempting to reconnect...");
                    mqttClient.connect(connectOptions);  // 重试连接
                    if (mqttClient.isConnected()) {
                        Log.d("MqttManager", "Connected to MQTT broker");
                        subscribeToTopic();
                    }
                    Thread.sleep(5000);     // 每 5 秒重试一次
                }
                Log.d("MqttManager", "Reconnected to MQTT broker! isHasNet=true");
            } else {
                Log.d("MqttManager", "isHasNet is false");
                Thread.sleep(5000);
                reconnect();
            }
        } catch (MqttException | InterruptedException e) {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException ex) {
                e.printStackTrace();
            }// 每 5 秒重试一次
            reconnect();
        }
    }
    public void checkNetwork() {
        NetworkRequest request = new NetworkRequest.Builder().addCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET)  // 必须具备互联网能力
                .build();
        connectivityManager.registerNetworkCallback(request, new ConnectivityManager.NetworkCallback() {
            @Override
            public void onAvailable(Network network) {
                super.onAvailable(network);
                // 网络可用时的处理逻辑
                Log.d("MqttManager", "Network is available.");
                isHasNet = true;
                reconnect();
            }
            @Override
            public void onLost(Network network) {
                super.onLost(network);
                // 网络丢失时的处理逻辑
                Log.d("MqttManager", "Network is lost.");
                isHasNet = false;
        executorService.execute(() -> {
            int reconnectAttempts = 0;
            while (!mqttClient.isConnected() && isHasNet) {
                try {
                    mqttClient.disconnect();
                    MyLog.d("MqttManager>>>>开始重连");
                    mqttClient.connect(connectOptions);
                    if (mqttClient.isConnected()) {
                        MyLog.d("MqttManager>>>连接成功");
                        subscribeToTopic();
                        break;
                    }
                    // 指数退避
                    long backoff = Math.min((1L << reconnectAttempts) * 1000, 30000);
                    Thread.sleep(backoff);
                    reconnectAttempts++;
                } catch (MqttException e) {
                    e.printStackTrace();
                    MyLog.e("MqttManager>>>Reconnection failed" + e.getMessage());
                    // 可以在这里增加重连尝试次数的限制
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // 保留中断状态
                    return;
                }
            }
        });