| | |
| | | |
| | | import android.content.Context; |
| | | import android.net.ConnectivityManager; |
| | | import android.net.Network; |
| | | import android.net.NetworkCapabilities; |
| | | import android.net.NetworkRequest; |
| | | import android.util.Log; |
| | | |
| | | import androidx.lifecycle.LifecycleOwner; |
| | | import androidx.lifecycle.Observer; |
| | | |
| | | import com.dayu.pipirrapp.utils.CommonKeyName; |
| | | import com.dayu.pipirrapp.utils.MyLog; |
| | | import com.dayu.pipirrapp.utils.NetUtils; |
| | | import com.jeremyliao.liveeventbus.LiveEventBus; |
| | | |
| | | import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; |
| | |
| | | public class MqttManager { |
| | | |
| | | private static final String MQTT_BROKER_URL = "tcp://115.236.153.170:30764"; // 修改为你的 broker 地址 |
| | | private static final String CLIENT_ID = "mqttx_54052fa0"; |
| | | private static final String CLIENT_ID = "mqttx_f62ef124"; |
| | | private static final String TOPIC = "workOrder"; // 订阅的主题 |
| | | private ConnectivityManager connectivityManager; |
| | | |
| | | private MqttClient mqttClient; |
| | | private MqttConnectOptions connectOptions; |
| | | boolean isHasNet = false; |
| | | boolean isHasNet = true; |
| | | private ExecutorService executorService = Executors.newSingleThreadExecutor(); |
| | | |
| | | public MqttManager(Context context) { |
| | | public MqttManager(Context context, LifecycleOwner lifecycleOwner) { |
| | | try { |
| | | mqttClient = new MqttClient(MQTT_BROKER_URL, CLIENT_ID, new MemoryPersistence()); |
| | | connectOptions = new MqttConnectOptions(); |
| | |
| | | connectOptions.setCleanSession(false); |
| | | connectOptions.setKeepAliveInterval(60); // 设置保持连接的时间 |
| | | connectOptions.setAutomaticReconnect(true); // 启用自动重连 |
| | | connectivityManager = (ConnectivityManager) context.getSystemService(Context.CONNECTIVITY_SERVICE); |
| | | checkNetwork(); |
| | | LiveEventBus.get(CommonKeyName.NetworkCallback).observe(lifecycleOwner, new Observer<Object>() { |
| | | @Override |
| | | public void onChanged(Object o) { |
| | | if (o instanceof Integer) { |
| | | switch ((int) o) { |
| | | case NetUtils.Available: |
| | | MyLog.i("MqttManager>>>Available"); |
| | | isHasNet = true; |
| | | reconnect(); |
| | | break; |
| | | case NetUtils.Lost: |
| | | MyLog.i("MqttManager>>>Lost"); |
| | | isHasNet = false; |
| | | disconnect(); |
| | | break; |
| | | } |
| | | } |
| | | } |
| | | }); |
| | | } catch (MqttException e) { |
| | | e.printStackTrace(); |
| | | } |
| | |
| | | |
| | | // 连接到 MQTT broker |
| | | public void connect() { |
| | | ExecutorService executorService = Executors.newSingleThreadExecutor(); |
| | | executorService.execute(new Runnable() { |
| | | @Override |
| | | public void run() { |
| | | try { |
| | | Log.d("MqttManager", "connect开始连接isHasNet:" + isHasNet); |
| | | if (isHasNet) { |
| | | Log.d("MqttManager", "connect开始连接"); |
| | | mqttClient.setCallback(new MqttCallback() { |
| | | @Override |
| | | public void connectionLost(Throwable cause) { |
| | | Log.d("MqttManager", "连接丢失:" + cause.getMessage()); |
| | | // 处理连接丢失,可以尝试重新连接 |
| | | reconnect(); |
| | | } |
| | | |
| | | @Override |
| | | public void messageArrived(String topic, MqttMessage message) throws Exception { |
| | | Log.d("MqttManager", "收到的消:" + new String(message.getPayload())); |
| | | // 处理收到的消息 |
| | | } |
| | | |
| | | @Override |
| | | public void deliveryComplete(IMqttDeliveryToken token) { |
| | | // 处理消息发送完成 |
| | | try { |
| | | Log.d("MqttManager", "发送完成:" + new String(token.getMessage().toString())); |
| | | } catch (MqttException e) { |
| | | e.printStackTrace(); |
| | | } |
| | | } |
| | | }); |
| | | mqttClient.connect(connectOptions); |
| | | if (mqttClient.isConnected()) { |
| | | Log.d("MqttManager", "Connected to MQTT broker"); |
| | | Log.d("MqttManager", "connect连接成功"); |
| | | subscribeToTopic(); |
| | | } else { |
| | | Log.d("MqttManager", "connect连接失败"); |
| | | } |
| | | } else { |
| | | reconnect(); |
| | |
| | | try { |
| | | mqttClient.subscribe(TOPIC, (topic, message) -> { |
| | | // 收到消息时的处理逻辑 |
| | | Log.d("MqttManager", "Received message:" + new String(message.getPayload())); |
| | | Log.d("MqttManager", "收到消息:" + new String(message.getPayload())); |
| | | //传递MQ收到的信息 |
| | | LiveEventBus.get(CommonKeyName.locationData).post(message.getPayload()); |
| | | }); |
| | | mqttClient.setCallback(new MqttCallback() { |
| | | @Override |
| | | public void connectionLost(Throwable cause) { |
| | | Log.d("MqttManager", "connectionLost" + cause.getMessage()); |
| | | reconnect(); |
| | | // 处理连接丢失,可以尝试重新连接 |
| | | } |
| | | |
| | | @Override |
| | | public void messageArrived(String topic, MqttMessage message) throws Exception { |
| | | Log.d("MqttManager", "Received messageArrived:" + new String(message.getPayload())); |
| | | // 处理收到的消息 |
| | | } |
| | | |
| | | @Override |
| | | public void deliveryComplete(IMqttDeliveryToken token) { |
| | | // 处理消息发送完成 |
| | | try { |
| | | Log.d("MqttManager", "deliveryComplete:" + new String(token.getMessage().toString())); |
| | | } catch (MqttException e) { |
| | | e.printStackTrace(); |
| | | } |
| | | } |
| | | }); |
| | | } catch (MqttException e) { |
| | | e.printStackTrace(); |
| | | } |
| | |
| | | |
| | | // 断开连接 |
| | | public void disconnect() { |
| | | MyLog.d("MqttManager>>>>关闭连接"); |
| | | try { |
| | | if (mqttClient != null && mqttClient.isConnected()) { |
| | | mqttClient.disconnect(); |
| | | System.out.println("Disconnected from MQTT broker"); |
| | | MyLog.d("MqttManager>>>>关闭连接成功"); |
| | | } |
| | | } catch (MqttException e) { |
| | | e.printStackTrace(); |
| | |
| | | |
| | | // 自动重连方法 |
| | | private void reconnect() { |
| | | try { |
| | | if (isHasNet) { |
| | | while (!mqttClient.isConnected() && isHasNet) { |
| | | Log.d("MqttManager", "Attempting to reconnect..."); |
| | | mqttClient.connect(connectOptions); // 重试连接 |
| | | if (mqttClient.isConnected()) { |
| | | Log.d("MqttManager", "Connected to MQTT broker"); |
| | | subscribeToTopic(); |
| | | } |
| | | Thread.sleep(5000); // 每 5 秒重试一次 |
| | | } |
| | | Log.d("MqttManager", "Reconnected to MQTT broker! isHasNet=true"); |
| | | } else { |
| | | Log.d("MqttManager", "isHasNet is false"); |
| | | Thread.sleep(5000); |
| | | reconnect(); |
| | | } |
| | | |
| | | } catch (MqttException | InterruptedException e) { |
| | | try { |
| | | Thread.sleep(5000); |
| | | } catch (InterruptedException ex) { |
| | | e.printStackTrace(); |
| | | }// 每 5 秒重试一次 |
| | | reconnect(); |
| | | |
| | | } |
| | | } |
| | | |
| | | public void checkNetwork() { |
| | | NetworkRequest request = new NetworkRequest.Builder().addCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET) // 必须具备互联网能力 |
| | | .build(); |
| | | |
| | | connectivityManager.registerNetworkCallback(request, new ConnectivityManager.NetworkCallback() { |
| | | @Override |
| | | public void onAvailable(Network network) { |
| | | super.onAvailable(network); |
| | | // 网络可用时的处理逻辑 |
| | | Log.d("MqttManager", "Network is available."); |
| | | isHasNet = true; |
| | | reconnect(); |
| | | } |
| | | |
| | | @Override |
| | | public void onLost(Network network) { |
| | | super.onLost(network); |
| | | // 网络丢失时的处理逻辑 |
| | | Log.d("MqttManager", "Network is lost."); |
| | | isHasNet = false; |
| | | try { |
| | | mqttClient.disconnect(); |
| | | } catch (MqttException e) { |
| | | e.printStackTrace(); |
| | | } |
| | | } |
| | | }); |
| | | // executorService.execute(() -> { |
| | | // try { |
| | | // MyLog.d("MqttManager>>>>开始重连+isHasNet:" + isHasNet + ">>>mqttClient.isConnected():" + mqttClient.isConnected()); |
| | | // while (!mqttClient.isConnected() && isHasNet) { |
| | | // MyLog.d("MqttManager>>>>开始连接"); |
| | | // mqttClient.connect(connectOptions); |
| | | // if (mqttClient.isConnected()) { |
| | | // MyLog.d("MqttManager>>>连接成功"); |
| | | // subscribeToTopic(); |
| | | // break; |
| | | // } |
| | | // Thread.sleep(5000); // 每5秒重试一次 |
| | | // } |
| | | // } catch (MqttException | InterruptedException e) { |
| | | // MyLog.e("MqttManager>>>Reconnection failed" + e.toString()); |
| | | // } |
| | | // }); |
| | | } |
| | | } |