当前位置: 首页 > 知识库问答 >
问题:

在Kafka消费者中,什么会导致“未能按时间获得补偿”?

刘弘新
2023-03-14

我有一个Kafka消费者。好像工作了一段时间,然后就死了。它重复这样做。我得到了这个异常,但没有其他信息。

org.apache.kafka.common.errors.TimeoutException:
Failed to get offsets by times in 305000 ms

305000毫秒是5分钟。有什么线索可能导致这种情况吗?或者尝试找出答案的步骤?

如果相关:

我在不同的机器上有3个进程,使用最新的JavaKafka客户端版本0.10.2.0。每台机器运行20个线程,每个线程都有一个单独的消费者。根据设计,当一个线程死亡时,所有线程都被杀死,进程死亡,然后重新启动。这导致大约20个消费者同时死亡和重新启动,这将导致重新平衡。所以这可能会导致客户端之间的周期性干扰。然而,这并不能解释为什么我首先会出现这个异常。

我有三台Kafka机器和三台Zookeeper机器。每个客户机的<code>引导程序中都有3台Kafka机器。服务器配置。该主题有200个分区,这意味着每个线程分配了大约3个分区。该主题的复制因子为2。

Kafka或动物园管理员日志中没有错误。

设置了以下配置值,没有其他配置值。

  • 引导程序。服务器
  • 小组。id
  • 键。反序列化程序
  • 值分解器

共有1个答案

宋经赋
2023-03-14

我今天碰到了这个。根据我使用的是Kafka 1.0客户端库还是Kafka 2.0客户端库,我看到了该错误消息的两个不同版本。对于Kafka 1.0客户端,错误消息为< code > " org . Apache . Kafka . common . errors . time out exception:未能在305000毫秒内按时间获取偏移量" ;对于2.0客户端库,错误消息为< code > " org . Apache . Kafka . common . errors . time out exception:未能在30003毫秒内按时间获取偏移量" 。

我在尝试使用kafka-console-consumer命令(例如< code > Kafka-consumer-groups-bootstrap-server { servers }-group { group }-describe )监控偏移/延迟时收到了此消息。这些命令是kafka/confluent工具的一部分,但我想这也可能发生在其他客户端上。

问题似乎是,我有一个复制因子为1的主题,它的分区没有指定的领导者。我找到这一点的唯一方法是更新<code>{kafka_client_dir}\libexec\config\tools-log4j。要在DEBUG级别记录的properties</code>文件:<code>log4j。rootLogger=DEBUG,stderr请注意,这是kafka/confluent工具的log4j配置文件-其他客户端的YMMV。我在Mac上运行它们。

完成后,我在输出中看到以下消息,提醒我注意ISR/offlineReplicas问题:

             [2019-01-28 11:41:54,290] DEBUG Updated cluster metadata version 2 to Cluster(id = 0B1zi_bbQVyrfKwqiDa2kw, 
    nodes = [
        brokerServer3:9092 (id: 3 rack: null), 
        brokerServer6:9092 (id: 6 rack: null), 
        brokerServer1:9092 (id: 1 rack: null), 
        brokerServer5:9092 (id: 5 rack: null), 
        brokerServer4:9092 (id: 4 rack: null)], partitions = [

            Partition(topic = myTopicWithReplicatinFactorOne, partition = 10, leader = 6, replicas = [6], isr = [6], offlineReplicas = []), 
            Partition(topic = myTopicWithReplicatinFactorOne, partition = 11, leader = 1, replicas = [1], isr = [1], offlineReplicas = []), 
            Partition(topic = myTopicWithReplicatinFactorOne, partition = 12, leader = none, replicas = [2], isr = [], offlineReplicas = [2]), 
            Partition(topic = myTopicWithReplicatinFactorOne, partition = 13, leader = 3, replicas = [3], isr = [3], offlineReplicas = []), 
            Partition(topic = myTopicWithReplicatinFactorOne, partition = 14, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), 
            Partition(topic = myTopicWithReplicatinFactorOne, partition = 2, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), 
            Partition(topic = myTopicWithReplicatinFactorOne, partition = 3, leader = 5, replicas = [5], isr = [5], offlineReplicas = []), 
            Partition(topic = myTopicWithReplicatinFactorOne, partition = 4, leader = 6, replicas = [6], isr = [6], offlineReplicas = []), 
            Partition(topic = myTopicWithReplicatinFactorOne, partition = 5, leader = 1, replicas = [1], isr = [1], offlineReplicas = []), 
            Partition(topic = myTopicWithReplicatinFactorOne, partition = 6, leader = none, replicas = [2], isr = [], offlineReplicas = [2]), 
            Partition(topic = myTopicWithReplicatinFactorOne, partition = 7, leader = 3, replicas = [3], isr = [3], offlineReplicas = []), 
            Partition(topic = myTopicWithReplicatinFactorOne, partition = 8, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), 
            Partition(topic = myTopicWithReplicatinFactorOne, partition = 9, leader = 5, replicas = [5], isr = [5], offlineReplicas = []), 
            Partition(topic = myTopicWithReplicatinFactorOne, partition = 0, leader = none, replicas = [2], isr = [], offlineReplicas = [2]), 
            Partition(topic = myTopicWithReplicatinFactorOne, partition = 1, leader = 3, replicas = [3], isr = [3], offlineReplicas = [])
        ], controller = brokerServer4:9092 (id: 4 rack: null)) (org.apache.kafka.clients.Metadata)

您可以在上面看到它说离线Replicas = [2] - 暗示了这个问题。此外,代理服务器 2 不在代理列表中。

最终,我重新启动了受影响的代理 (brokerServer2) 以使其恢复同步,一旦完成此操作,我再次使用命令行工具就没有问题了。可能有比重新启动代理更好的方法来解决此问题,但它最终解决了问题

 类似资料:
  • 我有一个微服务应用程序,它是使用camel kafka开发的,用于消费来自kafka的消息。我用4个并发实例运行这个服务,其中每个实例都有1个消费者,他们消费的主题有20个分区。 在维护期间(每天凌晨1-2点),应用程序将通过停止消费者来暂停消费。 当应用程序暂停消费时,kafka重新平衡发生,一些实例将在停止消费之前分配更多分区。 e、 由instanceA(partition1-5)使用的g分

  • 当我运行这个命令时,我得到了两个主题。我知道我创建了测试主题,但我看到了另一个名为“消费者偏移”的主题。从名称来看,这意味着它与消费者补偿有关,但它是如何使用的?

  • 我在一个线程中创建了一个Kafka consumer实例,作为构造函数的一部分,在thread inside run方法中,我确实调用了不同的web服务,为了保持调用的非阻塞性,我正在使用completable future。我的问题是,我无法通过调用thenApply方法并传递Kafka consumer实例来发出commit,因为这会给我一个错误,即Kafka consumer不是线程安全的。

  • 本文向大家介绍什么是kafka消费者组?相关面试题,主要包含被问及什么是kafka消费者组?时的应答技巧和注意事项,需要的朋友参考一下 答:消费者组的概念是Apache Kafka独有的。基本上,每个Kafka消费群体都由一个或多个共同消费一组订阅主题的消费者组成。

  • Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka

  • 我试图构建一个简单的spring boot Kafka Consumer来消费来自Kafka主题的消息,但是由于KafkaListener方法没有被触发,所以没有任何消息被消费。 Java类: start.java: kafkaConsumerConfig.java: