管灌系统巡查员智能手机App
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
package com.dayu.pipirrapp.net;
 
import android.content.Context;
import android.net.ConnectivityManager;
import android.net.Network;
import android.net.NetworkCapabilities;
import android.net.NetworkRequest;
import android.util.Log;
 
import com.dayu.pipirrapp.utils.CommonKeyName;
import com.jeremyliao.liveeventbus.LiveEventBus;
 
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
/**
 * MQTT相关设置
 *
 * @author zuoxiao
 * @version 1.0
 * @since 2024-11-26
 */
public class MqttManager {
 
    private static final String MQTT_BROKER_URL = "tcp://115.236.153.170:30764"; // 修改为你的 broker 地址
    private static final String CLIENT_ID = "mqttx_54052fa0";
    private static final String TOPIC = "workOrder"; // 订阅的主题
    private ConnectivityManager connectivityManager;
 
    private MqttClient mqttClient;
    private MqttConnectOptions connectOptions;
    boolean isHasNet = false;
 
    public MqttManager(Context context) {
        try {
            mqttClient = new MqttClient(MQTT_BROKER_URL, CLIENT_ID, new MemoryPersistence());
            connectOptions = new MqttConnectOptions();
            connectOptions.setUserName("mqtt_yjy");
            connectOptions.setPassword("yjy".toCharArray());
            connectOptions.setCleanSession(false);
            connectOptions.setKeepAliveInterval(60); // 设置保持连接的时间
            connectOptions.setAutomaticReconnect(true);  // 启用自动重连
            connectivityManager = (ConnectivityManager) context.getSystemService(Context.CONNECTIVITY_SERVICE);
            checkNetwork();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
 
    // 连接到 MQTT broker
    public void connect() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    if (isHasNet) {
                        mqttClient.connect(connectOptions);
                        if (mqttClient.isConnected()) {
                            Log.d("MqttManager", "Connected to MQTT broker");
                            subscribeToTopic();
                        }
                    } else {
                        reconnect();
                    }
 
                } catch (MqttException e) {
                    reconnect();
                    Log.e("MqttManager", "Error connecting to MQTT broker", e);
                }
            }
        });
    }
 
    // 订阅主题
    private void subscribeToTopic() {
        try {
            mqttClient.subscribe(TOPIC, (topic, message) -> {
                // 收到消息时的处理逻辑
                Log.d("MqttManager", "Received message:" + new String(message.getPayload()));
                //传递MQ收到的信息
                LiveEventBus.get(CommonKeyName.locationData).post(message.getPayload());
            });
            mqttClient.setCallback(new MqttCallback() {
                @Override
                public void connectionLost(Throwable cause) {
                    Log.d("MqttManager", "connectionLost" + cause.getMessage());
                    reconnect();
                    // 处理连接丢失,可以尝试重新连接
                }
 
                @Override
                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    Log.d("MqttManager", "Received messageArrived:" + new String(message.getPayload()));
                    // 处理收到的消息
                }
 
                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                    // 处理消息发送完成
                    try {
                        Log.d("MqttManager", "deliveryComplete:" + new String(token.getMessage().toString()));
                    } catch (MqttException e) {
                        e.printStackTrace();
                    }
                }
            });
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
 
    // 发布消息
    public void publishMessage(String message) {
        try {
            MqttMessage mqttMessage = new MqttMessage(message.getBytes());
            mqttClient.publish(TOPIC, mqttMessage);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
 
    // 断开连接
    public void disconnect() {
        try {
            if (mqttClient != null && mqttClient.isConnected()) {
                mqttClient.disconnect();
                System.out.println("Disconnected from MQTT broker");
            }
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
 
    // 自动重连方法
    private void reconnect() {
        try {
            if (isHasNet) {
                while (!mqttClient.isConnected() && isHasNet) {
                    Log.d("MqttManager", "Attempting to reconnect...");
                    mqttClient.connect(connectOptions);  // 重试连接
                    if (mqttClient.isConnected()) {
                        Log.d("MqttManager", "Connected to MQTT broker");
                        subscribeToTopic();
                    }
                    Thread.sleep(5000);     // 每 5 秒重试一次
                }
                Log.d("MqttManager", "Reconnected to MQTT broker! isHasNet=true");
            } else {
                Log.d("MqttManager", "isHasNet is false");
                Thread.sleep(5000);
                reconnect();
            }
 
        } catch (MqttException | InterruptedException e) {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException ex) {
                e.printStackTrace();
            }// 每 5 秒重试一次
            reconnect();
 
        }
    }
 
    public void checkNetwork() {
        NetworkRequest request = new NetworkRequest.Builder().addCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET)  // 必须具备互联网能力
                .build();
 
        connectivityManager.registerNetworkCallback(request, new ConnectivityManager.NetworkCallback() {
            @Override
            public void onAvailable(Network network) {
                super.onAvailable(network);
                // 网络可用时的处理逻辑
                Log.d("MqttManager", "Network is available.");
                isHasNet = true;
                reconnect();
            }
 
            @Override
            public void onLost(Network network) {
                super.onLost(network);
                // 网络丢失时的处理逻辑
                Log.d("MqttManager", "Network is lost.");
                isHasNet = false;
                try {
                    mqttClient.disconnect();
                } catch (MqttException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}