package com.dy.pipIrrApp.workOrder; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.springframework.stereotype.Component; /** * @author ZhuBaoMin * @date 2024-11-04 15:02 * @LastEditTime 2024-11-04 15:02 * @Description */ @Component public class ConsumerListener_push{ public void receiveMessage () throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group"); consumer.setNamesrvAddr("127.0.0.1:9876"); // 设置消息监听器 consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { for (MessageExt msg : msgs) { System.out.println("ConsumerListener_push receive message: " + new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); // 订阅主题和标签 consumer.subscribe("workOrder", "王五"); // 启动消费者 consumer.start(); } }