package com.dayu.pipirrapp.tool; import android.util.Log; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; /** * author: zuo * Date: 2024-09-03 * Time: 13:51 * 备注:MQ消息队列模块 */ public class RocketMQConsumer { private DefaultMQPushConsumer consumer; public void start() { try { // 创建一个消费者实例 consumer = new DefaultMQPushConsumer("YourConsumerGroup"); // 设置NameServer地址 consumer.setNamesrvAddr("YourNameServerAddress"); // 订阅一个或多个Topic consumer.subscribe("YourTopic", "*"); // 注册消息监听器 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { // 处理消息 Log.d("RocketMQ", "Receive message: " + new String(msg.getBody())); // 这里可以触发本地通知或者更新UI } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 启动消费者 consumer.start(); } catch (MQClientException e) { e.printStackTrace(); Log.e("RocketMQ", "Consumer failed to start: " + e.getMessage()); } } public void stop() { if (consumer != null) { consumer.shutdown(); } } }