package com.dayu.pipirrapp.net; import android.content.Context; import android.net.ConnectivityManager; import android.util.Log; import androidx.lifecycle.LifecycleOwner; import androidx.lifecycle.Observer; import com.dayu.pipirrapp.utils.CommonKeyName; import com.dayu.pipirrapp.utils.MyLog; import com.dayu.pipirrapp.utils.NetUtils; import com.jeremyliao.liveeventbus.LiveEventBus; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * MQTT相关设置 * * @author zuoxiao * @version 1.0 * @since 2024-11-26 */ public class MqttManager { private static final String MQTT_BROKER_URL = "tcp://115.236.153.170:30764"; // 修改为你的 broker 地址 private static final String CLIENT_ID = "mqttx_f62ef124"; private static final String TOPIC = "workOrder"; // 订阅的主题 private MqttClient mqttClient; private MqttConnectOptions connectOptions; boolean isHasNet = true; private ExecutorService executorService = Executors.newSingleThreadExecutor(); public MqttManager(Context context, LifecycleOwner lifecycleOwner) { try { mqttClient = new MqttClient(MQTT_BROKER_URL, CLIENT_ID, new MemoryPersistence()); connectOptions = new MqttConnectOptions(); connectOptions.setUserName("mqtt_yjy"); connectOptions.setPassword("yjy".toCharArray()); connectOptions.setCleanSession(false); connectOptions.setKeepAliveInterval(60); // 设置保持连接的时间 connectOptions.setAutomaticReconnect(true); // 启用自动重连 LiveEventBus.get(CommonKeyName.NetworkCallback).observe(lifecycleOwner, new Observer() { @Override public void onChanged(Object o) { if (o instanceof Integer) { switch ((int) o) { case NetUtils.Available: MyLog.i("MqttManager>>>Available"); isHasNet = true; reconnect(); break; case NetUtils.Lost: MyLog.i("MqttManager>>>Lost"); isHasNet = false; disconnect(); break; } } } }); } catch (MqttException e) { e.printStackTrace(); } } // 连接到 MQTT broker public void connect() { executorService.execute(new Runnable() { @Override public void run() { try { Log.d("MqttManager", "connect开始连接isHasNet:" + isHasNet); if (isHasNet) { Log.d("MqttManager", "connect开始连接"); mqttClient.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable cause) { Log.d("MqttManager", "连接丢失:" + cause.getMessage()); // 处理连接丢失,可以尝试重新连接 reconnect(); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { Log.d("MqttManager", "收到的消:" + new String(message.getPayload())); // 处理收到的消息 } @Override public void deliveryComplete(IMqttDeliveryToken token) { // 处理消息发送完成 try { Log.d("MqttManager", "发送完成:" + new String(token.getMessage().toString())); } catch (MqttException e) { e.printStackTrace(); } } }); mqttClient.connect(connectOptions); if (mqttClient.isConnected()) { Log.d("MqttManager", "connect连接成功"); subscribeToTopic(); } else { Log.d("MqttManager", "connect连接失败"); } } else { reconnect(); } } catch (MqttException e) { reconnect(); Log.e("MqttManager", "Error connecting to MQTT broker", e); } } }); } // 订阅主题 private void subscribeToTopic() { try { mqttClient.subscribe(TOPIC, (topic, message) -> { // 收到消息时的处理逻辑 Log.d("MqttManager", "收到消息:" + new String(message.getPayload())); //传递MQ收到的信息 LiveEventBus.get(CommonKeyName.locationData).post(message.getPayload()); }); } catch (MqttException e) { e.printStackTrace(); } } // 发布消息 public void publishMessage(String message) { try { MqttMessage mqttMessage = new MqttMessage(message.getBytes()); mqttClient.publish(TOPIC, mqttMessage); } catch (MqttException e) { e.printStackTrace(); } } // 断开连接 public void disconnect() { MyLog.d("MqttManager>>>>关闭连接"); try { if (mqttClient != null && mqttClient.isConnected()) { mqttClient.disconnect(); MyLog.d("MqttManager>>>>关闭连接成功"); } } catch (MqttException e) { e.printStackTrace(); } } // 自动重连方法 private void reconnect() { // executorService.execute(() -> { // try { // MyLog.d("MqttManager>>>>开始重连+isHasNet:" + isHasNet + ">>>mqttClient.isConnected():" + mqttClient.isConnected()); // while (!mqttClient.isConnected() && isHasNet) { // MyLog.d("MqttManager>>>>开始连接"); // mqttClient.connect(connectOptions); // if (mqttClient.isConnected()) { // MyLog.d("MqttManager>>>连接成功"); // subscribeToTopic(); // break; // } // Thread.sleep(5000); // 每5秒重试一次 // } // } catch (MqttException | InterruptedException e) { // MyLog.e("MqttManager>>>Reconnection failed" + e.toString()); // } // }); } }