RocketMQ消息发送之pull和push
生活随笔
收集整理的這篇文章主要介紹了
RocketMQ消息发送之pull和push
小編覺得挺不錯的，現在分享給大家，幫大家做個參考。
RocketMQ學習(五)——RocketMQ消息發送之pull和push
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message;public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {String group_name = "pull_producer";DefaultMQProducer producer = new DefaultMQProducer(group_name);producer.setNamesrvAddr("localhost:9876");producer.start();for (int i = 0; i < 5; i++) {try {Message msg = new Message("TopicPull",// topic"TagA",// tag("Hello RocketMQ " + i).getBytes()// body);SendResult sendResult = producer.send(msg,1000);System.out.println(sendResult);Thread.sleep(1000);} catch (Exception e) {e.printStackTrace();Thread.sleep(2000);}}producer.shutdown();} }?
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message;public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {String group_name = "pull_producer";DefaultMQProducer producer = new DefaultMQProducer(group_name);producer.setNamesrvAddr("localhost:9876");producer.start();for (int i = 0; i < 5; i++) {try {Message msg = new Message("TopicPull",// topic"TagA",// tag("Hello RocketMQ " + i).getBytes()// body);SendResult sendResult = producer.send(msg,1000);System.out.println(sendResult);Thread.sleep(1000);} catch (Exception e) {e.printStackTrace();Thread.sleep(2000);}}producer.shutdown();} } import java.util.List;import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.MQPullConsumer; import org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService; import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.consumer.PullTaskCallback; import org.apache.rocketmq.client.consumer.PullTaskContext; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;public class PullScheduleService {public static void main(String[] args) throws InterruptedException, MQClientException {String group_name = "schedule_consumer";String TOPIC_TEST = "TopicPull";final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService(group_name); DefaultMQPullConsumer consumer = scheduleService.getDefaultMQPullConsumer();consumer.setNamesrvAddr("localhost:9876");scheduleService.setMessageModel(MessageModel.CLUSTERING);scheduleService.registerPullTaskCallback(TOPIC_TEST, new 
PullTaskCallback() {public void doPullTask(MessageQueue mq, PullTaskContext context) {MQPullConsumer consumer = context.getPullConsumer();try {//獲取從哪里開始拉取long offset = consumer.fetchConsumeOffset(mq, false);if(offset < 0) {offset = 0;}PullResult pullResult = consumer.pull(mq, "*", offset, 32);switch (pullResult.getPullStatus()) {case FOUND:List<MessageExt> list = pullResult.getMsgFoundList();for (MessageExt msg : list) {System.out.println(new String(msg.getBody()));}break;case NO_MATCHED_MSG:break;case NO_NEW_MSG:case OFFSET_ILLEGAL:break;default:break;}//存儲offset,客戶端每隔5s會定時刷新到brokerconsumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());//重新拉取 建議超過5s這樣就不會重復獲取context.setPullNextDelayTimeMillis(6000);} catch (Exception e) {e.printStackTrace();}}});scheduleService.start();}}?
總結
以上是生活随笔為你收集整理的RocketMQ消息发送之pull和push的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: RocketMQ API使用简介、拉取机
- 下一篇: RocketMQ Filtersrv