管灌系统巡查员智能手机App
zuoxiao
2024-12-18 32275aa66faa5371467e291b7d19a5e782f8aade
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
package com.dayu.pipirrapp.net;
 
import android.content.Context;
import android.util.Log;
 
import androidx.lifecycle.LifecycleOwner;
import androidx.lifecycle.Observer;
 
import com.dayu.pipirrapp.MyApplication;
import com.dayu.pipirrapp.utils.CommonKeyName;
import com.dayu.pipirrapp.utils.MyJsonParser;
import com.dayu.pipirrapp.utils.MyLog;
import com.dayu.pipirrapp.utils.NetUtils;
import com.jeremyliao.liveeventbus.LiveEventBus;
 
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 
import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
/**
 * MQTT相关设置
 *
 * @author zuoxiao
 * @version 1.0
 * @since 2024-11-26
 */
public class MqttManager {
 
    //    private static final String MQTT_BROKER_URL = "tcp://115.236.153.170:30764"; // 修改为你的 broker 地址
    private static final String MQTT_BROKER_URL = "tcp://192.168.10.52:1883";
    private static final String CLIENT_ID = "mqttx_a7a9fe73";
    private static final String TOPIC = "workOrder"; // 订阅的主题
 
    private MqttClient mqttClient;
    private MqttConnectOptions connectOptions;
    boolean isHasNet = true;
    //是否连接成功过一次,没有的话联网后重连
    boolean isConnet = false;
    private ExecutorService executorService = Executors.newSingleThreadExecutor();
 
    public MqttManager(Context context, LifecycleOwner lifecycleOwner) {
        try {
            mqttClient = new MqttClient(MQTT_BROKER_URL, CLIENT_ID, new MemoryPersistence());
            connectOptions = new MqttConnectOptions();
            connectOptions.setUserName("mqtt_yjy");
            connectOptions.setPassword("yjy".toCharArray());
            connectOptions.setCleanSession(false);
            connectOptions.setKeepAliveInterval(60); // 设置保持连接的时间
            connectOptions.setAutomaticReconnect(true);  // 启用自动重连
            connectOptions.setConnectionTimeout(30);// 设置连接超时时间,单位为秒
            mqttClient.setCallback(new MqttCallback() {
                @Override
                public void connectionLost(Throwable cause) {
                    Log.d("MqttManager", "连接丢失:" + cause.getMessage());
                    // 处理连接丢失,可以尝试重新连接
                    reconnect();
                }
 
                @Override
                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    Log.d("MqttManager", "messageArrived收到的消:" + new String(message.getPayload()));
                    // 处理收到的消息
                }
 
                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                    // 处理消息发送完成
                    try {
                        Log.d("MqttManager", "发送完成:" + new String(token.getMessage().toString()));
                    } catch (MqttException e) {
                        e.printStackTrace();
                    }
                }
            });
            LiveEventBus.get(CommonKeyName.NetworkCallback).observe(lifecycleOwner, new Observer<Object>() {
                @Override
                public void onChanged(Object o) {
                    if (o instanceof Integer) {
                        switch ((int) o) {
                            case NetUtils.Available:
                                MyLog.i("MqttManager>>>Available");
                                isHasNet = true;
                                if (!isConnet) {
                                    reconnect();
                                }
                                break;
                            case NetUtils.Lost:
                                MyLog.i("MqttManager>>>Lost");
                                isHasNet = false;
//                                disconnect();
                                break;
                        }
                    }
                }
            });
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
 
    // 连接到 MQTT broker
    public void connect() {
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Log.d("MqttManager", "connect开始连接isHasNet:" + isHasNet);
                    if (isHasNet) {
                        Log.d("MqttManager", "connect开始连接");
                        mqttClient.connect(connectOptions);
                        if (mqttClient.isConnected()) {
                            Log.d("MqttManager", "connect连接成功");
                            subscribeToTopic();
                        } else {
                            Log.d("MqttManager", "connect连接失败");
                        }
                    } else {
                        reconnect();
                    }
 
                } catch (MqttException e) {
                    reconnect();
                    Log.e("MqttManager", "Error connecting to MQTT broker", e);
                }
            }
        });
    }
 
    // 订阅主题
    private void subscribeToTopic() {
        try {
            isConnet = true;
            mqttClient.subscribe(TOPIC, (topic, message) -> {
                // 在子线程收到消息时的处理逻辑
                Log.d("MqttManager", "subscribe收到消息:" + new String(message.getPayload()));
                //传递MQ收到的信息
                HashMap<String, Object> data= MyJsonParser.getMapFromJson(new String(message.getPayload()));
                //判断是否是当前用户
                if (data.get("inspectorId").equals(MyApplication.myApplication.userId)){
                    LiveEventBus.get(CommonKeyName.MQTTData).post(message.getPayload());
                }
            });
 
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
 
    // 发布消息
    public void publishMessage(String message) {
        try {
            MqttMessage mqttMessage = new MqttMessage(message.getBytes());
            mqttClient.publish(TOPIC, mqttMessage);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
 
    // 断开连接
    public void disconnect() {
        MyLog.d("MqttManager>>>>关闭连接");
        try {
            if (mqttClient != null && mqttClient.isConnected()) {
                mqttClient.disconnect();
                MyLog.d("MqttManager>>>>关闭连接成功");
            }
        } catch (MqttException e) {
            e.printStackTrace();
        } finally {
            executorService.shutdown();
        }
    }
 
    // 自动重连方法
    private void reconnect() {
        executorService.execute(() -> {
            int reconnectAttempts = 0;
            while (!mqttClient.isConnected() && isHasNet) {
                try {
                    MyLog.d("MqttManager>>>>开始重连");
                    mqttClient.connect(connectOptions);
                    if (mqttClient.isConnected()) {
                        MyLog.d("MqttManager>>>连接成功");
                        subscribeToTopic();
                        break;
                    }
                    // 指数退避
                    long backoff = Math.min((1L << reconnectAttempts) * 1000, 30000);
                    Thread.sleep(backoff);
                    reconnectAttempts++;
                } catch (MqttException e) {
                    MyLog.e("MqttManager>>>Reconnection failed" + e.getMessage());
                    // 可以在这里增加重连尝试次数的限制
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // 保留中断状态
                    return;
                }
            }
        });
    }
}