From 15f5680b532238290d0adf095a93e5af1c5f1203 Mon Sep 17 00:00:00 2001 From: zuoxiao <470321431@qq.com> Date: 星期五, 07 二月 2025 17:14:57 +0800 Subject: [PATCH] 1.添加显示隐藏取水口、分水房功能 2.完善图例自定义控件功能和显示 3.处理工单添加选择时间功能 --- app/src/main/java/com/dayu/pipirrapp/net/MqttManager.java | 188 +++++++++++++++++++++++++++------------------- 1 files changed, 109 insertions(+), 79 deletions(-) diff --git a/app/src/main/java/com/dayu/pipirrapp/net/MqttManager.java b/app/src/main/java/com/dayu/pipirrapp/net/MqttManager.java index dbea4cd..3902e86 100644 --- a/app/src/main/java/com/dayu/pipirrapp/net/MqttManager.java +++ b/app/src/main/java/com/dayu/pipirrapp/net/MqttManager.java @@ -1,10 +1,18 @@ package com.dayu.pipirrapp.net; import android.content.Context; -import android.net.ConnectivityManager; +import android.content.pm.PackageInfo; +import android.content.pm.PackageManager; import android.util.Log; +import androidx.lifecycle.LifecycleOwner; +import androidx.lifecycle.Observer; + +import com.dayu.pipirrapp.MyApplication; import com.dayu.pipirrapp.utils.CommonKeyName; +import com.dayu.pipirrapp.utils.MyJsonParser; +import com.dayu.pipirrapp.utils.MyLog; +import com.dayu.pipirrapp.utils.NetUtils; import com.jeremyliao.liveeventbus.LiveEventBus; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; @@ -15,6 +23,8 @@ import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import java.util.HashMap; +import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -27,17 +37,28 @@ */ public class MqttManager { - private static final String MQTT_BROKER_URL = "tcp://115.236.153.170:30764"; // 淇敼涓轰綘鐨� broker 鍦板潃 - private static final String CLIENT_ID = "mqttx_54052fa0"; + // private static final String MQTT_BROKER_URL = "tcp://115.236.153.170:30764"; // 淇敼涓轰綘鐨� broker 鍦板潃 + private static final String MQTT_BROKER_URL = "tcp://192.168.10.52:1883"; + private String CLIENT_ID = "mqttx_a7a9fe73"; private static final String TOPIC = "workOrder"; // 璁㈤槄鐨勪富棰� - private ConnectivityManager connectivityManager; private MqttClient mqttClient; private MqttConnectOptions connectOptions; - boolean isHasNet = false; + boolean isHasNet = true; + //鏄惁杩炴帴鎴愬姛杩囦竴娆★紝娌℃湁鐨勮瘽鑱旂綉鍚庨噸杩� + boolean isConnet = false; + private final ExecutorService executorService = Executors.newSingleThreadExecutor(); - public MqttManager(Context context) { + public MqttManager(Context context, LifecycleOwner lifecycleOwner) { try { + PackageManager manager = context.getPackageManager(); + PackageInfo info = null; + try { + info = manager.getPackageInfo(context.getPackageName(), 0); + } catch (PackageManager.NameNotFoundException e) { + throw new RuntimeException(e); + } + CLIENT_ID = context.getPackageName() + UUID.randomUUID().toString().replace("-", "") + "_" + info; mqttClient = new MqttClient(MQTT_BROKER_URL, CLIENT_ID, new MemoryPersistence()); connectOptions = new MqttConnectOptions(); connectOptions.setUserName("mqtt_yjy"); @@ -45,28 +66,52 @@ connectOptions.setCleanSession(false); connectOptions.setKeepAliveInterval(60); // 璁剧疆淇濇寔杩炴帴鐨勬椂闂� connectOptions.setAutomaticReconnect(true); // 鍚敤鑷姩閲嶈繛 - connectivityManager = (ConnectivityManager) context.getSystemService(Context.CONNECTIVITY_SERVICE); -// LiveEventBus.get(CommonKeyName.NetworkCallback).observeForever(new Observer<Object>() { -// @Override -// public void onChanged(Object o) { -// switch ((int) o) { -// case NetUtils.Available: -// MyLog.i("MqttManager>>>Available"); -// isHasNet = true; -// reconnect(); -// break; -// case NetUtils.Lost: -// MyLog.i("MqttManager>>>Lost"); -// isHasNet = false; -// try { -// mqttClient.disconnect(); -// } catch (MqttException e) { -// e.printStackTrace(); -// } -// break; -// } -// } -// }); + connectOptions.setConnectionTimeout(30);// 璁剧疆杩炴帴瓒呮椂鏃堕棿锛屽崟浣嶄负绉� + mqttClient.setCallback(new MqttCallback() { + @Override + public void connectionLost(Throwable cause) { + Log.d("MqttManager", "杩炴帴涓㈠け锛�" + cause.getMessage()); + // 澶勭悊杩炴帴涓㈠け锛屽彲浠ュ皾璇曢噸鏂拌繛鎺� + reconnect(); + } + + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + Log.d("MqttManager", "messageArrived鏀跺埌鐨勬秷:" + new String(message.getPayload())); + // 澶勭悊鏀跺埌鐨勬秷鎭� + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + // 澶勭悊娑堟伅鍙戦�佸畬鎴� + try { + Log.d("MqttManager", "鍙戦�佸畬鎴愶細" + new String(token.getMessage().toString())); + } catch (MqttException e) { + e.printStackTrace(); + } + } + }); + LiveEventBus.get(CommonKeyName.NetworkCallback).observe(lifecycleOwner, new Observer<Object>() { + @Override + public void onChanged(Object o) { + if (o instanceof Integer) { + switch ((int) o) { + case NetUtils.Available: + MyLog.i("MqttManager>>>Available"); + isHasNet = true; + if (!isConnet) { + reconnect(); + } + break; + case NetUtils.Lost: + MyLog.i("MqttManager>>>Lost"); + isHasNet = false; +// disconnect(); + break; + } + } + } + }); } catch (MqttException e) { e.printStackTrace(); } @@ -74,16 +119,19 @@ // 杩炴帴鍒� MQTT broker public void connect() { - ExecutorService executorService = Executors.newSingleThreadExecutor(); executorService.execute(new Runnable() { @Override public void run() { try { + Log.d("MqttManager", "connect寮�濮嬭繛鎺sHasNet:" + isHasNet); if (isHasNet) { + Log.d("MqttManager", "connect寮�濮嬭繛鎺�"); mqttClient.connect(connectOptions); if (mqttClient.isConnected()) { - Log.d("MqttManager", "Connected to MQTT broker"); + Log.d("MqttManager", "connect杩炴帴鎴愬姛"); subscribeToTopic(); + } else { + Log.d("MqttManager", "connect杩炴帴澶辫触"); } } else { reconnect(); @@ -100,36 +148,18 @@ // 璁㈤槄涓婚 private void subscribeToTopic() { try { + isConnet = true; mqttClient.subscribe(TOPIC, (topic, message) -> { - // 鏀跺埌娑堟伅鏃剁殑澶勭悊閫昏緫 - Log.d("MqttManager", "Received message:" + new String(message.getPayload())); + // 鍦ㄥ瓙绾跨▼鏀跺埌娑堟伅鏃剁殑澶勭悊閫昏緫 + Log.d("MqttManager", "subscribe鏀跺埌娑堟伅锛�" + new String(message.getPayload())); //浼犻�扢Q鏀跺埌鐨勪俊鎭� - LiveEventBus.get(CommonKeyName.locationData).post(message.getPayload()); - }); - mqttClient.setCallback(new MqttCallback() { - @Override - public void connectionLost(Throwable cause) { - Log.d("MqttManager", "connectionLost" + cause.getMessage()); - reconnect(); - // 澶勭悊杩炴帴涓㈠け锛屽彲浠ュ皾璇曢噸鏂拌繛鎺� - } - - @Override - public void messageArrived(String topic, MqttMessage message) throws Exception { - Log.d("MqttManager", "Received messageArrived:" + new String(message.getPayload())); - // 澶勭悊鏀跺埌鐨勬秷鎭� - } - - @Override - public void deliveryComplete(IMqttDeliveryToken token) { - // 澶勭悊娑堟伅鍙戦�佸畬鎴� - try { - Log.d("MqttManager", "deliveryComplete锛�" + new String(token.getMessage().toString())); - } catch (MqttException e) { - e.printStackTrace(); - } + HashMap<String, Object> data = MyJsonParser.getMapFromJson(new String(message.getPayload())); + //鍒ゆ柇鏄惁鏄綋鍓嶇敤鎴� + if (data.get("inspectorId").equals(MyApplication.myApplication.userId)) { + LiveEventBus.get(CommonKeyName.MQTTData).post(data.get("workOrderId")); } }); + } catch (MqttException e) { e.printStackTrace(); } @@ -147,44 +177,44 @@ // 鏂紑杩炴帴 public void disconnect() { + MyLog.d("MqttManager>>>>鍏抽棴杩炴帴"); try { if (mqttClient != null && mqttClient.isConnected()) { mqttClient.disconnect(); - System.out.println("Disconnected from MQTT broker"); + MyLog.d("MqttManager>>>>鍏抽棴杩炴帴鎴愬姛"); } } catch (MqttException e) { e.printStackTrace(); + } finally { + executorService.shutdown(); } } // 鑷姩閲嶈繛鏂规硶 private void reconnect() { - try { - if (isHasNet) { - while (!mqttClient.isConnected() && isHasNet) { - Log.d("MqttManager", "Attempting to reconnect..."); - mqttClient.connect(connectOptions); // 閲嶈瘯杩炴帴 + executorService.execute(() -> { + int reconnectAttempts = 0; + while (!mqttClient.isConnected() && isHasNet) { + try { + MyLog.d("MqttManager>>>>寮�濮嬮噸杩�"); + mqttClient.connect(connectOptions); if (mqttClient.isConnected()) { - Log.d("MqttManager", "Connected to MQTT broker"); + MyLog.d("MqttManager>>>杩炴帴鎴愬姛"); subscribeToTopic(); + break; } - Thread.sleep(5000); // 姣� 5 绉掗噸璇曚竴娆� + // 鎸囨暟閫�閬� + long backoff = Math.min((1L << reconnectAttempts) * 1000, 30000); + Thread.sleep(backoff); + reconnectAttempts++; + } catch (MqttException e) { + MyLog.e("MqttManager>>>Reconnection failed" + e.getMessage()); + // 鍙互鍦ㄨ繖閲屽鍔犻噸杩炲皾璇曟鏁扮殑闄愬埗 + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // 淇濈暀涓柇鐘舵�� + return; } - Log.d("MqttManager", "Reconnected to MQTT broker! isHasNet=true"); - } else { - Log.d("MqttManager", "isHasNet is false"); -// Thread.sleep(5000); -// reconnect(); } - - } catch (MqttException | InterruptedException e) { -// try { -// Thread.sleep(5000); -// } catch (InterruptedException ex) { -// e.printStackTrace(); -// }// 姣� 5 绉掗噸璇曚竴娆� -// reconnect(); - - } + }); } } -- Gitblit v1.8.0