管灌系统巡查员智能手机App
zuoxiao
2024-12-09 fbb64f0b4151a4fefb90191991aa0fba7b26c52a
app/src/main/java/com/dayu/pipirrapp/net/MqttManager.java
@@ -2,12 +2,14 @@
import android.content.Context;
import android.net.ConnectivityManager;
import android.net.Network;
import android.net.NetworkCapabilities;
import android.net.NetworkRequest;
import android.util.Log;
import androidx.lifecycle.LifecycleOwner;
import androidx.lifecycle.Observer;
import com.dayu.pipirrapp.utils.CommonKeyName;
import com.dayu.pipirrapp.utils.MyLog;
import com.dayu.pipirrapp.utils.NetUtils;
import com.jeremyliao.liveeventbus.LiveEventBus;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
@@ -31,15 +33,15 @@
public class MqttManager {
    private static final String MQTT_BROKER_URL = "tcp://115.236.153.170:30764"; // 修改为你的 broker 地址
    private static final String CLIENT_ID = "mqttx_54052fa0";
    private static final String CLIENT_ID = "mqttx_f62ef124";
    private static final String TOPIC = "workOrder"; // 订阅的主题
    private ConnectivityManager connectivityManager;
    private MqttClient mqttClient;
    private MqttConnectOptions connectOptions;
    boolean isHasNet = false;
    boolean isHasNet = true;
    private ExecutorService executorService = Executors.newSingleThreadExecutor();
    public MqttManager(Context context) {
    public MqttManager(Context context, LifecycleOwner lifecycleOwner) {
        try {
            mqttClient = new MqttClient(MQTT_BROKER_URL, CLIENT_ID, new MemoryPersistence());
            connectOptions = new MqttConnectOptions();
@@ -48,8 +50,25 @@
            connectOptions.setCleanSession(false);
            connectOptions.setKeepAliveInterval(60); // 设置保持连接的时间
            connectOptions.setAutomaticReconnect(true);  // 启用自动重连
            connectivityManager = (ConnectivityManager) context.getSystemService(Context.CONNECTIVITY_SERVICE);
            checkNetwork();
            LiveEventBus.get(CommonKeyName.NetworkCallback).observe(lifecycleOwner, new Observer<Object>() {
                @Override
                public void onChanged(Object o) {
                    if (o instanceof Integer) {
                        switch ((int) o) {
                            case NetUtils.Available:
                                MyLog.i("MqttManager>>>Available");
                                isHasNet = true;
                                reconnect();
                                break;
                            case NetUtils.Lost:
                                MyLog.i("MqttManager>>>Lost");
                                isHasNet = false;
                                disconnect();
                                break;
                        }
                    }
                }
            });
        } catch (MqttException e) {
            e.printStackTrace();
        }
@@ -57,16 +76,43 @@
    // 连接到 MQTT broker
    public void connect() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Log.d("MqttManager", "connect开始连接isHasNet:" + isHasNet);
                    if (isHasNet) {
                        Log.d("MqttManager", "connect开始连接");
                        mqttClient.setCallback(new MqttCallback() {
                            @Override
                            public void connectionLost(Throwable cause) {
                                Log.d("MqttManager", "连接丢失:" + cause.getMessage());
                                // 处理连接丢失,可以尝试重新连接
                                reconnect();
                            }
                            @Override
                            public void messageArrived(String topic, MqttMessage message) throws Exception {
                                Log.d("MqttManager", "收到的消:" + new String(message.getPayload()));
                                // 处理收到的消息
                            }
                            @Override
                            public void deliveryComplete(IMqttDeliveryToken token) {
                                // 处理消息发送完成
                                try {
                                    Log.d("MqttManager", "发送完成:" + new String(token.getMessage().toString()));
                                } catch (MqttException e) {
                                    e.printStackTrace();
                                }
                            }
                        });
                        mqttClient.connect(connectOptions);
                        if (mqttClient.isConnected()) {
                            Log.d("MqttManager", "Connected to MQTT broker");
                            Log.d("MqttManager", "connect连接成功");
                            subscribeToTopic();
                        } else {
                            Log.d("MqttManager", "connect连接失败");
                        }
                    } else {
                        reconnect();
@@ -85,34 +131,11 @@
        try {
            mqttClient.subscribe(TOPIC, (topic, message) -> {
                // 收到消息时的处理逻辑
                Log.d("MqttManager", "Received message:" + new String(message.getPayload()));
                Log.d("MqttManager", "收到消息:" + new String(message.getPayload()));
                //传递MQ收到的信息
                LiveEventBus.get(CommonKeyName.locationData).post(message.getPayload());
            });
            mqttClient.setCallback(new MqttCallback() {
                @Override
                public void connectionLost(Throwable cause) {
                    Log.d("MqttManager", "connectionLost" + cause.getMessage());
                    reconnect();
                    // 处理连接丢失,可以尝试重新连接
                }
                @Override
                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    Log.d("MqttManager", "Received messageArrived:" + new String(message.getPayload()));
                    // 处理收到的消息
                }
                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                    // 处理消息发送完成
                    try {
                        Log.d("MqttManager", "deliveryComplete:" + new String(token.getMessage().toString()));
                    } catch (MqttException e) {
                        e.printStackTrace();
                    }
                }
            });
        } catch (MqttException e) {
            e.printStackTrace();
        }
@@ -130,10 +153,11 @@
    // 断开连接
    public void disconnect() {
        MyLog.d("MqttManager>>>>关闭连接");
        try {
            if (mqttClient != null && mqttClient.isConnected()) {
                mqttClient.disconnect();
                System.out.println("Disconnected from MQTT broker");
                MyLog.d("MqttManager>>>>关闭连接成功");
            }
        } catch (MqttException e) {
            e.printStackTrace();
@@ -142,61 +166,22 @@
    // 自动重连方法
    private void reconnect() {
        try {
            if (isHasNet) {
                while (!mqttClient.isConnected() && isHasNet) {
                    Log.d("MqttManager", "Attempting to reconnect...");
                    mqttClient.connect(connectOptions);  // 重试连接
                    if (mqttClient.isConnected()) {
                        Log.d("MqttManager", "Connected to MQTT broker");
                        subscribeToTopic();
                    }
                    Thread.sleep(5000);     // 每 5 秒重试一次
                }
                Log.d("MqttManager", "Reconnected to MQTT broker! isHasNet=true");
            } else {
                Log.d("MqttManager", "isHasNet is false");
                Thread.sleep(5000);
                reconnect();
            }
        } catch (MqttException | InterruptedException e) {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException ex) {
                e.printStackTrace();
            }// 每 5 秒重试一次
            reconnect();
        }
    }
    public void checkNetwork() {
        NetworkRequest request = new NetworkRequest.Builder().addCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET)  // 必须具备互联网能力
                .build();
        connectivityManager.registerNetworkCallback(request, new ConnectivityManager.NetworkCallback() {
            @Override
            public void onAvailable(Network network) {
                super.onAvailable(network);
                // 网络可用时的处理逻辑
                Log.d("MqttManager", "Network is available.");
                isHasNet = true;
                reconnect();
            }
            @Override
            public void onLost(Network network) {
                super.onLost(network);
                // 网络丢失时的处理逻辑
                Log.d("MqttManager", "Network is lost.");
                isHasNet = false;
                try {
                    mqttClient.disconnect();
                } catch (MqttException e) {
                    e.printStackTrace();
                }
            }
        });
//        executorService.execute(() -> {
//            try {
//                MyLog.d("MqttManager>>>>开始重连+isHasNet:" + isHasNet + ">>>mqttClient.isConnected():" + mqttClient.isConnected());
//                while (!mqttClient.isConnected() && isHasNet) {
//                    MyLog.d("MqttManager>>>>开始连接");
//                    mqttClient.connect(connectOptions);
//                    if (mqttClient.isConnected()) {
//                        MyLog.d("MqttManager>>>连接成功");
//                        subscribeToTopic();
//                        break;
//                    }
//                    Thread.sleep(5000);  // 每5秒重试一次
//                }
//            } catch (MqttException | InterruptedException e) {
//                MyLog.e("MqttManager>>>Reconnection failed" + e.toString());
//            }
//        });
    }
}