package com.dayu.pipirrapp.net;

import android.content.Context;
import android.content.pm.PackageInfo;
import android.content.pm.PackageManager;
import android.util.Log;

import androidx.lifecycle.LifecycleOwner;
import androidx.lifecycle.Observer;

import com.dayu.pipirrapp.MyApplication;
import com.dayu.pipirrapp.utils.CommonKeyName;
import com.dayu.pipirrapp.utils.MyJsonParser;
import com.dayu.pipirrapp.utils.MyLog;
import com.dayu.pipirrapp.utils.NetUtils;
import com.jeremyliao.liveeventbus.LiveEventBus;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

/**
 * MQTT connection management: creates the client, connects on a background thread,
 * subscribes to the work-order topic and forwards matching messages through
 * {@link LiveEventBus}.
 *
 * <p>All blocking connect/reconnect work runs on a single-thread executor. Once
 * {@link #disconnect()} has been called the manager is closed and further
 * {@link #connect()} requests are ignored.
 *
 * @author zuoxiao
 * @version 1.0
 * @since 2024-11-26
 */
public class MqttManager {
    private static final String TAG = "MqttManager";

    // Alternate broker address kept for reference: tcp://115.236.153.170:30764
    // Change this to your own broker address.
    private static final String MQTT_BROKER_URL = "tcp://192.168.10.52:1883";
    // Topic this client subscribes and publishes to.
    private static final String TOPIC = "workOrder";

    // Backoff is 1s << attempt, capped at 30s. The shift itself is capped so that the
    // computation can never overflow no matter how many attempts are made.
    private static final int MAX_BACKOFF_SHIFT = 5;
    private static final long MAX_BACKOFF_MILLIS = 30000L;

    // Placeholder id; replaced in the constructor with a per-instance unique id.
    private String clientId = "mqttx_a7a9fe73";
    private MqttClient mqttClient;
    private MqttConnectOptions connectOptions;

    // Written by the network observer, read by the executor thread.
    volatile boolean isHasNet = true;
    // Whether a connection has succeeded at least once; if not, reconnect once the
    // network becomes available. Intentionally never reset.
    volatile boolean isConnet = false;
    // Set by disconnect(); stops the reconnect loop and rejects new work.
    private volatile boolean closed = false;

    private final ExecutorService executorService = Executors.newSingleThreadExecutor();

    /**
     * Creates the MQTT client, installs the callbacks and starts observing network
     * state changes. Does not connect; call {@link #connect()} for that.
     *
     * <p>If the client cannot be created the error is logged and the manager stays
     * inert: every later operation becomes a logged no-op.
     *
     * @param context        used to read the package name and version for the client id
     * @param lifecycleOwner lifecycle that scopes the network-state observer
     */
    public MqttManager(Context context, LifecycleOwner lifecycleOwner) {
        try {
            clientId = buildClientId(context);
            mqttClient = new MqttClient(MQTT_BROKER_URL, clientId, new MemoryPersistence());
            connectOptions = buildConnectOptions();

            mqttClient.setCallback(new MqttCallback() {
                @Override
                public void connectionLost(Throwable cause) {
                    Log.d(TAG, "连接丢失:" + cause.getMessage());
                    // Connection dropped: try to re-establish it.
                    // NOTE(review): automatic reconnect is also enabled in the connect
                    // options; confirm both mechanisms are really wanted together.
                    reconnect();
                }

                @Override
                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    Log.d(TAG, "messageArrived收到的消:"
                            + new String(message.getPayload(), StandardCharsets.UTF_8));
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                    try {
                        // getMessage() returns null once the message has been delivered,
                        // so it must not be dereferenced here.
                        MqttMessage delivered = token.getMessage();
                        Log.d(TAG, "发送完成:" + delivered);
                    } catch (MqttException e) {
                        Log.e(TAG, "Failed to read delivered message", e);
                    }
                }
            });

            LiveEventBus.get(CommonKeyName.NetworkCallback).observe(lifecycleOwner, new Observer<Object>() {
                @Override
                public void onChanged(Object o) {
                    if (!(o instanceof Integer)) {
                        return;
                    }
                    switch ((int) o) {
                        case NetUtils.Available:
                            MyLog.i("MqttManager>>>Available");
                            isHasNet = true;
                            // Never connected so far: retry now that the network is back.
                            if (!isConnet) {
                                reconnect();
                            }
                            break;
                        case NetUtils.Lost:
                            MyLog.i("MqttManager>>>Lost");
                            isHasNet = false;
                            break;
                        default:
                            break;
                    }
                }
            });
        } catch (MqttException e) {
            Log.e(TAG, "Failed to create MQTT client", e);
        }
    }

    /**
     * Builds a unique client id from the package name, a random UUID and the app
     * version name.
     */
    private static String buildClientId(Context context) {
        PackageManager manager = context.getPackageManager();
        PackageInfo info;
        try {
            info = manager.getPackageInfo(context.getPackageName(), 0);
        } catch (PackageManager.NameNotFoundException e) {
            throw new RuntimeException(e);
        }
        // Use the version name, not the PackageInfo object: concatenating the object
        // would embed its toString() (identity hash, spaces, braces) in the id.
        return context.getPackageName()
                + UUID.randomUUID().toString().replace("-", "")
                + "_" + info.versionName;
    }

    /** Builds the connection options shared by the initial connect and reconnects. */
    private static MqttConnectOptions buildConnectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        // NOTE(review): credentials are hard-coded in source; consider moving them to
        // configuration or secure storage.
        options.setUserName("mqtt_yjy");
        options.setPassword("yjy".toCharArray());
        options.setCleanSession(false);
        options.setKeepAliveInterval(60);    // keep-alive interval, in seconds
        options.setAutomaticReconnect(true); // enable automatic reconnect
        options.setConnectionTimeout(30);    // connection timeout, in seconds
        return options;
    }

    /**
     * Connects to the MQTT broker on the background executor and subscribes to the
     * topic on success. On failure a reconnect loop is scheduled.
     */
    public void connect() {
        submit(() -> {
            try {
                Log.d(TAG, "connect开始连接isHasNet:" + isHasNet);
                if (isHasNet) {
                    Log.d(TAG, "connect开始连接");
                    mqttClient.connect(connectOptions);
                    if (mqttClient.isConnected()) {
                        Log.d(TAG, "connect连接成功");
                        subscribeToTopic();
                    } else {
                        Log.d(TAG, "connect连接失败");
                    }
                } else {
                    reconnect();
                }
            } catch (MqttException e) {
                reconnect();
                Log.e(TAG, "Error connecting to MQTT broker", e);
            }
        });
    }

    /**
     * Subscribes to the topic and marks the manager as having connected once.
     * Incoming messages addressed to the current user are forwarded on the
     * {@code MQTTData} bus channel.
     */
    private void subscribeToTopic() {
        try {
            isConnet = true;
            mqttClient.subscribe(TOPIC, (topic, message) -> {
                // Runs on a Paho worker thread. An exception escaping from here makes
                // Paho drop the connection, so malformed payloads are contained.
                try {
                    handleIncoming(new String(message.getPayload(), StandardCharsets.UTF_8));
                } catch (RuntimeException e) {
                    Log.e(TAG, "Failed to handle MQTT message", e);
                }
            });
        } catch (MqttException e) {
            Log.e(TAG, "Failed to subscribe to " + TOPIC, e);
        }
    }

    /** Parses one payload and posts its work-order id if it targets the current user. */
    private void handleIncoming(String payload) {
        Log.d(TAG, "subscribe收到消息:" + payload);
        HashMap<?, ?> data = MyJsonParser.getMapFromJson(payload);
        if (data == null) {
            return;
        }
        Object inspectorId = data.get("inspectorId");
        // Only forward messages addressed to the current user.
        if (inspectorId != null && inspectorId.equals(MyApplication.myApplication.userId)) {
            LiveEventBus.get(CommonKeyName.MQTTData).post(data.get("workOrderId"));
        }
    }

    /**
     * Publishes a message to the topic. Failures are logged, not thrown.
     *
     * @param message text payload, sent as UTF-8; ignored when {@code null}
     */
    public void publishMessage(String message) {
        if (mqttClient == null || message == null) {
            Log.w(TAG, "publishMessage ignored: client or message is null");
            return;
        }
        try {
            MqttMessage mqttMessage = new MqttMessage(message.getBytes(StandardCharsets.UTF_8));
            mqttClient.publish(TOPIC, mqttMessage);
        } catch (MqttException e) {
            Log.e(TAG, "Failed to publish message", e);
        }
    }

    /**
     * Disconnects from the broker and shuts the background executor down. After this
     * call the manager is closed: pending reconnect attempts stop and later
     * {@link #connect()} calls are ignored.
     */
    public void disconnect() {
        MyLog.d("MqttManager>>>>关闭连接");
        // Flag first so a running reconnect loop stops instead of re-opening the link.
        closed = true;
        try {
            if (mqttClient != null && mqttClient.isConnected()) {
                mqttClient.disconnect();
                MyLog.d("MqttManager>>>>关闭连接成功");
            }
        } catch (MqttException e) {
            Log.e(TAG, "Failed to disconnect", e);
        } finally {
            // shutdownNow() also interrupts a reconnect loop sleeping in its backoff.
            executorService.shutdownNow();
        }
    }

    /**
     * Schedules a reconnect loop that retries with exponential backoff (1s, 2s, 4s,
     * ... capped at 30s) until connected, the network is lost, or the manager is closed.
     */
    private void reconnect() {
        submit(() -> {
            int reconnectAttempts = 0;
            while (!closed && isHasNet && !mqttClient.isConnected()) {
                try {
                    MyLog.d("MqttManager>>>>开始重连");
                    mqttClient.connect(connectOptions);
                    if (mqttClient.isConnected()) {
                        MyLog.d("MqttManager>>>连接成功");
                        subscribeToTopic();
                        break;
                    }
                } catch (MqttException e) {
                    MyLog.e("MqttManager>>>Reconnection failed" + e.getMessage());
                }
                // Back off after every failed attempt, including those that threw,
                // so a refusing broker is not hammered in a tight loop.
                try {
                    Thread.sleep(backoffMillis(reconnectAttempts));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt status
                    return;
                }
                reconnectAttempts++;
            }
        });
    }

    /**
     * Returns the delay before the next reconnect attempt.
     *
     * @param attempt zero-based number of failed attempts so far
     * @return delay in milliseconds, between 1000 and 30000
     */
    private static long backoffMillis(int attempt) {
        int shift = Math.min(Math.max(attempt, 0), MAX_BACKOFF_SHIFT);
        return Math.min(1000L << shift, MAX_BACKOFF_MILLIS);
    }

    /**
     * Runs a task on the background executor unless the manager is closed or the
     * client was never created; in those cases the task is dropped with a log entry.
     */
    private void submit(Runnable task) {
        if (closed || mqttClient == null) {
            Log.w(TAG, "Task ignored: manager is closed or client is unavailable");
            return;
        }
        try {
            executorService.execute(task);
        } catch (RejectedExecutionException e) {
            // disconnect() raced with this call and shut the executor down.
            Log.w(TAG, "Task rejected: executor already shut down", e);
        }
    }
}