当前位置: 首页 > 知识库问答 >
问题:

同一组下不同分区上的Kafka消费者仍然间歇地消费相同的消息

苏涛
2023-03-14

我有1个消费者群体和5个消费者。也有5个分区,因此每个消费者得到1个分区。

CLI还显示

bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic Topic-1
Topic: Topic-1  TopicId: kJqfk1FoRSWtkkjfsgw9FSg    PartitionCount: 5   ReplicationFactor: 1    Configs: segment.bytes=1073741824
    Topic: Topic-1  Partition: 0    Leader: 0   Replicas: 0 Isr: 0
    Topic: Topic-1  Partition: 1    Leader: 0   Replicas: 0 Isr: 0
    Topic: Topic-1  Partition: 2    Leader: 0   Replicas: 0 Isr: 0
    Topic: Topic-1  Partition: 3    Leader: 0   Replicas: 0 Isr: 0
    Topic: Topic-1  Partition: 4    Leader: 0   Replicas: 0 Isr: 0

bin/Kafka-console-consumer . sh-bootstrap-server localhost:9092-Topic Topic-1-from-beginning-partition { n }正确显示每个分区的不同消息。

然而,我经常看到两个或两个以上的消费者在处理同一条信息,而且对于Kafka来说是新手,我真的无法解决这个问题。

我正在使用皮Kafka来消费消息:

class CB_Kafka_Consumer:
    def __init__(self):
        self._connect_kafka_consumer()
        module_logger.info(f"Kafka Consumer at {kafka_broker_url}:{kafka_broker_port}")
        ''' Get DB session object '''
        self.session = get_db_session(SQL_USERNAME, SQL_PASSWORD, SQL_SERVER_IP, SQL_DATABASE)
        module_logger.info(f"Connected to MySQL at {SQL_SERVER_IP}")

    def _connect_kafka_consumer(self):
        self._consumer = None
        try:
            self._client = KafkaClient(f"{kafka_broker_url}:{kafka_broker_port}")
            topic = self._client.topics[kafka_topic]
            self._consumer = topic.get_simple_consumer(consumer_group=CONSUMER_GROUP_NAME)

            module_logger.info("Created a Kafka Consumer")
        except Exception as ex:
            module_logger.error('Exception while connecting Kafka')
            traceback.print_exc()

    def start_consuming(self):
        module_logger.info("*"*10 + " Staring to Consume Messages " + "*"*10)
        while True:
            for msg in self._consumer:
                self._consumer.commit_offsets()
                message = json.loads(msg.value.decode('utf-8'))
                module_logger.debug(f"\n----- RECEIVED MESSAGE ----------")
                module_logger.debug(pprint.pformat(message))
                self.process_message(message) #Logic for processing messages (takes anywhere between 10min to 4 hours for the task to complete)
        self._consumer.close()

共有1个答案

庾兴发
2023-03-14

打印消息的分区和偏移量。你应该看到,事实上,它们是你正在处理的独特事件。

如果这些是相同的,则“10分钟到4小时”过程很可能导致消费者组重新平衡(默认情况下,Kafka要求您每隔几毫秒调用一次记录轮询),并且您正在经历至少一次处理语义,因此需要自己处理重复项。

我看到您在代码中使用了一些数据库客户端,因此建议您使用Kafka Connect框架,而不是编写自己的消费者

 类似资料:
  • 根据Kafka的文件: kafka保证主题分区只分配给组中的一个消费者。 但我在服务中观察到了不同的行为。以下是一些细节: 我用的是Kafka2.8和SpringKafka2.2.13。 最初我有一个Kafka主题包含5个分区,这个主题在我的服务中使用了Spring和ConcurrentKafkAlisterContainerFactory中的注释,并发性=5。这个配置对我来说很好。 后来,我开始

  • 我是Kafka的新手,我有一个使用Java Apache Camel库实现的Kafka消费者。我发现的问题是-消费者花了很长的时间(>15分钟)来处理很少的消息-这对于我们的用例来说是很好的。 需要一些配置帮助,因为相同的消息会在15分钟后重新发送,如果在15分钟内没有处理(我相信线程控制不会返回)。我想这可能是默认间隔,不确定这是哪一个属性。 那么,我必须在哪里修复配置 生产者级别,以便它不重新

  • 由于消息需求的排序,我们有一个主题和一个分区。我们有两个消费者运行在不同的服务器上,具有相同的配置集,即groupId、consumerId和consumerGroup。即 1主题- 当我们部署消费者时,相同的代码会部署在两台服务器上。当消息到来时,我们会注意到两个消费者都在消费消息,而不是只有一个处理。让消费者在两台独立的服务器上运行的原因是,如果一台服务器崩溃,至少其他服务器可以继续处理消息。

  • 我们运行一个集群工作线程应用程序,该应用程序依赖于 Kafka 使用高级消费者 API 使用消息。群集中的所有节点共享同一个使用者组。现在我们想要的是将该逻辑的一部分迁移到 Kafka 流处理器 API。这里的方法是什么?如果分配了相同的 groupId/clientId,流拓扑是否会与现有使用者就消息进行斗争?我们应该分配不同的 groupId/clientId 吗?流式传输拓扑?说“组”。 “

  • 我对Kafka有一个概念上的问题。 我们有许多机器在一个主题上充当消费者,有许多分区。这些机器运行在不同的硬件设置上,将会有比其他机器具有更高吞吐量的用户。 现在,使用者和一个或多个分区之间存在直接的相关性。

  • 我有4个分区和4个消费者(例如A、B、C、D)。如何使用使用者组配置哪个使用者将从哪个分区读取数据。我用的是Kafka的春靴。