Kafka source code analysis: consumer partition assignment strategies

  • 1. AbstractPartitionAssignor
  • 2. RangeAssignor
  • 3. RoundRobinAssignor
  • 4. The StickyAssignor strategy

The source code in this article is from Kafka 2.0.1.

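1. AbstractPartitionAssignor

The consumer has three partition assignment strategies: RangeAssignor, RoundRobinAssignor and StickyAssignor. All three extend AbstractPartitionAssignor and implement its assign method, which takes two parameters:

  • partitionsPerTopic: the number of partitions of each topic
  • subscriptions: the mapping from each consumerId to the list of topics it subscribes to, i.e. the topics whose partitions each consumer may be assigned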
/**
* Perform the group assignment given the partition counts and member subscriptions
* @param partitionsPerTopic The number of partitions for each subscribed topic. Topics not in metadata will be excluded
*                           from this map.
* @param subscriptions Map from the memberId to their respective topic subscription
* @return Map from each member to the list of partitions assigned to them.
*/
public abstract Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,Map<String, Subscription> subscriptions);

2. RangeAssignor

First, the implementation:

public class RangeAssignor extends AbstractPartitionAssignor {

    @Override
    public String name() {
        return "range";
    }

    private Map<String, List<String>> consumersPerTopic(Map<String, Subscription> consumerMetadata) {
        Map<String, List<String>> res = new HashMap<>();
        for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
            String consumerId = subscriptionEntry.getKey();
            for (String topic : subscriptionEntry.getValue().topics())
                put(res, topic, consumerId);
        }
        return res;
    }

    // partitionsPerTopic: number of partitions of each topic
    // subscriptions: each consumerId and the list of topics it subscribes to
    @Override
    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, Subscription> subscriptions) {
        // Invert subscriptions into a map whose key is a topic and whose value is the list of consumerIds
        Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
        // Holds the rebalance result
        Map<String, List<TopicPartition>> assignment = new HashMap<>();
        for (String memberId : subscriptions.keySet())
            assignment.put(memberId, new ArrayList<TopicPartition>());

        // Iterate over consumersPerTopic
        for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
            String topic = topicEntry.getKey();
            // Consumers subscribed to this topic
            List<String> consumersForTopic = topicEntry.getValue();

            // Number of partitions of this topic
            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
            if (numPartitionsForTopic == null)
                continue;

            Collections.sort(consumersForTopic);

            // Quotient
            int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
            // Remainder
            int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();

            // [Key assignment step]
            // Build the list of TopicPartitions
            List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
            for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
                // Start offset of this consumer's range
                int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
                // Length of this consumer's range
                int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
                // When consumersWithExtraPartition is not 0, the first consumers each get one extra partition;
                // the evenly divisible part is split equally among all consumers
                assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
            }
        }
        return assignment;
    }
}

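Focus on the [key assignment step] in the source. It works topic by topic: the partitions of each topic are divided evenly among the consumers subscribed to that topic, and the remainder that cannot be divided evenly goes to the first consumersWithExtraPartition consumers (in sorted order). In other words, each of the first consumersWithExtraPartition consumers receives one partition more than each of the remaining consumers.
[Example 1]
Suppose a topic has 7 partitions and three consumers all subscribe to it. RangeAssignor assigns them as follows:

  • consumer0: start=0, length=3, assigned partitions: p0, p1, p2
  • consumer1: start=3, length=2, assigned partitions: p3, p4
  • consumer2: start=5, length=2, assigned partitions: p5, p6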
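The arithmetic of the key assignment step can be written out explicitly. Here N is the topic's partition count, C the number of consumers subscribed to it, and i a consumer's index after sorting; the bracket [i < r] is 1 when i < r and 0 otherwise, which is what the code's `i + 1 > consumersWithExtraPartition ? 0 : 1` computes. Plugging in Example 1 reproduces the start and length values listed above:

```latex
\begin{aligned}
q &= \lfloor N / C \rfloor, \qquad r = N \bmod C \\
\mathrm{start}_i &= q\,i + \min(i, r), \qquad \mathrm{length}_i = q + [\,i < r\,] \\
N = 7,\ C = 3 &\;\Rightarrow\; q = 2,\ r = 1 \\
(\mathrm{start}_0, \mathrm{length}_0) &= (0, 3), \quad
(\mathrm{start}_1, \mathrm{length}_1) = (2 + 1, 2) = (3, 2), \quad
(\mathrm{start}_2, \mathrm{length}_2) = (4 + 1, 2) = (5, 2)
\end{aligned}
```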

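[Example 2]
Now consider an example with several topics. Suppose consumer0 subscribes to topic0, topic1 and topic2, consumer1 subscribes to topic0 and topic1, and consumer2 subscribes to topic2, where topic0, topic1 and topic2 have 3, 2 and 1 partitions respectively. The assignment is:

consumer     assigned partitions
consumer0    t0p0, t0p1, t1p0, t2p0
consumer1    t0p2, t1p1
consumer2    (none)

The assignment is clearly uneven: consumer0 gets four partitions while consumer2 sits idle, because topic2's only partition goes to consumer0, which sorts before consumer2.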
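To check Example 2 by hand, the following Java sketch replays the same range logic without Kafka on the classpath. It is an illustration under assumptions, not Kafka's class: partitions are plain strings such as `t0p1` instead of `TopicPartition`, a subscription is just a `List<String>` of topics, and the class name `RangeSketch` is hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Kafka-free sketch of the RangeAssignor logic shown above.
final class RangeSketch {
    static Map<String, List<String>> assign(Map<String, Integer> partitionsPerTopic,
                                            Map<String, List<String>> subscriptions) {
        // Invert subscriptions: topic -> members subscribed to it
        Map<String, List<String>> consumersPerTopic = new TreeMap<>();
        for (Map.Entry<String, List<String>> e : subscriptions.entrySet())
            for (String topic : e.getValue())
                consumersPerTopic.computeIfAbsent(topic, t -> new ArrayList<>()).add(e.getKey());

        Map<String, List<String>> assignment = new HashMap<>();
        for (String member : subscriptions.keySet())
            assignment.put(member, new ArrayList<>());

        for (Map.Entry<String, List<String>> e : consumersPerTopic.entrySet()) {
            String topic = e.getKey();
            Integer numPartitions = partitionsPerTopic.get(topic);
            if (numPartitions == null)
                continue; // topic not in metadata
            List<String> consumers = e.getValue();
            Collections.sort(consumers);
            int perConsumer = numPartitions / consumers.size();  // quotient
            int withExtra = numPartitions % consumers.size();    // remainder
            for (int i = 0; i < consumers.size(); i++) {
                int start = perConsumer * i + Math.min(i, withExtra);
                int length = perConsumer + (i + 1 > withExtra ? 0 : 1);
                for (int p = start; p < start + length; p++)
                    assignment.get(consumers.get(i)).add(topic + "p" + p);
            }
        }
        return assignment;
    }
}
```

The `TreeMap` only fixes the order in which topics are visited, so each member's list comes out in topic order; it does not change which partitions a member receives.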

3. RoundRobinAssignor

First, the source:

public class RoundRobinAssignor extends AbstractPartitionAssignor {

    @Override
    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, Subscription> subscriptions) {
        // Holds the result
        Map<String, List<TopicPartition>> assignment = new HashMap<>();
        for (String memberId : subscriptions.keySet())
            assignment.put(memberId, new ArrayList<TopicPartition>());

        // [Key assignment step]
        // Sort the consumers and wrap them in a circular iterator
        CircularIterator<String> assigner = new CircularIterator<>(Utils.sorted(subscriptions.keySet()));
        // Iterate over all partitions
        for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) {
            final String topic = partition.topic();
            // Find the next consumer that subscribes to this partition's topic
            while (!subscriptions.get(assigner.peek()).topics().contains(topic))
                // next() returns the element at the current position and advances the iterator by one
                assigner.next();
            // Assign the partition to that consumer
            assignment.get(assigner.next()).add(partition);
        }
        return assignment;
    }

    // Collect all partitions; topics are visited in sorted order, so partitions of the same topic are contiguous
    public List<TopicPartition> allPartitionsSorted(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, Subscription> subscriptions) {
        SortedSet<String> topics = new TreeSet<>();
        for (Subscription subscription : subscriptions.values())
            topics.addAll(subscription.topics());

        List<TopicPartition> allPartitions = new ArrayList<>();
        for (String topic : topics) {
            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
            if (numPartitionsForTopic != null)
                allPartitions.addAll(AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic));
        }
        return allPartitions;
    }

    @Override
    public String name() {
        return "roundrobin";
    }
}

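Focus on the [key assignment step]. The rule is: iterate over all topic partitions and treat the sorted consumers as a ring. For each partition, starting from the current position, find the first consumer that subscribes to the partition's topic, assign the partition to it, move the position one past that consumer, and continue with the next partition.
Applying RoundRobinAssignor to [Example 2] from section 2 gives:

round   consumer0   consumer1   consumer2
1       t0p0        t0p1        -
2       t0p2        t1p0        -
3       t1p1        -           t2p0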
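The same plain-collection style can replay RoundRobinAssignor on Example 2. In this sketch (hypothetical class `RoundRobinSketch`, partitions as strings like `t0p1`), an integer cursor stands in for `CircularIterator`: it skips members that do not subscribe to the current topic, assigns the partition, then advances one step past the chosen member.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical, Kafka-free sketch of the RoundRobinAssignor logic shown above.
final class RoundRobinSketch {
    static Map<String, List<String>> assign(Map<String, Integer> partitionsPerTopic,
                                            Map<String, List<String>> subscriptions) {
        Map<String, List<String>> assignment = new HashMap<>();
        for (String member : subscriptions.keySet())
            assignment.put(member, new ArrayList<>());

        // Members sorted by ID, as Utils.sorted does
        List<String> members = new ArrayList<>(new TreeSet<>(subscriptions.keySet()));
        // Union of subscribed topics in sorted order, as allPartitionsSorted does
        TreeSet<String> topics = new TreeSet<>();
        for (List<String> subscribed : subscriptions.values())
            topics.addAll(subscribed);

        int cursor = 0; // position in the ring of members
        for (String topic : topics) {
            Integer n = partitionsPerTopic.get(topic);
            if (n == null)
                continue;
            for (int p = 0; p < n; p++) {
                // Skip members that do not subscribe to this topic (peek() + next())
                while (!subscriptions.get(members.get(cursor)).contains(topic))
                    cursor = (cursor + 1) % members.size();
                assignment.get(members.get(cursor)).add(topic + "p" + p);
                cursor = (cursor + 1) % members.size(); // move one past the chosen member
            }
        }
        return assignment;
    }
}
```

Running the same sketch with 4, 3 and 2 partitions reproduces Example 3: consumer0 ends up with five partitions and consumer2 with one.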

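Compared with RangeAssignor the assignment is more even, but this approach still does not fully solve the problem of even distribution, as the following example shows.
[Example 3]
Suppose consumer0 subscribes to topic0, topic1 and topic2, consumer1 subscribes to topic0 and topic1, and consumer2 subscribes to topic2, where topic0, topic1 and topic2 have 4, 3 and 2 partitions respectively. The assignment is:

consumer     assigned partitions
consumer0    t0p0, t0p2, t1p0, t1p2, t2p1
consumer1    t0p1, t0p3, t1p1
consumer2    t2p0

Assigning t2p1 to consumer2 instead would have given a more even result (4, 3 and 2 partitions instead of 5, 3 and 1).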

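4. The StickyAssignor strategy

"Sticky" is a very apt name. StickyAssignor was introduced in version 0.11 and has two main goals:

  • Goal 1: distribute partitions as evenly as possible
  • Goal 2: keep the assignment as close as possible to the previous one

When goal 1 and goal 2 conflict, goal 1 takes precedence.
StickyAssignor involves a lot of code, so start with the flowchart below.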

Using [Example 3] from section 3, StickyAssignor produces:

consumer     assigned partitions
consumer0    t0p0, t0p2, t1p0, t1p2
consumer1    t0p1, t0p3, t1p1
consumer2    t2p0, t2p1
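For a fresh assignment, the path that produces this table can be approximated by the sketch below: partitions are sorted by how many consumers could own them (ties by topic, then partition number), and each one goes to the least-loaded eligible consumer (ties by member ID). It models only the initial `assignPartition` pass of `balance`; the reassignment phase is left out, which does not matter for Example 3 because no partition's owner has two more partitions than another eligible consumer. The class name and the string partition format are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch of StickyAssignor's initial pass for a fresh assignment.
final class StickyFreshSketch {
    static Map<String, List<String>> assign(Map<String, Integer> partitionsPerTopic,
                                            Map<String, List<String>> subscriptions) {
        // partition -> members that may own it (like partition2AllPotentialConsumers)
        Map<String, List<String>> potential = new HashMap<>();
        List<String> partitions = new ArrayList<>(); // built in (topic, partition) order
        for (Map.Entry<String, Integer> e : new TreeMap<>(partitionsPerTopic).entrySet())
            for (int p = 0; p < e.getValue(); p++) {
                String tp = e.getKey() + "p" + p;
                partitions.add(tp);
                potential.put(tp, new ArrayList<>());
            }
        for (Map.Entry<String, List<String>> e : subscriptions.entrySet())
            for (String topic : e.getValue())
                for (int p = 0; p < partitionsPerTopic.getOrDefault(topic, 0); p++)
                    potential.get(topic + "p" + p).add(e.getKey());

        // Stable sort: fewest potential consumers first; ties keep (topic, partition) order
        partitions.sort(Comparator.comparingInt((String tp) -> potential.get(tp).size()));

        Map<String, List<String>> assignment = new HashMap<>();
        for (String member : subscriptions.keySet())
            assignment.put(member, new ArrayList<>());
        // Members ordered by current load, then by ID
        TreeSet<String> byLoad = new TreeSet<>(
                Comparator.comparingInt((String m) -> assignment.get(m).size())
                          .thenComparing(Comparator.naturalOrder()));
        byLoad.addAll(assignment.keySet());

        for (String tp : partitions) {
            for (String member : byLoad) {
                if (potential.get(tp).contains(member)) {
                    byLoad.remove(member);      // take it out before its size changes
                    assignment.get(member).add(tp);
                    byLoad.add(member);         // re-insert at its new position
                    break;                      // stop iterating the modified set
                }
            }
        }
        return assignment;
    }
}
```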

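Compared with RoundRobinAssignor, this assignment is balanced. Now suppose a new consumer3 joins and subscribes to topic0. The rebalance proceeds as follows:
(1) All partitions are sorted in ascending order of how many consumers could own them, ties broken by topic and then partition number. Each topic0 partition now has three potential consumers (consumer0, consumer1, consumer3) while the topic1 and topic2 partitions have two, so the order is: t1p0, t1p1, t1p2, t2p0, t2p1, t0p0, t0p1, t0p2, t0p3
(2) Initial consumer order (ascending by number of assigned partitions, ties broken by member ID): consumer3, consumer2, consumer1, consumer0
(3) Rebalancing starts. The topic1 and topic2 partitions come first, but none of them can be moved, because no other eligible consumer has at least two fewer partitions than the current owner. t0p0, however, can clearly be reassigned (consumer0 has 4 partitions, consumer3 has none). After the move, the assignment and consumer order are:

consumer (sorted)   assigned partitions
consumer3           t0p0
consumer2           t2p0, t2p1
consumer0           t0p2, t1p0, t1p2
consumer1           t0p1, t0p3, t1p1

Next comes t0p1, which can also be reassigned; it moves from consumer1 to consumer3:

consumer (sorted)   assigned partitions
consumer1           t0p3, t1p1
consumer2           t2p0, t2p1
consumer3           t0p0, t0p1
consumer0           t0p2, t1p0, t1p2

(4) The rebalance is complete, since the largest and smallest assignments now differ by only one partition, and the result is returned.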
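This walk-through can be replayed with a simplified model of `performReassignments` and `reassignPartition`. The simplifications are deliberate and labeled in the code: the balance test keeps only the rule "smallest and largest loads differ by at most one", every partition is assumed to already have an owner, and both `getTheActualPartitionToBeMoved` and the balance-score rollback are skipped, so a partition always moves to the least-loaded eligible consumer. `StickyRebalanceSketch` is a hypothetical name.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical, simplified model of performReassignments + reassignPartition.
// Simplified: balance = "min and max loads differ by at most one"; every partition already has an
// owner; no getTheActualPartitionToBeMoved step and no balance-score rollback.
final class StickyRebalanceSketch {
    static void rebalance(Map<String, List<String>> current,    // member -> owned partitions (mutable)
                          Map<String, List<String>> potential,  // partition -> members that may own it
                          List<String> sortedPartitions) {      // order in which partitions are examined
        Map<String, String> owner = new HashMap<>();
        for (Map.Entry<String, List<String>> e : current.entrySet())
            for (String tp : e.getValue())
                owner.put(tp, e.getKey());
        TreeSet<String> byLoad = new TreeSet<>(
                Comparator.comparingInt((String m) -> current.get(m).size())
                          .thenComparing(Comparator.naturalOrder()));
        byLoad.addAll(current.keySet());

        boolean modified;
        do {
            modified = false;
            for (String tp : sortedPartitions) {
                // Simplified isBalanced: stop once loads differ by at most one
                if (current.get(byLoad.first()).size() >= current.get(byLoad.last()).size() - 1)
                    break;
                String from = owner.get(tp);
                for (String other : potential.get(tp)) {
                    if (current.get(from).size() > current.get(other).size() + 1) {
                        // Like reassignPartition: the first eligible member in load order wins
                        String to = null;
                        for (String m : byLoad)
                            if (potential.get(tp).contains(m)) {
                                to = m;
                                break;
                            }
                        byLoad.remove(from);
                        byLoad.remove(to);
                        current.get(from).remove(tp);
                        current.get(to).add(tp);
                        owner.put(tp, to);
                        byLoad.add(from);
                        byLoad.add(to);
                        modified = true;
                        break;
                    }
                }
            }
        } while (modified);
    }
}
```

Starting from Example 3's sticky result with consumer3 added, and examining the topic1 and topic2 partitions before the topic0 partitions, the sketch ends with consumer3 owning t0p0 and t0p1 and the other consumers matching step (4).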
The flowchart omits many details; they are covered in the source analysis below.

public class StickyAssignor extends AbstractPartitionAssignor {

    private static final Logger log = LoggerFactory.getLogger(StickyAssignor.class);

    // these schemas are used for preserving consumer's previously assigned partitions
    // list and sending it as user data to the leader during a rebalance
    private static final String TOPIC_PARTITIONS_KEY_NAME = "previous_assignment";
    private static final String TOPIC_KEY_NAME = "topic";
    private static final String PARTITIONS_KEY_NAME = "partitions";
    private static final Schema TOPIC_ASSIGNMENT = new Schema(
            new Field(TOPIC_KEY_NAME, Type.STRING),
            new Field(PARTITIONS_KEY_NAME, new ArrayOf(Type.INT32)));
    private static final Schema STICKY_ASSIGNOR_USER_DATA = new Schema(
            new Field(TOPIC_PARTITIONS_KEY_NAME, new ArrayOf(TOPIC_ASSIGNMENT)));

    private List<TopicPartition> memberAssignment = null;
    private PartitionMovements partitionMovements;

    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, Subscription> subscriptions) {
        Map<String, List<TopicPartition>> currentAssignment = new HashMap<>();
        partitionMovements = new PartitionMovements();

        // First rebuild the current assignment state: currentAssignment.
        // The userData read here is each member's previously assigned partitions, which the member
        // sends to the leader encoded with the STICKY_ASSIGNOR_USER_DATA schema above
        prepopulateCurrentAssignments(subscriptions, currentAssignment);
        // Whether this is a brand-new assignment (true = yes)
        boolean isFreshAssignment = currentAssignment.isEmpty();

        // For each partition, the consumers it may be assigned to
        final Map<TopicPartition, List<String>> partition2AllPotentialConsumers = new HashMap<>();
        // For each consumer, the partitions it may be assigned
        final Map<String, List<TopicPartition>> consumer2AllPotentialPartitions = new HashMap<>();

        // Initialize partition2AllPotentialConsumers with empty lists
        for (Entry<String, Integer> entry: partitionsPerTopic.entrySet()) {
            for (int i = 0; i < entry.getValue(); ++i)
                partition2AllPotentialConsumers.put(new TopicPartition(entry.getKey(), i), new ArrayList<String>());
        }

        // Iterate over subscriptions
        for (Entry<String, Subscription> entry: subscriptions.entrySet()) {
            String consumer = entry.getKey();
            // Initialize this consumer's entry in consumer2AllPotentialPartitions
            consumer2AllPotentialPartitions.put(consumer, new ArrayList<TopicPartition>());
            for (String topic: entry.getValue().topics()) {
                for (int i = 0; i < partitionsPerTopic.get(topic); ++i) {
                    TopicPartition topicPartition = new TopicPartition(topic, i);
                    // Every partition of a subscribed topic is a potential partition of this consumer
                    consumer2AllPotentialPartitions.get(consumer).add(topicPartition);
                    // ...and this consumer is a potential consumer of each of those partitions
                    partition2AllPotentialConsumers.get(topicPartition).add(consumer);
                }
            }

            // If this consumer has no entry in the current assignment,
            // create an empty one for it
            if (!currentAssignment.containsKey(consumer))
                currentAssignment.put(consumer, new ArrayList<TopicPartition>());
        }
        // partition2AllPotentialConsumers and consumer2AllPotentialPartitions are now initialized

        // Map each partition in the current assignment to its consumer
        Map<TopicPartition, String> currentPartitionConsumer = new HashMap<>();
        for (Map.Entry<String, List<TopicPartition>> entry: currentAssignment.entrySet())
            for (TopicPartition topicPartition: entry.getValue())
                currentPartitionConsumer.put(topicPartition, entry.getKey());

        // Sort the valid partitions so that the potential reassignment phase processes them in an order that
        // minimizes partition movement between consumers (hence honoring maximal stickiness).
        // See the sortPartitions source below for details
        List<TopicPartition> sortedPartitions = sortPartitions(
                currentAssignment, isFreshAssignment, partition2AllPotentialConsumers, consumer2AllPotentialPartitions);

        // Start with every sorted partition marked as unassigned
        List<TopicPartition> unassignedPartitions = new ArrayList<>(sortedPartitions);
        // Walk the current assignment and drop entries that are no longer valid
        for (Iterator<Map.Entry<String, List<TopicPartition>>> it = currentAssignment.entrySet().iterator(); it.hasNext();) {
            Map.Entry<String, List<TopicPartition>> entry = it.next();
            if (!subscriptions.containsKey(entry.getKey())) {
                // The consumer no longer exists: release all its partitions and remove it from the current assignment
                for (TopicPartition topicPartition: entry.getValue())
                    currentPartitionConsumer.remove(topicPartition);
                it.remove();
            } else {
                // otherwise (the consumer still exists)
                for (Iterator<TopicPartition> partitionIter = entry.getValue().iterator(); partitionIter.hasNext();) {
                    TopicPartition partition = partitionIter.next();
                    if (!partition2AllPotentialConsumers.containsKey(partition)) {
                        // The partition no longer exists: remove it from the assignment and from currentPartitionConsumer
                        partitionIter.remove();
                        currentPartitionConsumer.remove(partition);
                    } else if (!subscriptions.get(entry.getKey()).topics().contains(partition.topic())) {
                        // The consumer no longer subscribes to this partition's topic: remove it from the assignment
                        partitionIter.remove();
                    } else
                        // The partition is still validly assigned: remove it from unassignedPartitions
                        unassignedPartitions.remove(partition);
                }
            }
        }
        // at this point we have preserved all valid topic partition to consumer assignments and removed
        // all invalid topic partitions and invalid consumers. Now we need to assign unassignedPartitions
        // to consumers so that the topic partition assignments are as balanced as possible.

        // Sort consumers in ascending order of the number of partitions already assigned to them
        TreeSet<String> sortedCurrentSubscriptions = new TreeSet<>(new SubscriptionComparator(currentAssignment));
        sortedCurrentSubscriptions.addAll(currentAssignment.keySet());

        balance(currentAssignment, sortedPartitions, unassignedPartitions, sortedCurrentSubscriptions,
                consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer);
        return currentAssignment;
    }

    // Some methods omitted
}
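The validation loop in assign (drop departed consumers, drop partitions that no longer exist, drop partitions of topics a consumer has unsubscribed from, and treat everything else as still owned) can be sketched with plain collections. The `t0p3` naming convention and the `topicOf` helper are assumptions of this sketch, not Kafka APIs, and `currentPartitionConsumer` is not modeled.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the cleanup loop in assign(): keeps only valid (member, partition) pairs in
// `current` and returns the partitions, in sortedPartitions order, that are left without an owner.
final class StickyCleanupSketch {
    static List<String> cleanup(Map<String, List<String>> current,       // previous assignment, mutable
                                Map<String, List<String>> subscriptions, // member -> subscribed topics
                                List<String> sortedPartitions) {         // all partitions that still exist
        Set<String> existing = new HashSet<>(sortedPartitions);
        List<String> unassigned = new ArrayList<>(sortedPartitions);
        for (Iterator<Map.Entry<String, List<String>>> it = current.entrySet().iterator(); it.hasNext(); ) {
            Map.Entry<String, List<String>> entry = it.next();
            List<String> topics = subscriptions.get(entry.getKey());
            if (topics == null) {
                it.remove();                   // member left the group: drop it and its partitions
                continue;
            }
            for (Iterator<String> p = entry.getValue().iterator(); p.hasNext(); ) {
                String tp = p.next();
                if (!existing.contains(tp) || !topics.contains(topicOf(tp)))
                    p.remove();                // partition gone, or its topic no longer subscribed
                else
                    unassigned.remove(tp);     // still validly owned
            }
        }
        return unassigned;
    }

    // Assumed naming convention for this sketch: "t0p3" -> topic "t0"
    static String topicOf(String tp) {
        return tp.substring(0, tp.lastIndexOf('p'));
    }
}
```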
/**
 * Sort valid partitions so they are processed in the potential reassignment phase in the proper order
 * that causes minimal partition movement among consumers (hence honoring maximal stickiness)
 *
 * @param currentAssignment the calculated assignment so far
 * @param isFreshAssignment whether this is a new assignment, or a reassignment of an existing one
 * @param partition2AllPotentialConsumers a mapping of partitions to their potential consumers
 * @param consumer2AllPotentialPartitions a mapping of consumers to potential partitions they can consumer from
 * @return sorted list of valid partitions
 */
private List<TopicPartition> sortPartitions(Map<String, List<TopicPartition>> currentAssignment,
                                            boolean isFreshAssignment,
                                            Map<TopicPartition, List<String>> partition2AllPotentialConsumers,
                                            Map<String, List<TopicPartition>> consumer2AllPotentialPartitions) {
    List<TopicPartition> sortedPartitions = new ArrayList<>();

    if (!isFreshAssignment && areSubscriptionsIdentical(partition2AllPotentialConsumers, consumer2AllPotentialPartitions)) {
        // Not a fresh assignment, and every consumer can potentially be assigned exactly the same partitions
        // Make a deep copy of the current assignment
        Map<String, List<TopicPartition>> assignments = deepCopy(currentAssignment);
        // Drop partitions that no longer exist (they are not keys of partition2AllPotentialConsumers)
        for (Entry<String, List<TopicPartition>> entry: assignments.entrySet()) {
            List<TopicPartition> toRemove = new ArrayList<>();
            for (TopicPartition partition: entry.getValue())
                if (!partition2AllPotentialConsumers.keySet().contains(partition))
                    toRemove.add(partition);
            for (TopicPartition partition: toRemove)
                entry.getValue().remove(partition);
        }
        // SubscriptionComparator compares the sizes of the two keys' values; equal sizes fall back to comparing the keys as strings
        TreeSet<String> sortedConsumers = new TreeSet<>(new SubscriptionComparator(assignments));
        sortedConsumers.addAll(assignments.keySet());

        // Build the partition order by repeatedly polling the last consumer, i.e. the one with the most
        // remaining partitions, taking its first partition and re-inserting the consumer
        while (!sortedConsumers.isEmpty()) {
            String consumer = sortedConsumers.pollLast();
            List<TopicPartition> remainingPartitions = assignments.get(consumer);
            if (!remainingPartitions.isEmpty()) {
                sortedPartitions.add(remainingPartitions.remove(0));
                sortedConsumers.add(consumer);
            }
        }

        // Append the partitions that are not currently assigned to any consumer
        for (TopicPartition partition: partition2AllPotentialConsumers.keySet()) {
            if (!sortedPartitions.contains(partition))
                sortedPartitions.add(partition);
        }
    } else {
        // PartitionComparator compares the sizes of the two keys' values (the number of potential consumers);
        // ties are broken by topic name, then by partition number
        TreeSet<TopicPartition> sortedAllPartitions = new TreeSet<>(new PartitionComparator(partition2AllPotentialConsumers));
        sortedAllPartitions.addAll(partition2AllPotentialConsumers.keySet());

        // Poll the partitions in order: the fewer consumers a partition can be assigned to, the earlier it comes
        while (!sortedAllPartitions.isEmpty())
            sortedPartitions.add(sortedAllPartitions.pollFirst());
    }

    return sortedPartitions;
}
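The identical-subscriptions branch interleaves partitions, always drawing from the consumer that still has the most. The sketch below reproduces that loop with plain strings; the class name is hypothetical and the comparator mirrors what the comment above says SubscriptionComparator does (size first, then member ID).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of sortPartitions' "identical subscriptions" branch.
final class StickySortSketch {
    static List<String> sort(Map<String, List<String>> currentAssignment, List<String> allPartitions) {
        Set<String> valid = new HashSet<>(allPartitions);
        // Deep copy that keeps only partitions that still exist
        Map<String, List<String>> remaining = new HashMap<>();
        for (Map.Entry<String, List<String>> e : currentAssignment.entrySet()) {
            List<String> kept = new ArrayList<>();
            for (String tp : e.getValue())
                if (valid.contains(tp))
                    kept.add(tp);
            remaining.put(e.getKey(), kept);
        }
        // Ascending by remaining size, then by member ID
        TreeSet<String> byLoad = new TreeSet<>(
                Comparator.comparingInt((String m) -> remaining.get(m).size())
                          .thenComparing(Comparator.naturalOrder()));
        byLoad.addAll(remaining.keySet());

        List<String> sorted = new ArrayList<>();
        while (!byLoad.isEmpty()) {
            String member = byLoad.pollLast();       // most remaining partitions
            List<String> left = remaining.get(member);
            if (!left.isEmpty()) {
                sorted.add(left.remove(0));          // emit its earliest partition
                byLoad.add(member);                  // re-insert with the smaller size
            }
        }
        // Partitions nobody owns go last
        for (String tp : allPartitions)
            if (!sorted.contains(tp))
                sorted.add(tp);
        return sorted;
    }
}
```

Note the poll, mutate, re-insert pattern: the TreeSet's ordering depends on list sizes, so a member must be out of the set while its list shrinks, which is exactly why the original polls the consumer before removing a partition.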
/**
 * Balance the current assignment using the data structures created in the assign(...) method above.
 */
private void balance(Map<String, List<TopicPartition>> currentAssignment,
                     List<TopicPartition> sortedPartitions,
                     List<TopicPartition> unassignedPartitions,
                     TreeSet<String> sortedCurrentSubscriptions,
                     Map<String, List<TopicPartition>> consumer2AllPotentialPartitions,
                     Map<TopicPartition, List<String>> partition2AllPotentialConsumers,
                     Map<TopicPartition, String> currentPartitionConsumer) {
    // Whether this is an initial assignment: if even the consumer with the most partitions has none, it is a new assignment
    boolean initializing = currentAssignment.get(sortedCurrentSubscriptions.last()).isEmpty();
    boolean reassignmentPerformed = false;

    // Assign the partitions that are still unassigned.
    // Each goes to the eligible consumer with the fewest partitions, which is then re-inserted into
    // sortedCurrentSubscriptions so that the consumers are re-sorted
    for (TopicPartition partition: unassignedPartitions) {
        // skip if there is no potential consumer for the partition
        if (partition2AllPotentialConsumers.get(partition).isEmpty())
            continue;

        assignPartition(partition, sortedCurrentSubscriptions, currentAssignment,
                consumer2AllPotentialPartitions, currentPartitionConsumer);
    }

    // fixedPartitions collects the partitions that can be assigned to only one consumer; they are removed from sortedPartitions.
    // At this point every partition has been assigned; exclude the ones that can never be reassigned.
    // fixedPartitions is not used afterwards; it only serves as a temporary for this removal
    Set<TopicPartition> fixedPartitions = new HashSet<>();
    for (TopicPartition partition: partition2AllPotentialConsumers.keySet())
        if (!canParticipateInReassignment(partition, partition2AllPotentialConsumers))
            fixedPartitions.add(partition);
    sortedPartitions.removeAll(fixedPartitions);

    // Remove the consumers whose assignment is final from sortedCurrentSubscriptions and stash their partitions in fixedAssignments
    Map<String, List<TopicPartition>> fixedAssignments = new HashMap<>();
    for (String consumer: consumer2AllPotentialPartitions.keySet())
        // canParticipateInReassignment: whether the consumer can take part in reassignment (true = yes)
        // 1. Not full ("full" means every partition that could be assigned to it already is): return true
        // 2. Full, but at least one of its partitions can be assigned to more than one consumer: return true
        if (!canParticipateInReassignment(consumer, currentAssignment,
                consumer2AllPotentialPartitions, partition2AllPotentialConsumers)) {
            sortedCurrentSubscriptions.remove(consumer);
            fixedAssignments.put(consumer, currentAssignment.remove(consumer));
        }

    // create a deep copy of the current assignment so we can revert to it if we do not get a more balanced assignment later
    Map<String, List<TopicPartition>> preBalanceAssignment = deepCopy(currentAssignment);
    Map<TopicPartition, String> preBalancePartitionConsumers = new HashMap<>(currentPartitionConsumer);

    // reassignmentPerformed = true means at least one partition was reassigned
    reassignmentPerformed = performReassignments(sortedPartitions, currentAssignment, sortedCurrentSubscriptions,
            consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer);

    // getBalanceScore computes a balance score; lower is better and the score is never negative.
    // It sums, over every pair of consumers, the absolute difference of their partition counts
    // (each consumer is removed once processed, so every pair is counted only once)
    if (!initializing && reassignmentPerformed && getBalanceScore(currentAssignment) >= getBalanceScore(preBalanceAssignment)) {
        // The new plan is no better: revert to the previous one
        deepCopy(preBalanceAssignment, currentAssignment);
        currentPartitionConsumer.clear();
        currentPartitionConsumer.putAll(preBalancePartitionConsumers);
    }

    // Put the consumers that were excluded from reassignment back into the current assignment and the sorted set
    for (Entry<String, List<TopicPartition>> entry: fixedAssignments.entrySet()) {
        String consumer = entry.getKey();
        currentAssignment.put(consumer, entry.getValue());
        sortedCurrentSubscriptions.add(consumer);
    }

    fixedAssignments.clear();
}
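As the comment above describes, getBalanceScore is the sum of |n_i - n_j| over all unordered pairs of consumers, where n_i is consumer i's partition count. A standalone sketch (hypothetical class name, partitions as strings) makes the comparison concrete: for Example 3, RoundRobinAssignor's 5/3/1 split scores 2 + 4 + 2 = 8, while StickyAssignor's 4/3/2 split scores 1 + 2 + 1 = 4.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical standalone version of the balance score: sum of |n_i - n_j| over all member pairs.
final class BalanceScoreSketch {
    static int score(Map<String, List<String>> assignment) {
        List<Integer> sizes = new ArrayList<>();
        for (List<String> partitions : assignment.values())
            sizes.add(partitions.size());
        int score = 0;
        for (int i = 0; i < sizes.size(); i++)
            for (int j = i + 1; j < sizes.size(); j++)   // each unordered pair once
                score += Math.abs(sizes.get(i) - sizes.get(j));
        return score;
    }
}
```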
private boolean performReassignments(List<TopicPartition> reassignablePartitions,
                                     Map<String, List<TopicPartition>> currentAssignment,
                                     TreeSet<String> sortedCurrentSubscriptions,
                                     Map<String, List<TopicPartition>> consumer2AllPotentialPartitions,
                                     Map<TopicPartition, List<String>> partition2AllPotentialConsumers,
                                     Map<TopicPartition, String> currentPartitionConsumer) {
    boolean reassignmentPerformed = false;
    boolean modified;

    // repeat reassignment until no partition can be moved to improve the balance
    do {
        modified = false;
        // reassign all reassignable partitions (starting from the partition with least potential consumers and if needed)
        // until the full list is processed or a balance is achieved
        Iterator<TopicPartition> partitionIterator = reassignablePartitions.iterator();
        // isBalanced: whether the current assignment is already balanced
        // 1. The consumer with the fewest partitions has at least (the largest count - 1): return true
        // 2. Otherwise, no consumer could take a partition it is eligible for from a consumer that holds more partitions: return true
        while (partitionIterator.hasNext() && !isBalanced(currentAssignment, sortedCurrentSubscriptions, consumer2AllPotentialPartitions)) {
            TopicPartition partition = partitionIterator.next();

            // the partition must have at least two consumers
            if (partition2AllPotentialConsumers.get(partition).size() <= 1)
                log.error("Expected more than one potential consumer for partition '" + partition + "'");

            // the partition must have a current consumer
            String consumer = currentPartitionConsumer.get(partition);
            if (consumer == null)
                log.error("Expected partition '" + partition + "' to be assigned to a consumer");

            // check if a better-suited consumer exist for the partition; if so, reassign it
            for (String otherConsumer: partition2AllPotentialConsumers.get(partition)) {
                if (currentAssignment.get(consumer).size() > currentAssignment.get(otherConsumer).size() + 1) {
                    // See the reassignPartition source below
                    reassignPartition(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, consumer2AllPotentialPartitions);
                    reassignmentPerformed = true;
                    modified = true;
                    break;
                }
            }
        }
    } while (modified);

    return reassignmentPerformed;
}
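The two isBalanced rules described in the comment above can be sketched as follows. This is an approximation written from that description rather than a copy of Kafka's method: rule 2 fails as soon as a consumer is eligible for a partition held by a consumer with more partitions. The class name and the string partitions are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the two isBalanced rules described in the comment above.
final class IsBalancedSketch {
    static boolean isBalanced(Map<String, List<String>> assignment,            // member -> owned partitions
                              Map<String, List<String>> potentialPartitions) { // member -> partitions it may own
        int min = Integer.MAX_VALUE;
        int max = Integer.MIN_VALUE;
        Map<String, String> owner = new HashMap<>();
        for (Map.Entry<String, List<String>> e : assignment.entrySet()) {
            min = Math.min(min, e.getValue().size());
            max = Math.max(max, e.getValue().size());
            for (String tp : e.getValue())
                owner.put(tp, e.getKey());
        }
        // Rule 1: loads differ by at most one
        if (min >= max - 1)
            return true;
        // Rule 2: no member is eligible for a partition held by a member with more partitions
        for (Map.Entry<String, List<String>> e : assignment.entrySet()) {
            int mine = e.getValue().size();
            for (String tp : potentialPartitions.get(e.getKey())) {
                String other = owner.get(tp);
                if (other != null && !other.equals(e.getKey()) && assignment.get(other).size() > mine)
                    return false;
            }
        }
        return true;
    }
}
```

For Example 3's StickyAssignor result it returns false (consumer1 is eligible for topic0 partitions held by consumer0, which has 4), yet performReassignments moves nothing there, because a move requires the owner to have at least two more partitions than the candidate; the do/while loop then stops because `modified` stays false.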
private void reassignPartition(TopicPartition partition,
                               Map<String, List<TopicPartition>> currentAssignment,
                               TreeSet<String> sortedCurrentSubscriptions,
                               Map<TopicPartition, String> currentPartitionConsumer,
                               Map<String, List<TopicPartition>> consumer2AllPotentialPartitions) {
    String consumer = currentPartitionConsumer.get(partition);

    // sortedCurrentSubscriptions is sorted by the number of assigned partitions in ascending order,
    // so the first consumer that can take this partition becomes the new consumer
    String newConsumer = null;
    for (String anotherConsumer: sortedCurrentSubscriptions) {
        if (consumer2AllPotentialPartitions.get(anotherConsumer).contains(partition)) {
            newConsumer = anotherConsumer;
            break;
        }
    }

    assert newConsumer != null;

    // Find the partition that should actually be moved, to preserve stickiness;
    // see the getTheActualPartitionToBeMoved source below
    TopicPartition partitionToBeMoved = partitionMovements.getTheActualPartitionToBeMoved(partition, consumer, newConsumer);
    // Reassign the partition to be moved:
    // 1. Remove the old and new consumers from the sorted set so they can be re-inserted
    // 2. Update partitionMovements and partitionMovementsByTopic
    // 3. Update the old and new consumers' assignments and currentPartitionConsumer
    // 4. Re-insert the old and new consumers so they are sorted by their new assignment sizes
    processPartitionMovement(partitionToBeMoved, newConsumer, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer);

    return;
}
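// partitionMovements: the most recent move of each partition (from srcMemberId to dstMemberId)
// partitionMovementsByTopic: per topic, the partitions moved for each ConsumerPair (old consumer -> new consumer);
// partitionMovementsForThisTopic is its entry for the current topic
private TopicPartition getTheActualPartitionToBeMoved(TopicPartition partition, String oldConsumer, String newConsumer) {
    String topic = partition.topic();

    // No partition of this topic has ever been moved: return this partition directly
    if (!partitionMovementsByTopic.containsKey(topic))
        return partition;

    // This partition has been moved before
    if (partitionMovements.containsKey(partition)) {
        // this partition has previously moved
        // The old consumer of this move must be the new consumer of the previous move
        assert oldConsumer.equals(partitionMovements.get(partition).dstMemberId);
        // Replace the old consumer with the old consumer of the previous move.
        // This maximizes stickiness below. For example, suppose partition0 was last moved A->C and is now being
        // moved C->B, while some partition1 was moved from B to A. After this assignment, partition1 is moved
        // instead of partition0: B moves toward balance while A moves away from it, so a later round may move
        // the partition that went from A to C back to A.
        // This design is rather convoluted.
        oldConsumer = partitionMovements.get(partition).srcMemberId;
    }

    Map<ConsumerPair, Set<TopicPartition>> partitionMovementsForThisTopic = partitionMovementsByTopic.get(topic);
    // The "reverse pair" of the old and new consumers
    ConsumerPair reversePair = new ConsumerPair(newConsumer, oldConsumer);
    // No reverse pair: return this partition directly
    if (!partitionMovementsForThisTopic.containsKey(reversePair))
        return partition;

    // Return one partition previously moved along the reverse pair, to serve StickyAssignor's goal 2
    // (keep the assignment as close as possible to the previous one).
    // For example, if partition0 is about to move from consumerA to consumerB, but partition1 was earlier moved
    // from consumerB to consumerA, partition1 is moved instead of partition0.
    // Both partitions currently belong to consumerA and moving either one improves balance,
    // but moving partition1 back is more in line with the sticky strategy
    return partitionMovementsForThisTopic.get(reversePair).iterator().next();
}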
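The bookkeeping behind getTheActualPartitionToBeMoved can be modeled with two maps. In this sketch the class, a string key such as `B->A` in place of `ConsumerPair`, and the `recordMove` helper are all hypothetical. `recordMove` assumes that chained moves collapse into one net move (A->C followed by C->B is stored as A->B, and a move back to the original owner erases the record); that is consistent with the assert in the code above, but the recording method itself is not shown in this article.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, Kafka-free model of the bookkeeping behind getTheActualPartitionToBeMoved.
final class PartitionMovementsSketch {
    // partition -> {srcMemberId, dstMemberId} of its net move so far (like partitionMovements)
    private final Map<String, String[]> movements = new HashMap<>();
    // topic -> "src->dst" -> partitions moved along that pair (like partitionMovementsByTopic)
    private final Map<String, Map<String, Set<String>>> byTopic = new HashMap<>();

    private static String pair(String src, String dst) {
        return src + "->" + dst;
    }

    // Assumed behavior: chained moves collapse; moving back to the original owner erases the record
    void recordMove(String topic, String partition, String oldConsumer, String newConsumer) {
        String src = oldConsumer;
        String[] prev = movements.remove(partition);
        if (prev != null) {
            byTopic.get(topic).get(pair(prev[0], prev[1])).remove(partition);
            src = prev[0];
            if (src.equals(newConsumer))
                return; // back home: no net movement
        }
        movements.put(partition, new String[] {src, newConsumer});
        byTopic.computeIfAbsent(topic, t -> new HashMap<>())
               .computeIfAbsent(pair(src, newConsumer), k -> new LinkedHashSet<>())
               .add(partition);
    }

    String actualPartitionToMove(String topic, String partition, String oldConsumer, String newConsumer) {
        if (!byTopic.containsKey(topic))
            return partition;                 // nothing in this topic has moved yet
        String[] prev = movements.get(partition);
        if (prev != null)
            oldConsumer = prev[0];            // look past the earlier hop to the original owner
        Set<String> reverse = byTopic.get(topic).get(pair(newConsumer, oldConsumer));
        if (reverse == null || reverse.isEmpty())
            return partition;
        return reverse.iterator().next();     // undo an earlier newConsumer -> oldConsumer move instead
    }
}
```

With partition1 previously moved B->A, a request to move partition0 from A to B returns partition1, as in the comment's example; once partition1 has moved back to B, the same request returns partition0 again.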

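This article walked through the source of Kafka's three consumer partition assignment strategies. Neither RangeAssignor nor RoundRobinAssignor guarantees a fully balanced assignment. StickyAssignor is considerably more complex, but compared with the other two it balances partitions better and avoids unnecessary partition movements.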
