package com.dayu.pipirrapp.net; import android.content.Context; import android.net.ConnectivityManager; import android.net.Network; import android.net.NetworkCapabilities; import android.net.NetworkRequest; import android.util.Log; import com.dayu.pipirrapp.utils.CommonKeyName; import com.jeremyliao.liveeventbus.LiveEventBus; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * MQTT相关设置 * * @author zuoxiao * @version 1.0 * @since 2024-11-26 */ public class MqttManager { private static final String MQTT_BROKER_URL = "tcp://115.236.153.170:30764"; // 修改为你的 broker 地址 private static final String CLIENT_ID = "mqttx_54052fa0"; private static final String TOPIC = "workOrder"; // 订阅的主题 private ConnectivityManager connectivityManager; private MqttClient mqttClient; private MqttConnectOptions connectOptions; boolean isHasNet = false; public MqttManager(Context context) { try { mqttClient = new MqttClient(MQTT_BROKER_URL, CLIENT_ID, new MemoryPersistence()); connectOptions = new MqttConnectOptions(); connectOptions.setUserName("mqtt_yjy"); connectOptions.setPassword("yjy".toCharArray()); connectOptions.setCleanSession(false); connectOptions.setKeepAliveInterval(60); // 设置保持连接的时间 connectOptions.setAutomaticReconnect(true); // 启用自动重连 connectivityManager = (ConnectivityManager) context.getSystemService(Context.CONNECTIVITY_SERVICE); checkNetwork(); } catch (MqttException e) { e.printStackTrace(); } } // 连接到 MQTT broker public void connect() { ExecutorService executorService = Executors.newSingleThreadExecutor(); executorService.execute(new Runnable() { @Override public void run() { try { if (isHasNet) { mqttClient.connect(connectOptions); if (mqttClient.isConnected()) { Log.d("MqttManager", "Connected to MQTT broker"); subscribeToTopic(); } } else { reconnect(); } } catch (MqttException e) { reconnect(); Log.e("MqttManager", "Error connecting to MQTT broker", e); } } }); } // 订阅主题 private void subscribeToTopic() { try { mqttClient.subscribe(TOPIC, (topic, message) -> { // 收到消息时的处理逻辑 Log.d("MqttManager", "Received message:" + new String(message.getPayload())); //传递MQ收到的信息 LiveEventBus.get(CommonKeyName.locationData).post(message.getPayload()); }); mqttClient.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable cause) { Log.d("MqttManager", "connectionLost" + cause.getMessage()); reconnect(); // 处理连接丢失,可以尝试重新连接 } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { Log.d("MqttManager", "Received messageArrived:" + new String(message.getPayload())); // 处理收到的消息 } @Override public void deliveryComplete(IMqttDeliveryToken token) { // 处理消息发送完成 try { Log.d("MqttManager", "deliveryComplete:" + new String(token.getMessage().toString())); } catch (MqttException e) { e.printStackTrace(); } } }); } catch (MqttException e) { e.printStackTrace(); } } // 发布消息 public void publishMessage(String message) { try { MqttMessage mqttMessage = new MqttMessage(message.getBytes()); mqttClient.publish(TOPIC, mqttMessage); } catch (MqttException e) { e.printStackTrace(); } } // 断开连接 public void disconnect() { try { if (mqttClient != null && mqttClient.isConnected()) { mqttClient.disconnect(); System.out.println("Disconnected from MQTT broker"); } } catch (MqttException e) { e.printStackTrace(); } } // 自动重连方法 private void reconnect() { try { if (isHasNet) { while (!mqttClient.isConnected() && isHasNet) { Log.d("MqttManager", "Attempting to reconnect..."); mqttClient.connect(connectOptions); // 重试连接 if (mqttClient.isConnected()) { Log.d("MqttManager", "Connected to MQTT broker"); subscribeToTopic(); } Thread.sleep(5000); // 每 5 秒重试一次 } Log.d("MqttManager", "Reconnected to MQTT broker! isHasNet=true"); } else { Log.d("MqttManager", "isHasNet is false"); Thread.sleep(5000); reconnect(); } } catch (MqttException | InterruptedException e) { try { Thread.sleep(5000); } catch (InterruptedException ex) { e.printStackTrace(); }// 每 5 秒重试一次 reconnect(); } } public void checkNetwork() { NetworkRequest request = new NetworkRequest.Builder().addCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET) // 必须具备互联网能力 .build(); connectivityManager.registerNetworkCallback(request, new ConnectivityManager.NetworkCallback() { @Override public void onAvailable(Network network) { super.onAvailable(network); // 网络可用时的处理逻辑 Log.d("MqttManager", "Network is available."); isHasNet = true; reconnect(); } @Override public void onLost(Network network) { super.onLost(network); // 网络丢失时的处理逻辑 Log.d("MqttManager", "Network is lost."); isHasNet = false; try { mqttClient.disconnect(); } catch (MqttException e) { e.printStackTrace(); } } }); } }