From 15f5680b532238290d0adf095a93e5af1c5f1203 Mon Sep 17 00:00:00 2001
From: zuoxiao <470321431@qq.com>
Date: 星期五, 07 二月 2025 17:14:57 +0800
Subject: [PATCH] 1.添加显示隐藏取水口、分水房功能 2.完善图例自定义控件功能和显示 3.处理工单添加选择时间功能

---
 app/src/main/java/com/dayu/pipirrapp/net/MqttManager.java |  200 +++++++++++++++++++++++++++----------------------
 1 files changed, 109 insertions(+), 91 deletions(-)

diff --git a/app/src/main/java/com/dayu/pipirrapp/net/MqttManager.java b/app/src/main/java/com/dayu/pipirrapp/net/MqttManager.java
index 7a85cc5..3902e86 100644
--- a/app/src/main/java/com/dayu/pipirrapp/net/MqttManager.java
+++ b/app/src/main/java/com/dayu/pipirrapp/net/MqttManager.java
@@ -1,13 +1,18 @@
 package com.dayu.pipirrapp.net;
 
 import android.content.Context;
-import android.net.ConnectivityManager;
-import android.net.Network;
-import android.net.NetworkCapabilities;
-import android.net.NetworkRequest;
+import android.content.pm.PackageInfo;
+import android.content.pm.PackageManager;
 import android.util.Log;
 
+import androidx.lifecycle.LifecycleOwner;
+import androidx.lifecycle.Observer;
+
+import com.dayu.pipirrapp.MyApplication;
 import com.dayu.pipirrapp.utils.CommonKeyName;
+import com.dayu.pipirrapp.utils.MyJsonParser;
+import com.dayu.pipirrapp.utils.MyLog;
+import com.dayu.pipirrapp.utils.NetUtils;
 import com.jeremyliao.liveeventbus.LiveEventBus;
 
 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
@@ -18,6 +23,8 @@
 import org.eclipse.paho.client.mqttv3.MqttMessage;
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 
+import java.util.HashMap;
+import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -30,17 +37,28 @@
  */
 public class MqttManager {
 
-    private static final String MQTT_BROKER_URL = "tcp://115.236.153.170:30764"; // 淇敼涓轰綘鐨� broker 鍦板潃
-    private static final String CLIENT_ID = "mqttx_54052fa0";
+    //    private static final String MQTT_BROKER_URL = "tcp://115.236.153.170:30764"; // 淇敼涓轰綘鐨� broker 鍦板潃
+    private static final String MQTT_BROKER_URL = "tcp://192.168.10.52:1883";
+    private String CLIENT_ID = "mqttx_a7a9fe73";
     private static final String TOPIC = "workOrder"; // 璁㈤槄鐨勪富棰�
-    private ConnectivityManager connectivityManager;
 
     private MqttClient mqttClient;
     private MqttConnectOptions connectOptions;
-    boolean isHasNet = false;
+    boolean isHasNet = true;
+    //鏄惁杩炴帴鎴愬姛杩囦竴娆★紝娌℃湁鐨勮瘽鑱旂綉鍚庨噸杩�
+    boolean isConnet = false;
+    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
 
-    public MqttManager(Context context) {
+    public MqttManager(Context context, LifecycleOwner lifecycleOwner) {
         try {
+            PackageManager manager = context.getPackageManager();
+            PackageInfo info = null;
+            try {
+                info = manager.getPackageInfo(context.getPackageName(), 0);
+            } catch (PackageManager.NameNotFoundException e) {
+                throw new RuntimeException(e);
+            }
+            CLIENT_ID = context.getPackageName() + UUID.randomUUID().toString().replace("-", "") + "_" + info;
             mqttClient = new MqttClient(MQTT_BROKER_URL, CLIENT_ID, new MemoryPersistence());
             connectOptions = new MqttConnectOptions();
             connectOptions.setUserName("mqtt_yjy");
@@ -48,8 +66,52 @@
             connectOptions.setCleanSession(false);
             connectOptions.setKeepAliveInterval(60); // 璁剧疆淇濇寔杩炴帴鐨勬椂闂�
             connectOptions.setAutomaticReconnect(true);  // 鍚敤鑷姩閲嶈繛
-            connectivityManager = (ConnectivityManager) context.getSystemService(Context.CONNECTIVITY_SERVICE);
-            checkNetwork();
+            connectOptions.setConnectionTimeout(30);// 璁剧疆杩炴帴瓒呮椂鏃堕棿锛屽崟浣嶄负绉�
+            mqttClient.setCallback(new MqttCallback() {
+                @Override
+                public void connectionLost(Throwable cause) {
+                    Log.d("MqttManager", "杩炴帴涓㈠け锛�" + cause.getMessage());
+                    // 澶勭悊杩炴帴涓㈠け锛屽彲浠ュ皾璇曢噸鏂拌繛鎺�
+                    reconnect();
+                }
+
+                @Override
+                public void messageArrived(String topic, MqttMessage message) throws Exception {
+                    Log.d("MqttManager", "messageArrived鏀跺埌鐨勬秷:" + new String(message.getPayload()));
+                    // 澶勭悊鏀跺埌鐨勬秷鎭�
+                }
+
+                @Override
+                public void deliveryComplete(IMqttDeliveryToken token) {
+                    // 澶勭悊娑堟伅鍙戦�佸畬鎴�
+                    try {
+                        Log.d("MqttManager", "鍙戦�佸畬鎴愶細" + new String(token.getMessage().toString()));
+                    } catch (MqttException e) {
+                        e.printStackTrace();
+                    }
+                }
+            });
+            LiveEventBus.get(CommonKeyName.NetworkCallback).observe(lifecycleOwner, new Observer<Object>() {
+                @Override
+                public void onChanged(Object o) {
+                    if (o instanceof Integer) {
+                        switch ((int) o) {
+                            case NetUtils.Available:
+                                MyLog.i("MqttManager>>>Available");
+                                isHasNet = true;
+                                if (!isConnet) {
+                                    reconnect();
+                                }
+                                break;
+                            case NetUtils.Lost:
+                                MyLog.i("MqttManager>>>Lost");
+                                isHasNet = false;
+//                                disconnect();
+                                break;
+                        }
+                    }
+                }
+            });
         } catch (MqttException e) {
             e.printStackTrace();
         }
@@ -57,16 +119,19 @@
 
     // 杩炴帴鍒� MQTT broker
     public void connect() {
-        ExecutorService executorService = Executors.newSingleThreadExecutor();
         executorService.execute(new Runnable() {
             @Override
             public void run() {
                 try {
+                    Log.d("MqttManager", "connect寮�濮嬭繛鎺sHasNet:" + isHasNet);
                     if (isHasNet) {
+                        Log.d("MqttManager", "connect寮�濮嬭繛鎺�");
                         mqttClient.connect(connectOptions);
                         if (mqttClient.isConnected()) {
-                            Log.d("MqttManager", "Connected to MQTT broker");
+                            Log.d("MqttManager", "connect杩炴帴鎴愬姛");
                             subscribeToTopic();
+                        } else {
+                            Log.d("MqttManager", "connect杩炴帴澶辫触");
                         }
                     } else {
                         reconnect();
@@ -83,36 +148,18 @@
     // 璁㈤槄涓婚
     private void subscribeToTopic() {
         try {
+            isConnet = true;
             mqttClient.subscribe(TOPIC, (topic, message) -> {
-                // 鏀跺埌娑堟伅鏃剁殑澶勭悊閫昏緫
-                Log.d("MqttManager", "Received message:" + new String(message.getPayload()));
+                // 鍦ㄥ瓙绾跨▼鏀跺埌娑堟伅鏃剁殑澶勭悊閫昏緫
+                Log.d("MqttManager", "subscribe鏀跺埌娑堟伅锛�" + new String(message.getPayload()));
                 //浼犻�扢Q鏀跺埌鐨勪俊鎭�
-                LiveEventBus.get(CommonKeyName.locationData).post(message.getPayload());
-            });
-            mqttClient.setCallback(new MqttCallback() {
-                @Override
-                public void connectionLost(Throwable cause) {
-                    Log.d("MqttManager", "connectionLost" + cause.getMessage());
-                    reconnect();
-                    // 澶勭悊杩炴帴涓㈠け锛屽彲浠ュ皾璇曢噸鏂拌繛鎺�
-                }
-
-                @Override
-                public void messageArrived(String topic, MqttMessage message) throws Exception {
-                    Log.d("MqttManager", "Received messageArrived:" + new String(message.getPayload()));
-                    // 澶勭悊鏀跺埌鐨勬秷鎭�
-                }
-
-                @Override
-                public void deliveryComplete(IMqttDeliveryToken token) {
-                    // 澶勭悊娑堟伅鍙戦�佸畬鎴�
-                    try {
-                        Log.d("MqttManager", "deliveryComplete锛�" + new String(token.getMessage().toString()));
-                    } catch (MqttException e) {
-                        e.printStackTrace();
-                    }
+                HashMap<String, Object> data = MyJsonParser.getMapFromJson(new String(message.getPayload()));
+                //鍒ゆ柇鏄惁鏄綋鍓嶇敤鎴�
+                if (data.get("inspectorId").equals(MyApplication.myApplication.userId)) {
+                    LiveEventBus.get(CommonKeyName.MQTTData).post(data.get("workOrderId"));
                 }
             });
+
         } catch (MqttException e) {
             e.printStackTrace();
         }
@@ -130,71 +177,42 @@
 
     // 鏂紑杩炴帴
     public void disconnect() {
+        MyLog.d("MqttManager>>>>鍏抽棴杩炴帴");
         try {
             if (mqttClient != null && mqttClient.isConnected()) {
                 mqttClient.disconnect();
-                System.out.println("Disconnected from MQTT broker");
+                MyLog.d("MqttManager>>>>鍏抽棴杩炴帴鎴愬姛");
             }
         } catch (MqttException e) {
             e.printStackTrace();
+        } finally {
+            executorService.shutdown();
         }
     }
 
     // 鑷姩閲嶈繛鏂规硶
     private void reconnect() {
-        try {
-            if (isHasNet) {
-                while (!mqttClient.isConnected() && isHasNet) {
-                    Log.d("MqttManager", "Attempting to reconnect...");
-                    mqttClient.connect(connectOptions);  // 閲嶈瘯杩炴帴
-                    if (mqttClient.isConnected()) {
-                        Log.d("MqttManager", "Connected to MQTT broker");
-                        subscribeToTopic();
-                    }
-                    Thread.sleep(5000);     // 姣� 5 绉掗噸璇曚竴娆�
-                }
-                Log.d("MqttManager", "Reconnected to MQTT broker! isHasNet=true");
-            } else {
-                Log.d("MqttManager", "isHasNet is false");
-                Thread.sleep(5000);
-                reconnect();
-            }
-
-        } catch (MqttException | InterruptedException e) {
-            try {
-                Thread.sleep(5000);
-            } catch (InterruptedException ex) {
-                e.printStackTrace();
-            }// 姣� 5 绉掗噸璇曚竴娆�
-            reconnect();
-
-        }
-    }
-
-    public void checkNetwork() {
-        NetworkRequest request = new NetworkRequest.Builder().addCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET)  // 蹇呴』鍏峰浜掕仈缃戣兘鍔�
-                .build();
-
-        connectivityManager.registerNetworkCallback(request, new ConnectivityManager.NetworkCallback() {
-            @Override
-            public void onAvailable(Network network) {
-                super.onAvailable(network);
-                // 缃戠粶鍙敤鏃剁殑澶勭悊閫昏緫
-                Log.d("MqttManager", "Network is available.");
-                isHasNet = true;
-                reconnect();
-            }
-
-            @Override
-            public void onLost(Network network) {
-                super.onLost(network);
-                // 缃戠粶涓㈠け鏃剁殑澶勭悊閫昏緫
-                Log.d("MqttManager", "Network is lost.");
-                isHasNet = false;
+        executorService.execute(() -> {
+            int reconnectAttempts = 0;
+            while (!mqttClient.isConnected() && isHasNet) {
                 try {
-                    mqttClient.disconnect();
+                    MyLog.d("MqttManager>>>>寮�濮嬮噸杩�");
+                    mqttClient.connect(connectOptions);
+                    if (mqttClient.isConnected()) {
+                        MyLog.d("MqttManager>>>杩炴帴鎴愬姛");
+                        subscribeToTopic();
+                        break;
+                    }
+                    // 鎸囨暟閫�閬�
+                    long backoff = Math.min((1L << reconnectAttempts) * 1000, 30000);
+                    Thread.sleep(backoff);
+                    reconnectAttempts++;
                 } catch (MqttException e) {
-                    e.printStackTrace();
+                    MyLog.e("MqttManager>>>Reconnection failed" + e.getMessage());
+                    // 鍙互鍦ㄨ繖閲屽鍔犻噸杩炲皾璇曟鏁扮殑闄愬埗
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt(); // 淇濈暀涓柇鐘舵��
+                    return;
                 }
             }
         });

--
Gitblit v1.8.0