管灌系统巡查员智能手机App
zuoxiao
2024-10-16 bc0643f42cc19cfa1153f355851968e5486281ef
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package com.dayu.pipirrapp.tool;
 
import android.util.Log;
 
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
 
import java.util.List;
 
/**
 * author: zuo
 * Date: 2024-09-03
 * Time: 13:51
 * 备注:MQ消息队列模块
 */
public class RocketMQConsumer {
 
    private DefaultMQPushConsumer consumer;
 
    public void start() {
        try {
            // 创建一个消费者实例
            consumer = new DefaultMQPushConsumer("YourConsumerGroup");
 
            // 设置NameServer地址
            consumer.setNamesrvAddr("YourNameServerAddress");
 
            // 订阅一个或多个Topic
            consumer.subscribe("YourTopic", "*");
 
            // 注册消息监听器
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    for (MessageExt msg : msgs) {
                        // 处理消息
                        Log.d("RocketMQ", "Receive message: " + new String(msg.getBody()));
                        // 这里可以触发本地通知或者更新UI
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
 
            // 启动消费者
            consumer.start();
 
        } catch (MQClientException e) {
            e.printStackTrace();
            Log.e("RocketMQ", "Consumer failed to start: " + e.getMessage());
        }
    }
 
    public void stop() {
        if (consumer != null) {
            consumer.shutdown();
        }
    }
}