新网创想网站建设,新征程启航
为企业提供网站建设、域名注册、服务器等服务
这篇文章将为大家详细讲解有关RocketMQ客户端PUSH消费的示例分析,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。
我们注重客户提出的每个要求,我们充分考虑每一个细节,我们积极的做好成都网站制作、网站建设服务,我们努力开拓更好的视野,通过不懈的努力,创新互联建站赢得了业内的良好声誉,这一切,也不断的激励着我们更好的服务客户。 主要业务:网站建设,网站制作,网站设计,成都微信小程序,网站开发,技术开发实力,DIV+CSS,PHP及ASP,ASP.Net,SQL数据库的技术开发工程师。
PUSH消费整体流程是怎么样的?
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
consumer.subscribe("Jodie_topic_1023", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//wrong time format 2017_0422_221800
consumer.setConsumeTimestamp("20170422221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
概览流程1
概览流程2
小结:PUSH消费的主要内容实例化DefaultMQPushConsumer,注册MessageQueue分配策略;初始化订阅数据并存入缓存;注册消费监听用于回调处理消息;创建并启动MQClientInstance实例;向Broker发送心跳等工作。
小结:参数校验中MessageListener只能为顺序消费或者并发消费两种模式;消费最小线程consumeThreadMin取值需要小于1000即最多1000个消费线程;由于为无界队consumeThreadMax设置无效。
MQClientInstance初始化启动连带一系列线程类的启动。例如:PullMessageService、RebalanceService等以及通过Netty建立TCP通道。
关于“RocketMQ客户端PUSH消费的示例分析”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。