| | |
| | | package com.dayu.pipirrapp.net; |
| | | |
| | | import android.content.Context; |
| | | import android.net.ConnectivityManager; |
| | | import android.content.pm.PackageInfo; |
| | | import android.content.pm.PackageManager; |
| | | import android.util.Log; |
| | | |
| | | import androidx.lifecycle.LifecycleOwner; |
| | | import androidx.lifecycle.Observer; |
| | | |
| | | import com.dayu.pipirrapp.MyApplication; |
| | | import com.dayu.pipirrapp.utils.CommonKeyName; |
| | | import com.dayu.pipirrapp.utils.MyJsonParser; |
| | | import com.dayu.pipirrapp.utils.MyLog; |
| | | import com.dayu.pipirrapp.utils.NetUtils; |
| | | import com.jeremyliao.liveeventbus.LiveEventBus; |
| | | |
| | | import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; |
| | |
| | | import org.eclipse.paho.client.mqttv3.MqttMessage; |
| | | import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; |
| | | |
| | | import java.util.HashMap; |
| | | import java.util.UUID; |
| | | import java.util.concurrent.ExecutorService; |
| | | import java.util.concurrent.Executors; |
| | | |
| | |
| | | */ |
| | | public class MqttManager { |
| | | |
| | | private static final String MQTT_BROKER_URL = "tcp://115.236.153.170:30764"; // 修改为你的 broker 地址 |
| | | private static final String CLIENT_ID = "mqttx_54052fa0"; |
| | | // private static final String MQTT_BROKER_URL = "tcp://115.236.153.170:30764"; // 修改为你的 broker 地址 |
| | | private static final String MQTT_BROKER_URL = "tcp://192.168.10.52:1883"; |
| | | private String CLIENT_ID = "mqttx_a7a9fe73"; |
| | | private static final String TOPIC = "workOrder"; // 订阅的主题 |
| | | private ConnectivityManager connectivityManager; |
| | | |
| | | private MqttClient mqttClient; |
| | | private MqttConnectOptions connectOptions; |
| | | boolean isHasNet = false; |
| | | boolean isHasNet = true; |
| | | //是否连接成功过一次,没有的话联网后重连 |
| | | boolean isConnet = false; |
| | | private final ExecutorService executorService = Executors.newSingleThreadExecutor(); |
| | | |
| | | public MqttManager(Context context) { |
| | | public MqttManager(Context context, LifecycleOwner lifecycleOwner) { |
| | | try { |
| | | PackageManager manager = context.getPackageManager(); |
| | | PackageInfo info = null; |
| | | try { |
| | | info = manager.getPackageInfo(context.getPackageName(), 0); |
| | | } catch (PackageManager.NameNotFoundException e) { |
| | | throw new RuntimeException(e); |
| | | } |
| | | CLIENT_ID = context.getPackageName() + UUID.randomUUID().toString().replace("-", "") + "_" + info; |
| | | mqttClient = new MqttClient(MQTT_BROKER_URL, CLIENT_ID, new MemoryPersistence()); |
| | | connectOptions = new MqttConnectOptions(); |
| | | connectOptions.setUserName("mqtt_yjy"); |
| | |
| | | connectOptions.setCleanSession(false); |
| | | connectOptions.setKeepAliveInterval(60); // 设置保持连接的时间 |
| | | connectOptions.setAutomaticReconnect(true); // 启用自动重连 |
| | | connectivityManager = (ConnectivityManager) context.getSystemService(Context.CONNECTIVITY_SERVICE); |
| | | // LiveEventBus.get(CommonKeyName.NetworkCallback).observeForever(new Observer<Object>() { |
| | | // @Override |
| | | // public void onChanged(Object o) { |
| | | // switch ((int) o) { |
| | | // case NetUtils.Available: |
| | | // MyLog.i("MqttManager>>>Available"); |
| | | // isHasNet = true; |
| | | // reconnect(); |
| | | // break; |
| | | // case NetUtils.Lost: |
| | | // MyLog.i("MqttManager>>>Lost"); |
| | | // isHasNet = false; |
| | | // try { |
| | | // mqttClient.disconnect(); |
| | | // } catch (MqttException e) { |
| | | // e.printStackTrace(); |
| | | // } |
| | | // break; |
| | | // } |
| | | // } |
| | | // }); |
| | | connectOptions.setConnectionTimeout(30);// 设置连接超时时间,单位为秒 |
| | | mqttClient.setCallback(new MqttCallback() { |
| | | @Override |
| | | public void connectionLost(Throwable cause) { |
| | | Log.d("MqttManager", "连接丢失:" + cause.getMessage()); |
| | | // 处理连接丢失,可以尝试重新连接 |
| | | reconnect(); |
| | | } |
| | | |
| | | @Override |
| | | public void messageArrived(String topic, MqttMessage message) throws Exception { |
| | | Log.d("MqttManager", "messageArrived收到的消:" + new String(message.getPayload())); |
| | | // 处理收到的消息 |
| | | } |
| | | |
| | | @Override |
| | | public void deliveryComplete(IMqttDeliveryToken token) { |
| | | // 处理消息发送完成 |
| | | try { |
| | | Log.d("MqttManager", "发送完成:" + new String(token.getMessage().toString())); |
| | | } catch (MqttException e) { |
| | | e.printStackTrace(); |
| | | } |
| | | } |
| | | }); |
| | | LiveEventBus.get(CommonKeyName.NetworkCallback).observe(lifecycleOwner, new Observer<Object>() { |
| | | @Override |
| | | public void onChanged(Object o) { |
| | | if (o instanceof Integer) { |
| | | switch ((int) o) { |
| | | case NetUtils.Available: |
| | | MyLog.i("MqttManager>>>Available"); |
| | | isHasNet = true; |
| | | if (!isConnet) { |
| | | reconnect(); |
| | | } |
| | | break; |
| | | case NetUtils.Lost: |
| | | MyLog.i("MqttManager>>>Lost"); |
| | | isHasNet = false; |
| | | // disconnect(); |
| | | break; |
| | | } |
| | | } |
| | | } |
| | | }); |
| | | } catch (MqttException e) { |
| | | e.printStackTrace(); |
| | | } |
| | |
| | | |
| | | // 连接到 MQTT broker |
| | | public void connect() { |
| | | ExecutorService executorService = Executors.newSingleThreadExecutor(); |
| | | executorService.execute(new Runnable() { |
| | | @Override |
| | | public void run() { |
| | | try { |
| | | Log.d("MqttManager", "connect开始连接isHasNet:" + isHasNet); |
| | | if (isHasNet) { |
| | | Log.d("MqttManager", "connect开始连接"); |
| | | mqttClient.connect(connectOptions); |
| | | if (mqttClient.isConnected()) { |
| | | Log.d("MqttManager", "Connected to MQTT broker"); |
| | | Log.d("MqttManager", "connect连接成功"); |
| | | subscribeToTopic(); |
| | | } else { |
| | | Log.d("MqttManager", "connect连接失败"); |
| | | } |
| | | } else { |
| | | reconnect(); |
| | |
| | | // 订阅主题 |
| | | private void subscribeToTopic() { |
| | | try { |
| | | isConnet = true; |
| | | mqttClient.subscribe(TOPIC, (topic, message) -> { |
| | | // 收到消息时的处理逻辑 |
| | | Log.d("MqttManager", "Received message:" + new String(message.getPayload())); |
| | | // 在子线程收到消息时的处理逻辑 |
| | | Log.d("MqttManager", "subscribe收到消息:" + new String(message.getPayload())); |
| | | //传递MQ收到的信息 |
| | | LiveEventBus.get(CommonKeyName.locationData).post(message.getPayload()); |
| | | }); |
| | | mqttClient.setCallback(new MqttCallback() { |
| | | @Override |
| | | public void connectionLost(Throwable cause) { |
| | | Log.d("MqttManager", "connectionLost" + cause.getMessage()); |
| | | reconnect(); |
| | | // 处理连接丢失,可以尝试重新连接 |
| | | } |
| | | |
| | | @Override |
| | | public void messageArrived(String topic, MqttMessage message) throws Exception { |
| | | Log.d("MqttManager", "Received messageArrived:" + new String(message.getPayload())); |
| | | // 处理收到的消息 |
| | | } |
| | | |
| | | @Override |
| | | public void deliveryComplete(IMqttDeliveryToken token) { |
| | | // 处理消息发送完成 |
| | | try { |
| | | Log.d("MqttManager", "deliveryComplete:" + new String(token.getMessage().toString())); |
| | | } catch (MqttException e) { |
| | | e.printStackTrace(); |
| | | } |
| | | HashMap<String, Object> data = MyJsonParser.getMapFromJson(new String(message.getPayload())); |
| | | //判断是否是当前用户 |
| | | if (data.get("inspectorId").equals(MyApplication.myApplication.userId)) { |
| | | LiveEventBus.get(CommonKeyName.MQTTData).post(data.get("workOrderId")); |
| | | } |
| | | }); |
| | | |
| | | } catch (MqttException e) { |
| | | e.printStackTrace(); |
| | | } |
| | |
| | | |
| | | // Disconnect from the MQTT broker when a client exists and is connected, then shut down this manager's executor. |
| | | public void disconnect() { |
| | | MyLog.d("MqttManager>>>>关闭连接"); |
| | | try { |
| | | if (mqttClient != null && mqttClient.isConnected()) { |
| | | mqttClient.disconnect(); |
| | | System.out.println("Disconnected from MQTT broker"); |
| | | MyLog.d("MqttManager>>>>关闭连接成功"); |
| | | } |
| | | } catch (MqttException e) { |
| | | e.printStackTrace(); |
| | | } finally { |
| | | executorService.shutdown(); /* runs even if disconnect() threw; a shut-down executor accepts no new tasks */ |
| | | } |
| | | } |
| | | |
| | | // 自动重连方法 |
| | | private void reconnect() { |
| | | try { |
| | | if (isHasNet) { |
| | | while (!mqttClient.isConnected() && isHasNet) { |
| | | Log.d("MqttManager", "Attempting to reconnect..."); |
| | | mqttClient.connect(connectOptions); // 重试连接 |
| | | executorService.execute(() -> { |
| | | int reconnectAttempts = 0; |
| | | while (!mqttClient.isConnected() && isHasNet) { |
| | | try { |
| | | MyLog.d("MqttManager>>>>开始重连"); |
| | | mqttClient.connect(connectOptions); |
| | | if (mqttClient.isConnected()) { |
| | | Log.d("MqttManager", "Connected to MQTT broker"); |
| | | MyLog.d("MqttManager>>>连接成功"); |
| | | subscribeToTopic(); |
| | | break; |
| | | } |
| | | Thread.sleep(5000); // 每 5 秒重试一次 |
| | | // 指数退避 |
| | | long backoff = Math.min((1L << reconnectAttempts) * 1000, 30000); |
| | | Thread.sleep(backoff); |
| | | reconnectAttempts++; |
| | | } catch (MqttException e) { |
| | | MyLog.e("MqttManager>>>Reconnection failed" + e.getMessage()); |
| | | // 可以在这里增加重连尝试次数的限制 |
| | | } catch (InterruptedException e) { |
| | | Thread.currentThread().interrupt(); // 保留中断状态 |
| | | return; |
| | | } |
| | | Log.d("MqttManager", "Reconnected to MQTT broker! isHasNet=true"); |
| | | } else { |
| | | Log.d("MqttManager", "isHasNet is false"); |
| | | // Thread.sleep(5000); |
| | | // reconnect(); |
| | | } |
| | | |
| | | } catch (MqttException | InterruptedException e) { |
| | | // try { |
| | | // Thread.sleep(5000); |
| | | // } catch (InterruptedException ex) { |
| | | // e.printStackTrace(); |
| | | // }// 每 5 秒重试一次 |
| | | // reconnect(); |
| | | |
| | | } |
| | | }); |
| | | } |
| | | } |