golang源码分析:sarama kafka client(part III:client的角色) - 墨天轮
理解client的角色对我们理解kafka和sarama非常有帮助。下面将一一详细介绍:
我们用到了各种各样的client,返回的对象都是一个Broker的指针,本质上讲,我们通过kafka client 最终都是和broker通信,所以用Broker对象封装和kafka的连接,表示Client。不同场景下,Client有不同的角色,角色是通过元数据来确定的。
func (client *client) InSyncReplicas(topic string, partitionID int32) ([]int32, error)
metadata := client.cachedMetadata(topic, partitionID)
err := client.RefreshMetadata(topic)
func (client *client) RefreshMetadata(topics ...string) error
return client.tryRefreshMetadata(topics, client.conf.Metadata.Retry.Max, deadline)
func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, deadline time.Time) error
broker := client.any()
response, err := broker.GetMetadata(req)
err := b.sendAndReceive(request, response)
shouldRetry, err := client.updateMetadata(response, allKnownMetaData)
client.deregisterBroker(broker)
_ = broker.Close()
可以看到,获取元数据的过程是随机选择一个broker(没有元数据不知道身份),然后获取元数据,存储下来。
1,Controller
首先是Controller,Kafka集群中,首先会选举出一个broker作为controller,然后该controller负责跟其他broker进行协调topic创建,partition主副本选举,topic删除等事务
func (client *client) Controller() (*Broker, error){
client.refreshMetadata()
controller := client.cachedController()
_ = controller.Open(client.conf)
}
func (client *client) cachedController() *Broker {
return client.brokers[client.controllerID]
}
通过元数据,获取controllerId,然后通过controllerID找到Controller,和前面介绍的一样,topic和partation相关的增删操作会用到controller,主要在admin.go中
func NewClusterAdminFromClient(client Client) (ClusterAdmin, error)
_, err := client.Controller()
func (ca *clusterAdmin) Controller() (*Broker, error)
return ca.client.Controller()
func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error
b, err := ca.Controller()
func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetadata, err error)
controller, err := ca.Controller()
func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error
b, err := ca.Controller()
func (ca *clusterAdmin) AlterPartitionReassignments(topic string, assignment [][]int32) error
b, err := ca.Controller()
2,Coordinator
每个KafkaServer都有一个GroupCoordinator实例,管理多个消费者组,主要用于offset位移管理和Consumer Rebalance。
func (client *client) Coordinator(consumerGroup string) (*Broker, error)
_ = coordinator.Open(client.conf)
获取元数据后,就会获取Coordinator
func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemaining int) (*FindCoordinatorResponse, error)
broker := client.any();
request := new(FindCoordinatorRequest)
request.CoordinatorKey = consumerGroup
request.CoordinatorType = CoordinatorGroup
response, err := broker.FindCoordinator(request)
调整消费组的地方会用到,admin.go
func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*GroupDescription, err error)
controller, err := ca.client.Coordinator(group)
response, err := broker.DescribeGroups(&DescribeGroupsRequest{
Groups: brokerGroups,
})
func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error)
return coordinator.FetchOffset(request)
func (ca *clusterAdmin) DeleteConsumerGroup(group string) error
coordinator, err := ca.client.Coordinator(group)
resp, err := coordinator.DeleteGroups(request)
创建session的时候也会用到consumer_group.go
func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {
coordinator, err := c.client.Coordinator(c.groupID)
return c.newSession(ctx, topics, handler, retries)
}
离开也会用到
func (c *consumerGroup) leave() error {
c.lock.Lock()
defer c.lock.Unlock()
if c.memberID == "" {
return nil
}
coordinator, err := c.client.Coordinator(c.groupID)
心跳维持
func (s *consumerGroupSession) heartbeatLoop() {
for {
coordinator, err := s.parent.client.Coordinator(s.parent.groupID)
}
}
3,any 任意broker
func (client *client) any() *Broker
_ = client.seedBrokers[0].Open(client.conf)
_ = broker.Open(client.conf)
4,Leader
写相关的操作都是先写到leader partation,然后同步到副本分区。
func (client *client) Leader(topic string, partitionID int32) (*Broker, error)
leader, err := client.cachedLeader(topic, partitionID)
func (client *client) cachedLeader(topic string, partitionID int32) (*Broker, error)
_ = b.Open(client.conf)
使用的地方如下:
admin.go
func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error
async_producer.go
func (pp *partitionProducer) dispatch()
func (pp *partitionProducer) updateLeader() error
func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitionSet, kerr KError)
consumer.go
func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error)
child.broker = c.refBrokerConsumer(leader)
bc = c.newBrokerConsumer(broker)
func (child *partitionConsumer) preferredBroker() (*Broker, error)
上面就是client的四个角色和应用场景,理解他们的含义对理解kafka和sarama有重要的意义。