说到metaq的消费者balance策略,不得不说一下分区的有关信息。一个topic可以划分为n个分区。每个分区是一个有序的、不可变的、顺序递增的队列。
分区一方面是为了增大消息的容量(可以分布在多个分区上存,而不会限制在单台机器存储大小里),二方面可以类似看成一种并行度。
消费者的负载均衡与topic的分区数据紧密相关,需要考虑几种情况:
1、单个分组内的消费者数目如果比总得分区数目多的话,则多出来的消费者不参与消费。每个分区针对每个消费者group只挂一个消费者,同一个group的多余消费者不参与消费。
2、如果分组内的消费者数目比分区数目小,则有部分消费者要额外承担消息的消费任务。当分区数目n大于单个group的消费者数目m时,则有n%m个消费者需要额外承担1/n的消费任务。n足够大的时候可以认为负载平均分配。
综上所述,单个分组内的消费者集群的负载均衡策略如下:
①每个分区针对一个group只挂载一个消费者
②如果同一个group的消费者数目大于分区数目,则多出来的消费者不参与消费
③如果同一个group的消费者数目小于分区数目,则有部分消费者需要额外承担消费任务。
meta客户端处理消费者的负载均衡方式:将消费者列表和分区列表分别排序,然后按照上述规则做合理的挂载。如果某个消费者故障,其他消费者会感知到这一变化,然后重新进行负载均衡,保证所有分区都有消费者进行消费。
Consumer的balance策略实现在metaq中提供了两种:ConsisHashStrategy和DefaultLoadBalanceStrategy。
首先,来看DefaultLoadBalanceStrategy的实现:
public List<String> getPartitions(final String topic, final String consumerId, final List<String> curConsumers, final List<String> curPartitions) { // 每个订阅者平均挂载的partition数目 final int nPartsPerConsumer = curPartitions.size() / curConsumers.size(); // 挂载到额外partition的consumer数目 final int nConsumersWithExtraPart = curPartitions.size() % curConsumers.size(); log.info("Consumer " + consumerId + " rebalancing the following partitions: " + curPartitions + " for topic " + topic + " with consumers: " + curConsumers); final int myConsumerPosition = curConsumers.indexOf(consumerId); if (myConsumerPosition < 0) { log.warn("No broker partions consumed by consumer " + consumerId + " for topic " + topic); return Collections.emptyList(); } assert myConsumerPosition >= 0; // 计算当前consumer挂载的分区起点 final int startPart = nPartsPerConsumer * myConsumerPosition + Math.min(myConsumerPosition, nConsumersWithExtraPart); //计算当前consumer共挂载的分区数=每个consumer的挂载数+额外承担的分区数 final int nParts = nPartsPerConsumer + (myConsumerPosition + 1 > nConsumersWithExtraPart ? 0 : 1); if (nParts <= 0) { log.warn("No broker partions consumed by consumer " + consumerId + " for topic " + topic); return Collections.emptyList(); } final List<String> rt = new ArrayList<String>(); for (int i = startPart; i < startPart + nParts; i++) { final String partition = curPartitions.get(i); rt.add(partition); } return rt; }
相关推荐
metamorphosis(metaq) 服务端1.4.3版本 包括客户端 发送一个序列化对象
整理后的Metaq原理应用文档,欢迎大家看看。
metaQ向spark传数据
Metaq在JDk 7下的异常及解决方案,希望可以帮助学习者!
metaq-server-1.4.6.2服务端+客户端+javadoc文档,打包于一个压缩包
metaq-server-1.4.6.2.tar.gz
metaQ的安装包
Metamorphosis是淘宝开源的一个Java消息中间件,他类似apache-kafka,但不是一个简单的山寨拷贝,而是做了很多改进和优化,项目的主页在淘蝌蚪上。服务端、客户端、javadoc都包含在内。
metaq--1.4.6.2.zip 和原版一样就是换了个名字,方便大家一起学习.
MetaQ 分布式消息服务中间件.pdf
Memorphosis是一个消息中间件,它是linkedin开源MQ——kafka的Java版本,针对淘宝内部应用做了定制和优化。Metamorphosis的设计原则 • 消息都是持久的,保存在磁盘 • 吞吐量第一 • 消费状态保存在客户端 ...
Metamorphosis(MetaQ)是一个高性能、高可用、可扩展的分布式消息中间件,类似于LinkedIn的Kafka,具有消息存储顺序写、吞吐量大和支持本地和XA事务等特性,适用于大吞吐量、顺序消息、广播和日志数据传输等场景,在...
rocketMQ
数据消费者,consumer 的用法:《consumer 的用法》 迓有些零碎的,关亍通信段的源码览读:《net 包源码览读》、《broker 配置》 扩展的阅读迓有下面返些: 关亍 kafka 和 jafka 的相关博客,特删好,有徆多问题也都...
分享一下 RocketMq的文档RocketMQ运维指令 rocketmq在阿里内部叫metaq
该文档为storm模拟项目系列文档之一,是MetaQ与storm接口的说明文档,主要介绍了如何集成MetaQ到项目代码中。软件(阿里),其对应的许多技术文档还是比较容易看的,并且Github提供了许多的应用实例,所以使用MetaQ...
Metamorphosis, 一种高可用高性能的分布式 #新闻MetaQ 1.4.6.2 发布。更新日志MetaQ 1.4.6.1 发布。更新日志MetaQ 1.4.5.1 发布。更新日志MetaQ 1.4.5发布。更新日志meta: 一个用于的ruby 客户端。 源代码
RocketMQ是一个纯java、分布式、队列模型的开源消息中间件,前身是Metaq,当 Metaq 3.0发布时,产品名称改为 RocketMQ。