Client-side partition assignment in Kafka

Donghyung Ko
Apr 14, 2024

While examining the Kafka client library code, I noticed the ConsumerPartitionAssignor interface and several of its implementations (RangeAssignor, CooperativeStickyAssignor), which decide which partitions each consumer is assigned when a consumer group rebalances. Distributing partitions based on the set of connected consumers seems like a job for the server side (that is, the broker), so it felt unusual to find this logic in the client library. On further investigation, I found a design document explaining why Kafka adopted this structure.

The Need for Client-side Partition Assignment Policies

Partition assignment does not have to be handled by the server. If the broker were responsible for partition assignment, applying a new assignment policy would require restarting the brokers. Assignment policies can also be highly diverse, which makes it difficult to validate them against a single set of rules; some policies, for example, need to assign the same partition to several consumers at once. Representative cases that require custom partition assignment policies include:

  • Co-partitioning: When performing topic joins, specific partitions from different topics might need to be assigned to the same consumer.
  • Sticky partitioning: Stateful consumers may need to minimize partition reassignment during partition rebalancing.
  • Redundant partitioning: The same partition might need to be assigned to multiple consumers simultaneously, for tasks like creating a search index.
  • Metadata-based assignment: Partitions might need to be assigned based on consumer metadata, such as rack location (rack-awareness) or hardware specifications.
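The co-partitioning case can be illustrated with a simplified version of the range strategy: for each topic, consumers are sorted and each one receives a contiguous range of partition numbers, so topics with the same partition count place partition N on the same consumer. This is a dependency-free sketch, not Kafka's RangeAssignor; the names RangeAssignmentSketch and assign are hypothetical, and it assumes every consumer subscribes to every topic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified range-style assignor (not Kafka's API).
// Assumption: every consumer subscribes to every topic.
class RangeAssignmentSketch {

    /** Returns consumer ID -> list of "topic-partition" labels. */
    static Map<String, List<String>> assign(List<String> consumers,
                                            Map<String, Integer> partitionsPerTopic) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        if (consumers.isEmpty()) {
            return result;
        }
        // Sorting gives every topic the same consumer order, which is what
        // lines up partition N of different topics on the same consumer.
        List<String> sorted = new ArrayList<>(consumers);
        Collections.sort(sorted);
        for (String consumer : sorted) {
            result.put(consumer, new ArrayList<>());
        }
        // Visit topics in name order so the output is deterministic.
        for (Map.Entry<String, Integer> entry : new TreeMap<>(partitionsPerTopic).entrySet()) {
            String topic = entry.getKey();
            int numPartitions = entry.getValue();
            int perConsumer = numPartitions / sorted.size();
            int extra = numPartitions % sorted.size();
            for (int i = 0; i < sorted.size(); i++) {
                // The first `extra` consumers each take one additional partition.
                int start = i * perConsumer + Math.min(i, extra);
                int length = perConsumer + (i < extra ? 1 : 0);
                for (int p = start; p < start + length; p++) {
                    result.get(sorted.get(i)).add(topic + "-" + p);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> assignment = assign(
                List.of("consumer-b", "consumer-a"),
                Map.of("orders", 3, "payments", 3));
        assignment.forEach((consumer, parts) -> System.out.println(consumer + " -> " + parts));
    }
}
```

With two consumers and two three-partition topics, consumer-a receives partitions 0 and 1 of both topics and consumer-b receives partition 2 of both, so a join on the partition key can be performed locally by each consumer.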

Client-side Partition Assignment Protocol

To address these requirements, the group membership protocol was extended so that partition assignment is performed on the client side. Consumer group rebalancing proceeds in the following steps.

JoinGroupRequest

In this step, consumers register themselves with a consumer group. The coordinator broker collects JoinGroupRequests from members during a waiting period and then selects one consumer as the group leader (in practice, the first member to join). The leader is responsible for distributing partitions among the consumers. Through the JoinGroupResponse, each participating consumer learns whether it has been elected leader; only the leader's response also carries the metadata of every member. During this step, the coordinator also decides which partition assignment strategy the group will use. Each consumer lists the strategies it supports in its JoinGroupRequest, and the coordinator picks one that every member supports, which prevents inconsistencies caused by client implementations holding different assignment policies.
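The join phase can be modeled in a few lines: the coordinator keeps only the strategies every member supports, each member votes for its most-preferred remaining strategy, and the most-voted strategy wins. This is a simplified, hypothetical model (JoinGroupSketch, JoinRequest, and JoinOutcome are illustrative names, not Kafka classes), and it assumes the first member to join becomes leader. The strategy labels "range", "cooperative-sticky", and "roundrobin" are used here only as example values.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the coordinator's JoinGroup phase (not Kafka's API).
class JoinGroupSketch {

    // One JoinGroupRequest: a member ID plus supported strategies in preference order.
    record JoinRequest(String memberId, List<String> strategies) {}

    // What the coordinator decides once the join phase completes.
    record JoinOutcome(String leaderId, String strategy) {}

    static JoinOutcome completeJoin(List<JoinRequest> requests) {
        if (requests.isEmpty()) {
            throw new IllegalArgumentException("no members joined");
        }
        // Assumption: the first member to join is chosen as leader.
        String leader = requests.get(0).memberId();

        // Candidates are the strategies supported by every member.
        Set<String> candidates = new LinkedHashSet<>(requests.get(0).strategies());
        for (JoinRequest r : requests) {
            candidates.retainAll(r.strategies());
        }
        if (candidates.isEmpty()) {
            throw new IllegalStateException("members share no assignment strategy");
        }

        // Each member votes for its most-preferred candidate; the most votes wins.
        Map<String, Integer> votes = new LinkedHashMap<>();
        for (JoinRequest r : requests) {
            for (String s : r.strategies()) {
                if (candidates.contains(s)) {
                    votes.merge(s, 1, Integer::sum);
                    break;
                }
            }
        }
        String chosen = Collections.max(votes.entrySet(), Map.Entry.comparingByValue()).getKey();
        return new JoinOutcome(leader, chosen);
    }

    public static void main(String[] args) {
        JoinOutcome outcome = completeJoin(List.of(
                new JoinRequest("m1", List.of("cooperative-sticky", "range")),
                new JoinRequest("m2", List.of("range", "cooperative-sticky")),
                new JoinRequest("m3", List.of("cooperative-sticky", "range", "roundrobin"))));
        System.out.println(outcome);
    }
}
```

Because "roundrobin" is not supported by every member, it is never a candidate, even though m3 lists it; this is the property that keeps members with different client configurations from running incompatible policies.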

SyncGroupRequest

In this step, the leader distributes partitions to the members and the group synchronizes on the result. Every consumer sends a SyncGroupRequest to the coordinator, but only the leader's request carries the assignment it computed. The coordinator takes the leader's assignment and returns each member its share: the MemberState field in the SyncGroupResponse contains the partitions assigned to that member.
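The coordinator's role in this step can be sketched as a small relay: it stores the leader's assignment and answers each member with only its own entry. This is a hypothetical model (SyncGroupSketch and sync are illustrative names); a real coordinator holds a follower's request open until the leader's assignment arrives, whereas this sketch simply returns an empty Optional in that case.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical model of the coordinator's SyncGroup handling (not Kafka's API).
class SyncGroupSketch {
    private final String leaderId;
    // Leader's assignment (member ID -> partitions); null until the leader has synced.
    private Map<String, List<String>> assignments;

    SyncGroupSketch(String leaderId) {
        this.leaderId = leaderId;
    }

    /** Models one SyncGroupRequest; followers pass an empty map. */
    Optional<List<String>> sync(String memberId, Map<String, List<String>> groupAssignment) {
        if (memberId.equals(leaderId)) {
            // Only the leader's request carries the computed assignment.
            assignments = Map.copyOf(groupAssignment);
        }
        if (assignments == null) {
            // A real coordinator would park this request until the leader syncs.
            return Optional.empty();
        }
        // Each member receives only its own share of the assignment.
        return Optional.of(assignments.getOrDefault(memberId, List.of()));
    }

    public static void main(String[] args) {
        SyncGroupSketch coordinator = new SyncGroupSketch("m1");
        System.out.println(coordinator.sync("m2", Map.of()));
        System.out.println(coordinator.sync("m1",
                Map.of("m1", List.of("orders-0"), "m2", List.of("orders-1"))));
        System.out.println(coordinator.sync("m2", Map.of()));
    }
}
```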

Coordination State Machine

The coordinator broker manages a state machine to control consumer group rebalancing.

  • Down: The consumer group has no members yet.
  • Initialize: The coordinator is loading the group's state from ZooKeeper.
  • Stable: The coordinator holds an active generation or, if there is no active generation, is awaiting a new JoinGroupRequest.
  • Joining: The coordinator is receiving JoinGroupRequests from members to create a new generation.
  • AwaitingSync: After Joining completes, the coordinator waits until it receives the SyncGroupRequest from the leader consumer.
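The states above can be expressed as an enum plus a table of permitted transitions. The transitions in this sketch are an assumption for illustration (a simplified reading of the state descriptions, not a transcription of the coordinator's code), and GroupStateMachine is a hypothetical name: a rebalance moves Stable to Joining to AwaitingSync and back to Stable, and a membership change during AwaitingSync restarts Joining.

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical coordinator state machine; the transition table is an assumed simplification.
class GroupStateMachine {

    enum State { DOWN, INITIALIZE, STABLE, JOINING, AWAITING_SYNC }

    private static final Map<State, Set<State>> ALLOWED = Map.of(
            State.INITIALIZE, EnumSet.of(State.STABLE),                      // group state loaded
            State.DOWN, EnumSet.of(State.JOINING),                           // first member joins
            State.STABLE, EnumSet.of(State.JOINING, State.DOWN),             // rebalance starts / last member leaves
            State.JOINING, EnumSet.of(State.AWAITING_SYNC, State.DOWN),      // all expected members joined / all left
            State.AWAITING_SYNC, EnumSet.of(State.STABLE, State.JOINING));   // leader synced / membership changed

    private State state = State.INITIALIZE;

    State state() {
        return state;
    }

    void transitionTo(State next) {
        if (!ALLOWED.get(state).contains(next)) {
            throw new IllegalStateException(state + " -> " + next + " is not allowed");
        }
        state = next;
    }

    public static void main(String[] args) {
        GroupStateMachine group = new GroupStateMachine();
        group.transitionTo(State.STABLE);
        group.transitionTo(State.JOINING);        // a JoinGroupRequest triggers a rebalance
        group.transitionTo(State.AWAITING_SYNC);  // every expected member has joined
        group.transitionTo(State.STABLE);         // the leader's SyncGroupRequest arrived
        System.out.println(group.state());
    }
}
```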

Implementation

Client-side partition assignment policies live in the Kafka client library. In Java, the Apache Kafka client library (kafka-clients) implements this protocol in classes such as ConsumerPartitionAssignor, RangeAssignor, CooperativeStickyAssignor, and ConsumerCoordinator. In kafka-clients 3.4.1, the default value of partition.assignment.strategy lists RangeAssignor followed by CooperativeStickyAssignor, so RangeAssignor is used unless the consumer is configured otherwise. RangeAssignor relies on the eager rebalance protocol, in which every consumer revokes all of its partitions before the group rejoins, causing so-called stop-the-world pauses. CooperativeStickyAssignor uses the incremental cooperative protocol: consumers keep the partitions that are not moving and continue processing them during a rebalance, which mitigates these pauses.
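The difference behind the stop-the-world behavior can be sketched by comparing which partitions a consumer gives up during a rebalance. Under the eager protocol, a consumer revokes every partition it owns; under the cooperative protocol, it revokes only the partitions that are moving to another member. The class and method names here are hypothetical and the logic is a simplification; on a real consumer, the cooperative behavior is selected by setting partition.assignment.strategy to org.apache.kafka.clients.consumer.CooperativeStickyAssignor.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical comparison of revocation under eager vs. cooperative rebalancing.
class RevocationSketch {

    /** Eager: everything owned is revoked before rejoining; the new assignment is not consulted. */
    static Set<String> eagerRevoked(Set<String> owned, Set<String> newAssignment) {
        return new TreeSet<>(owned);
    }

    /** Cooperative: after the new assignment is known, revoke only partitions that moved away. */
    static Set<String> cooperativeRevoked(Set<String> owned, Set<String> newAssignment) {
        Set<String> revoked = new TreeSet<>(owned);
        revoked.removeAll(newAssignment);
        return revoked;
    }

    public static void main(String[] args) {
        Set<String> owned = Set.of("orders-0", "orders-1", "orders-2");
        // Assume a new member joined and orders-2 is being moved to it.
        Set<String> next = Set.of("orders-0", "orders-1");
        System.out.println("eager:       " + eagerRevoked(owned, next));
        System.out.println("cooperative: " + cooperativeRevoked(owned, next));
    }
}
```

In the eager case, the consumer stops processing all three partitions even though two of them end up back with it; in the cooperative case, only orders-2 pauses while it moves.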
