当前位置: 首页 > 知识库问答 >
问题:

不同的主题使用相同的偏移值

朱渝
2023-03-14

我们的拓扑使用KafkaSpout从kafka主题获取消息。我们有约150个主题,包含约12个分区、8个storm执行器和2个storm节点上的任务。Storm版本1.0.5,Kafka经纪人版本10.0.2,Kafka客户端版本0.9.0.1。我们不会删除Kafka主题。

在某个时刻,我在worker中观察到大量重复的警告消息。日志

2018-05-29 14:36:57.928 o.a.s.k.KafkaUtils Thread-15-kafka-spout-executor[18 18][WARN]分区{host1:9092,topic=topic_1,Partition=10}获取了偏移量超出范围的请求:[9248]

2018-05-29 14:36:57.929o. a. s. k.KafkaUtils Thread-23-kafka-spout-执行程序[16 16][WARN]分区{host=host 2:9092,主题=topic_2,分区=0}获取到偏移超出范围的请求:[22650006]

2018-05-29 14:36:57.930 o.a.s.k.KafkaUtils Thread-23-kafka-spout-executor[16][WARN]分区{host=host3:9092,topic=topic_3,Partition=4}获取了偏移量超出范围的请求:[1011584]

2018-05-29 14:36:57.932o. a. s. k.kafkaUtils Thread-7-kafka-spout-执行程序[12 12][WARN]分区{host 1:9092,主题=topic4,分区=4}获取请求,偏移量超出范围:[9266]

2018-05-29 14:36:57.933 o.a.s.k.KafkaUtils Thread-7-kafka-Spoot-executor[12][WARN]分区{host=host2:9092,topic=topic5,Partition=4}获取了偏移量超出范围的请求:[9266]

2018-05-29 14:36:57.935o. a. s. k.KafkaUtils Thread-23-kafka-spout-执行程序[16 16][WARN]分区{host 1:9092,主题=topic6,分区=4}获取请求,偏移量超出范围:[1011584]

2018-05-29 14:36:57.936 o.a.s.k.KafkaUtils Thread-15-kafka-spout-executor[18 18][WARN]分区{host=host2:9092,topic=topic6,Partition=10}获取了偏移量超出范围的请求:[9248]

出于某种原因,相同的常量偏移量值用于不同主题的相同分区。

我启用了DEBUG模式,更精确地观察日志文件。

2018-05-29 14:37:03.573o. a. s. k.[DEBUG]将上次完成的偏移量(1572936)写入到ZK,用于分区{host=host 3:9092,主题=topic1,分区=8}拓扑:拓扑1

2018-05-29 14:37:03.577o. a. s. k.Thread-7-kafka-spout执行程序[12][DEBUG]将上次完成的偏移量(1572936)写入ZK,用于分区{host=host 1:9092,主题=topic2,分区=8}拓扑:拓扑1

2018-05-29 14:37:03.578 o.a.s.k.PartitionManager线程-7-kafka-Spoot-executor[12][DEBUG]将最后完成的偏移量(1572936)写入ZK,用于分区{host=host2:9092,topic=topic3,Partition=8}用于拓扑:topology1

2018-05-29 14:38:07.581 o.a.s.k.PartitionManager线程-23-kafka-spout-executor[16 16][DEBUG]将最后完成的偏移量(61292573)写入ZK,用于分区{host=host1:9092,topic=topic4,Partition=8}用于拓扑:topology1

2018-05-29 14:38:07.582 o.a.s.k.PartitionManager线程-23-kafka-spout-executor[16 16][DEBUG]将最后完成的偏移量(61292573)写入ZK,用于分区{host=host2:9092,topic=topic5,Partition=8}用于拓扑:topology1

2018-05-29 14:38:07.584o. a. s. k.线程23-kafka-spout执行程序[16 16][DEBUG]将上次完成的偏移量(61292573)写入ZK,用于分区{host=host 3:9092,主题=topic6,分区=8}拓扑:拓扑1

我注意到所有主题的一部分被分成了两个独立的组。每组由31个主题组成。每组中的所有主题对每个分区使用相同的偏移量值。然而,该值不是常数,在8个不同的值之间变化。这8个值中的每一个对于小组中的特定主题都是正确的。此外,随着时间的推移,这些价值观都在不断增长,所有的主题都在同步更新。每组的大多数主题(62个主题中的55个)都有相应的“偏移量超出或范围”警告消息,但具有常量值。其他7个主题在没有警告消息的情况下继续正常工作,但它们的偏移量值也在变化。

我浏览了storm kafka的源代码,注意到UseStartOffsetTimeifOffsetAutoFrange标志在我们的例子中不起作用,因为我们没有失败的元组,而且kafka偏移量小于\u emittedToOffset。因此,相同的警告消息会被一次又一次地记录下来。

    } catch (TopicOffsetOutOfRangeException e) {
        offset = KafkaUtils.getOffset(_consumer, _partition.topic, _partition.partition, kafka.api.OffsetRequest.EarliestTime());
        // fetch failed, so don't update the fetch metrics

        //fix bug [STORM-643] : remove outdated failed offsets
        if (!processingNewTuples) {
            // For the case of EarliestTime it would be better to discard
            // all the failed offsets, that are earlier than actual EarliestTime
            // offset, since they are anyway not there.
            // These calls to broker API will be then saved.
            Set<Long> omitted = this._failedMsgRetryManager.clearOffsetsBefore(offset);

            // Omitted messages have not been acked and may be lost
            if (null != omitted) {
                _lostMessageCount.incrBy(omitted.size());
            }

            _pending.headMap(offset).clear();

            LOG.warn("Removing the failed offsets for {} that are out of range: {}", _partition, omitted);
        }

        if (offset > _emittedToOffset) {
            _lostMessageCount.incrBy(offset - _emittedToOffset);
            _emittedToOffset = offset;
            LOG.warn("{} Using new offset: {}", _partition, _emittedToOffset);
        }

        return;
    }

然而,我不明白为什么_emittedToOffset在不同的主题中得到相同的值。你可能知道为什么会发生这种事吗?

共有1个答案

艾成益
2023-03-14

storm kafka源代码中有一个bug,它发生在kafka代理失败时。下面是相应的JIRA票证和带补丁的pull请求。

 类似资料:
  • 为什么实际主题中的偏移值与同一主题中的偏移值不同?PFB偏移位置以及使用的命令。 我错过了什么?

  • 我有一个Kafka的话题,我正在听。然后将消息内容写入websocket通道,在该通道中我有一个订阅了该通道的SockJS客户机。这很管用。然后我创建了一个新的主题,然后添加了第二个KafKalistener。但是,当调用secong侦听器时,我看到它正在尝试处理/读取与第一个KafkaListener和主题相对应的有效负载,由于它没有被配置为这样做,因此会引发一个MessageConversio

  • 我使用的是“亚洲/曼谷”区域id。该偏移是从格林尼治时间UTC 07:00开始的。 但是当我做了下面的事情时,设置为“01/01/1900 7:00:00.000”时不是7:00 结果就是 周一一月一日07:00:00 ICT 1900 -402 周一一月一日06:00:00 ICT 1900 -420 我想知道偏移量是否在1900年1月1日早上7点左右发生了变化,所以我在维基百科上查了一下。ht

  • 我有一个应用程序需要收听多个不同的主题;每个主题都有关于如何处理消息的单独逻辑。我曾想过为每个KafkaStreams实例使用相同的kafka属性,但我得到了如下所示的错误。 错误 流处理应用程序的标识符。在Kafka集群中必须是唯一的。它用作1)默认的客户端ID前缀,2)用于成员资格管理的组ID,3)变更日志主题前缀。 问题 此错误意味着什么,以及导致此错误的原因。 假设您可以有应用程序的多个实

  • 我有一个问题,假设有一个TOPIC T1,有两个消费者C1和C2属于两个不同的组,电流偏移量是0.我们知道Kafka维护消费者的偏移量。因此,如果 C1 使用消息并且 Offset 变为 1,那么如果 C2 使用消息,它将从 1 偏移量开始,还是从 0 偏移量开始使用消息,会发生什么情况?表示两个不同的消费群体将如何维持抵消? 谢啦

  • 我有一个单一的Kafka消费者,它连接到一个有3个分区的主题。一旦我从Kafka那里得到一张唱片,我就想捕捉偏移量和分区。在重新启动时,我希望从上次读取的偏移量恢复使用者的位置 摘自Kafka文档: 每个记录都有自己的偏移量,因此要管理自己的偏移量,只需执行以下操作: 配置enable.auto.commit=false 下面是我的示例代码: 这是正确的做法吗?有没有更好的办法?