Question:

Kafka consumer group ID sharing issue

林泰平
2023-03-14

I have two Kafka consumers that share the same consumer group ID but subscribe to different topics. Each of them is only allowed to read from its own topic.

When the first consumer starts, it is assigned partitions from the topic it subscribes to. When the second consumer starts as well, the consumer group rebalances (causing the partitions assigned to the first consumer to be revoked). So far, so good. This is consistent with the discussion in "Kafka consumer group id and consumer rebalance issue".

However, I then start seeing TOPIC_AUTHORIZATION_FAILED in consumer 1: apparently it tries to access the other topic, which it neither subscribes to nor is authorized for. From this point on, the consumer makes no progress; it just keeps retrying.

I expected that after the rebalance, consumer 1 would be reassigned partitions from the topic it subscribes to and carry on. Why does consumer 1 try to access the other topic, and how can I fix this?
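For completeness, here is a minimal sketch of the setup as I understand it, using the plain Java KafkaConsumer. The class name, bootstrap address, and serializers are placeholders; the real application uses reactor-kafka, as the logs below show.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class SharedGroupRepro {

        static KafkaConsumer<String, String> newConsumer() {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "stream"); // shared by both consumers
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            return new KafkaConsumer<>(props);
        }

        public static void main(String[] args) {
            // Consumer 1 subscribes only to common-request-test.
            // Consumer 2 runs as a separate process with the same group id
            // but subscribes only to common-request-dev; when it joins,
            // the whole group rebalances.
            KafkaConsumer<String, String> consumer = newConsumer();
            consumer.subscribe(Collections.singletonList("common-request-test"));
            while (true) {
                consumer.poll(Duration.ofSeconds(1)).forEach(record ->
                        System.out.printf("%s-%d: %s%n",
                                record.topic(), record.partition(), record.value()));
            }
        }
    }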

Logs

common-request-test: consumer 1's topic

common-request-dev: consumer 2's topic

Below is the consumer 1 log (note the last few lines, where it tries to access common-request-dev) -

{"@timestamp":"2021-10-22T07:39:17.550Z","message":"onPartitionsAssigned: [common-request-test-2, common-request-test-3, common-request-test-0, common-request-test-1]","severity":"INFO","service":"stream","x-correlation-id":"","client-id":"","account-id":"","trace":"0781f628a70f1e76","thread":"reactive-kafka-stream-1","class":"c.m.l.e.s.a.config.KafkaReceiverConfig","ex":""}

{"@timestamp":"2021-10-22T07:39:17.853Z","message":"[Consumer clientId=consumer-stream-1, groupId=stream] Found no committed offset for partition common-request-test-2","severity":"INFO","service":"stream","x-correlation-id":"","client-id":"","account-id":"","trace":"0781f628a70f1e76","thread":"reactive-kafka-stream-1","class":"o.a.k.c.c.internals.ConsumerCoordinator","ex":""}
{"@timestamp":"2021-10-22T07:39:17.857Z","message":"[Consumer clientId=consumer-stream-1, groupId=stream] Setting offset for partition common-request-test-3 to the committed offset FetchPosition{offset=5, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka1-devtest1:9093 (id: 0 rack: null)], epoch=0}}","severity":"INFO","service":"stream","x-correlation-id":"","client-id":"","account-id":"","trace":"0781f628a70f1e76","thread":"reactive-kafka-stream-1","class":"o.a.k.c.c.internals.ConsumerCoordinator","ex":""}
{"@timestamp":"2021-10-22T07:39:17.858Z","message":"[Consumer clientId=consumer-stream-1, groupId=stream] Setting offset for partition common-request-test-0 to the committed offset FetchPosition{offset=4, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka1-devtest1:9093 (id: 0 rack: null)], epoch=0}}","severity":"INFO","service":"stream","x-correlation-id":"","client-id":"","account-id":"","trace":"0781f628a70f1e76","thread":"reactive-kafka-stream-1","class":"o.a.k.c.c.internals.ConsumerCoordinator","ex":""}
{"@timestamp":"2021-10-22T07:39:17.858Z","message":"[Consumer clientId=consumer-stream-1, groupId=stream] Setting offset for partition common-request-test-1 to the committed offset FetchPosition{offset=4, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka2-devtest1:9093 (id: 1 rack: null)], epoch=0}}","severity":"INFO","service":"stream","x-correlation-id":"","client-id":"","account-id":"","trace":"0781f628a70f1e76","thread":"reactive-kafka-stream-1","class":"o.a.k.c.c.internals.ConsumerCoordinator","ex":""}
{"@timestamp":"2021-10-22T07:39:19.382Z","message":"[Consumer clientId=consumer-stream-1, groupId=stream] Resetting offset for partition common-request-test-2 to offset 0.","severity":"INFO","service":"stream","x-correlation-id":"","client-id":"","account-id":"","trace":"0781f628a70f1e76","thread":"reactive-kafka-stream-1","class":"o.a.k.c.c.internals.SubscriptionState","ex":""}

{"@timestamp":"2021-10-22T07:43:21.598Z","message":"[Consumer clientId=consumer-stream-1, groupId=stream] Attempt to heartbeat failed since group is rebalancing","severity":"INFO","service":"stream","x-correlation-id":"","client-id":"","account-id":"","trace":"0781f628a70f1e76","thread":"reactive-kafka-stream-1","class":"o.a.k.c.c.internals.AbstractCoordinator","ex":""}
{"@timestamp":"2021-10-22T07:43:21.599Z","message":"[Consumer clientId=consumer-stream-1, groupId=stream] Revoke previously assigned partitions common-request-test-2, common-request-test-3, common-request-test-0, common-request-test-1","severity":"INFO","service":"stream","x-correlation-id":"","client-id":"","account-id":"","trace":"0781f628a70f1e76","thread":"reactive-kafka-stream-1","class":"o.a.k.c.c.internals.ConsumerCoordinator","ex":""}

{"@timestamp":"2021-10-22T07:43:21.599Z","message":"onPartitionsRevoked: [common-request-test-2, common-request-test-3, common-request-test-0, common-request-test-1]","severity":"INFO","service":"stream","x-correlation-id":"","client-id":"","account-id":"","trace":"0781f628a70f1e76","thread":"reactive-kafka-stream-1","class":"c.m.l.e.s.a.config.KafkaReceiverConfig","ex":""}

{"@timestamp":"2021-10-22T07:43:21.599Z","message":"[Consumer clientId=consumer-stream-1, groupId=stream] (Re-)joining group","severity":"INFO","service":"stream","x-correlation-id":"","client-id":"","account-id":"","trace":"0781f628a70f1e76","thread":"reactive-kafka-stream-1","class":"o.a.k.c.c.internals.AbstractCoordinator","ex":""}
{"@timestamp":"2021-10-22T07:43:22.174Z","message":"[Consumer clientId=consumer-stream-1, groupId=stream] Error while fetching metadata with correlation id 920 : {common-request-dev=TOPIC_AUTHORIZATION_FAILED}","severity":"WARN","service":"stream","x-correlation-id":"","client-id":"","account-id":"","trace":"0781f628a70f1e76","thread":"reactive-kafka-stream-1","class":"org.apache.kafka.clients.NetworkClient","ex":""}

{"@timestamp":"2021-10-22T07:43:22.174Z","message":"[Consumer clientId=consumer-stream-1, groupId=stream] Topic authorization failed for topics [common-request-dev]","severity":"ERROR","service":"stream","x-correlation-id":"","client-id":"","account-id":"","trace":"0781f628a70f1e76","thread":"reactive-kafka-stream-1","class":"org.apache.kafka.clients.Metadata","ex":""}

{"@timestamp":"2021-10-22T07:43:22.175Z","message":"[Consumer clientId=consumer-stream-1, groupId=stream] Join group failed with org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [common-request-dev]","severity":"INFO","service":"stream","x-correlation-id":"","client-id":"","account-id":"","trace":"0781f628a70f1e76","thread":"reactive-kafka-stream-1","class":"o.a.k.c.c.internals.AbstractCoordinator","ex":""}

1 Answer

徐奇逸
2023-03-14

It sounds like you want to try configuring the StickyAssignor for your consumers, or use assign rather than subscribe (or talk to whoever maintains the ACL policies and tell them what access you need).
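For the first suggestion, a minimal sketch of the consumer configuration, assuming the plain Java client (the class name and bootstrap address are placeholders; note that every member of the group has to be configured with the same strategy):

    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.StickyAssignor;

    public class StickyConfig {
        static Properties consumerProps() {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "stream");
            // StickyAssignor tries to hand each member back the partitions it held
            // before a rebalance, keeping assignments as stable as possible.
            props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                    StickyAssignor.class.getName());
            return props;
        }
    }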

Otherwise, yes, rebalancing happens over the whole group's set of topics. You are subscribing the group to a topic, not an individual consumer.
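If you take the assign route instead, a sketch along these lines (class name and bootstrap address are placeholders) pins consumer 1 to the partitions of its own topic. assign() bypasses the group's rebalance protocol entirely, so this member never needs metadata for the other member's topic; the group id is then only used for offset commits.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class AssignInsteadOfSubscribe {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "stream"); // used only for offset commits here
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                // Look up the partitions of this consumer's own topic and
                // assign them manually; no group rebalance is involved.
                List<TopicPartition> partitions = new ArrayList<>();
                for (PartitionInfo pi : consumer.partitionsFor("common-request-test")) {
                    partitions.add(new TopicPartition(pi.topic(), pi.partition()));
                }
                consumer.assign(partitions);
                // poll() as usual from here on.
            }
        }
    }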
