| | |
| | | package com.dayu.pipirrapp.net; |
| | | |
| | | import android.content.Context; |
| | | import android.content.pm.PackageInfo; |
| | | import android.content.pm.PackageManager; |
| | | import android.util.Log; |
| | | |
| | | import androidx.lifecycle.LifecycleOwner; |
| | | import androidx.lifecycle.Observer; |
| | | |
| | | import com.dayu.pipirrapp.MyApplication; |
| | | import com.dayu.pipirrapp.utils.CommonKeyName; |
| | | import com.dayu.pipirrapp.utils.MyJsonParser; |
| | | import com.dayu.pipirrapp.utils.MyLog; |
| | | import com.dayu.pipirrapp.utils.NetUtils; |
| | | import com.jeremyliao.liveeventbus.LiveEventBus; |
| | |
| | | import org.eclipse.paho.client.mqttv3.MqttMessage; |
| | | import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; |
| | | |
| | | import java.util.HashMap; |
| | | import java.util.UUID; |
| | | import java.util.concurrent.ExecutorService; |
| | | import java.util.concurrent.Executors; |
| | | |
| | |
| | | */ |
| | | public class MqttManager { |
| | | |
| | | // private static final String MQTT_BROKER_URL = "tcp://115.236.153.170:30764"; // 修改为你的 broker 地址 |
| | | private static final String MQTT_BROKER_URL = "tcp://192.168.10.52:1883"; |
| | | private static final String CLIENT_ID = "mqttx_a7a9fe73"; |
| | | // private static final String MQTT_BROKER_URL = "tcp://115.236.153.170:30764"; // 修改为你的 broker 地址 |
| | | private static final String MQTT_BROKER_URL = "tcp://192.168.10.52:1883"; |
| | | private String CLIENT_ID = "mqttx_a7a9fe73"; |
| | | private static final String TOPIC = "workOrder"; // 订阅的主题 |
| | | |
| | | private MqttClient mqttClient; |
| | | private MqttConnectOptions connectOptions; |
| | | boolean isHasNet = true; |
| | | private ExecutorService executorService = Executors.newSingleThreadExecutor(); |
| | | //是否连接成功过一次,没有的话联网后重连 |
| | | boolean isConnet = false; |
| | | private final ExecutorService executorService = Executors.newSingleThreadExecutor(); |
| | | |
| | | public MqttManager(Context context, LifecycleOwner lifecycleOwner) { |
| | | try { |
| | | PackageManager manager = context.getPackageManager(); |
| | | PackageInfo info = null; |
| | | try { |
| | | info = manager.getPackageInfo(context.getPackageName(), 0); |
| | | } catch (PackageManager.NameNotFoundException e) { |
| | | throw new RuntimeException(e); |
| | | } |
| | | CLIENT_ID = context.getPackageName() + UUID.randomUUID().toString().replace("-", "") + "_" + info; |
| | | mqttClient = new MqttClient(MQTT_BROKER_URL, CLIENT_ID, new MemoryPersistence()); |
| | | connectOptions = new MqttConnectOptions(); |
| | | connectOptions.setUserName("mqtt_yjy"); |
| | |
| | | connectOptions.setCleanSession(false); |
| | | connectOptions.setKeepAliveInterval(60); // 设置保持连接的时间 |
| | | connectOptions.setAutomaticReconnect(true); // 启用自动重连 |
| | | connectOptions.setConnectionTimeout(30);// 设置连接超时时间,单位为秒 |
| | | mqttClient.setCallback(new MqttCallback() { |
| | | @Override |
| | | public void connectionLost(Throwable cause) { |
| | | Log.d("MqttManager", "连接丢失:" + cause.getMessage()); |
| | | // 处理连接丢失,可以尝试重新连接 |
| | | reconnect(); |
| | | } |
| | | |
| | | @Override |
| | | public void messageArrived(String topic, MqttMessage message) throws Exception { |
| | | Log.d("MqttManager", "messageArrived收到的消:" + new String(message.getPayload())); |
| | | // 处理收到的消息 |
| | | } |
| | | |
| | | @Override |
| | | public void deliveryComplete(IMqttDeliveryToken token) { |
| | | // 处理消息发送完成 |
| | | try { |
| | | Log.d("MqttManager", "发送完成:" + new String(token.getMessage().toString())); |
| | | } catch (MqttException e) { |
| | | e.printStackTrace(); |
| | | } |
| | | } |
| | | }); |
| | | LiveEventBus.get(CommonKeyName.NetworkCallback).observe(lifecycleOwner, new Observer<Object>() { |
| | | @Override |
| | | public void onChanged(Object o) { |
| | |
| | | case NetUtils.Available: |
| | | MyLog.i("MqttManager>>>Available"); |
| | | isHasNet = true; |
| | | reconnect(); |
| | | if (!isConnet) { |
| | | reconnect(); |
| | | } |
| | | break; |
| | | case NetUtils.Lost: |
| | | MyLog.i("MqttManager>>>Lost"); |
| | | isHasNet = false; |
| | | disconnect(); |
| | | // disconnect(); |
| | | break; |
| | | } |
| | | } |
| | |
| | | Log.d("MqttManager", "connect开始连接isHasNet:" + isHasNet); |
| | | if (isHasNet) { |
| | | Log.d("MqttManager", "connect开始连接"); |
| | | mqttClient.setCallback(new MqttCallback() { |
| | | @Override |
| | | public void connectionLost(Throwable cause) { |
| | | Log.d("MqttManager", "连接丢失:" + cause.getMessage()); |
| | | // 处理连接丢失,可以尝试重新连接 |
| | | reconnect(); |
| | | } |
| | | |
| | | @Override |
| | | public void messageArrived(String topic, MqttMessage message) throws Exception { |
| | | Log.d("MqttManager", "收到的消:" + new String(message.getPayload())); |
| | | // 处理收到的消息 |
| | | } |
| | | |
| | | @Override |
| | | public void deliveryComplete(IMqttDeliveryToken token) { |
| | | // 处理消息发送完成 |
| | | try { |
| | | Log.d("MqttManager", "发送完成:" + new String(token.getMessage().toString())); |
| | | } catch (MqttException e) { |
| | | e.printStackTrace(); |
| | | } |
| | | } |
| | | }); |
| | | mqttClient.connect(connectOptions); |
| | | if (mqttClient.isConnected()) { |
| | | Log.d("MqttManager", "connect连接成功"); |
| | |
| | | // 订阅主题 |
| | | private void subscribeToTopic() { |
| | | try { |
| | | isConnet = true; |
| | | mqttClient.subscribe(TOPIC, (topic, message) -> { |
| | | // 收到消息时的处理逻辑 |
| | | Log.d("MqttManager", "收到消息:" + new String(message.getPayload())); |
| | | // 在子线程收到消息时的处理逻辑 |
| | | Log.d("MqttManager", "subscribe收到消息:" + new String(message.getPayload())); |
| | | //传递MQ收到的信息 |
| | | LiveEventBus.get(CommonKeyName.locationData).post(message.getPayload()); |
| | | HashMap<String, Object> data = MyJsonParser.getMapFromJson(new String(message.getPayload())); |
| | | //判断是否是当前用户 |
| | | if (data.get("inspectorId").equals(MyApplication.myApplication.userId)) { |
| | | LiveEventBus.get(CommonKeyName.MQTTData).post(data.get("workOrderId")); |
| | | } |
| | | }); |
| | | |
| | | } catch (MqttException e) { |
| | |
| | | } |
| | | } catch (MqttException e) { |
| | | e.printStackTrace(); |
| | | } finally { |
| | | executorService.shutdown(); |
| | | } |
| | | } |
| | | |
| | | // 自动重连方法 |
| | | private void reconnect() { |
| | | // executorService.execute(() -> { |
| | | // try { |
| | | // MyLog.d("MqttManager>>>>开始重连+isHasNet:" + isHasNet + ">>>mqttClient.isConnected():" + mqttClient.isConnected()); |
| | | // while (!mqttClient.isConnected() && isHasNet) { |
| | | // MyLog.d("MqttManager>>>>开始连接"); |
| | | // mqttClient.connect(connectOptions); |
| | | // if (mqttClient.isConnected()) { |
| | | // MyLog.d("MqttManager>>>连接成功"); |
| | | // subscribeToTopic(); |
| | | // break; |
| | | // } |
| | | // Thread.sleep(5000); // 每5秒重试一次 |
| | | // } |
| | | // } catch (MqttException | InterruptedException e) { |
| | | // MyLog.e("MqttManager>>>Reconnection failed" + e.toString()); |
| | | // } |
| | | // }); |
| | | executorService.execute(() -> { |
| | | int reconnectAttempts = 0; |
| | | while (!mqttClient.isConnected() && isHasNet) { |
| | | try { |
| | | MyLog.d("MqttManager>>>>开始重连"); |
| | | mqttClient.connect(connectOptions); |
| | | if (mqttClient.isConnected()) { |
| | | MyLog.d("MqttManager>>>连接成功"); |
| | | subscribeToTopic(); |
| | | break; |
| | | } |
| | | // 指数退避 |
| | | long backoff = Math.min((1L << reconnectAttempts) * 1000, 30000); |
| | | Thread.sleep(backoff); |
| | | reconnectAttempts++; |
| | | } catch (MqttException e) { |
| | | MyLog.e("MqttManager>>>Reconnection failed" + e.getMessage()); |
| | | // 可以在这里增加重连尝试次数的限制 |
| | | } catch (InterruptedException e) { |
| | | Thread.currentThread().interrupt(); // 保留中断状态 |
| | | return; |
| | | } |
| | | } |
| | | }); |
| | | } |
| | | } |