当前位置: 首页 > 知识库问答 >
问题:

Kafka Connect Sink(GCS)仅从最新偏移量读取,配置为从最早读取?

殳毅
2023-03-14

如上所述,我目前正在设置一个Kafka Connect Sink,将数据从Kafka传输到Google云存储中。

然而,一切都进展顺利——它只使用最新的可用偏移量。也就是说,一旦它开始运行,它只将新产生的消息下沉到GCS,而不是来自Kafka的已经存在的消息。我已经尝试删除kafka连接存储/偏移主题,创建一个新的连接器名称等。但是,它总是从最新的偏移量开始。

如果无论如何要为Kafka Connect GCS Sink配置最早的偏移量?我没有看到任何配置来处理这个问题

https://docs . confluent . io/current/connect/Kafka-connect-GCS/configuration _ options . html

https://docs.confluent.io/current/connect/references/allconfigs.html

我尝试删除任何kafka连接主题/文件存储,以及从新的连接器名称开始

我看到连接器启动后生成的Kafka Connect接收器消息。

我期望/需要消息从最早的可用偏移量接收,即。如果没有为连接器提交偏移量,则从最早的消息开始

共有1个答案

商麒
2023-03-14

当您第一次创建连接器时,默认情况下将采用最早的偏移量。您应该在连接工作日志中看到这一点:

[2019-08-05 23:31:35,405] INFO ConsumerConfig values:
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
…

您可以通过在Worker配置中更改:consumer.auto.offset.reset来覆盖此设置。

当您删除连接器并重新创建它时,偏移量将被保留并重新使用。

如果使用新名称创建连接器,默认情况下,它将使用连接工作器(< code>earliest)中设置的偏移量。

 类似资料:
  • 我已经编写了一个Java Kafka消费者。我想确定如何明确确保一旦Kafka消费者启动,它只读取从那时起由制作人发送的消息,即它不应读取制作人已发送给Kafka的任何消息。有人能解释一下如何确保这一点吗 这是我使用的属性的片段 更新9月14日: 我使用的是以下属性,似乎消费者有时仍然从一开始就阅读,有人能告诉我现在出了什么问题吗? 我使用Kafka版本0.8.2

  • 相反,我需要做的是将更改为新的内容,然后它将从最早的偏移量恢复。 会不会有其他的犯罪行为? 更新 根据我的理解,这看起来像是每次auto commit enable为false时,它都将提交偏移量。这是Camel Kafka组件的一个特性,因为即使启用了自动提交,它也将在x条消息之后同步

  • 如有任何帮助,我们将不胜感激。

  • 问题内容: 嘿,我正在尝试打开文件,仅从偏移量读取一定长度!我阅读了以下主题: 如何使用Java中的文件中的特定行号读取特定行? 在那儿,它说在不读取之前就不可能读取某行,但是我想知道字节! 是否可以从已知偏移量读取某些字节? 问题答案: RandomAccessFile提供一个功能:

  • 问题内容: 我需要从GCS存储桶中读取文件。我知道我必须使用GCS API /客户端库,但找不到任何与之相关的示例。 我一直在GCS文档中参考此链接: GCS客户端库。但是不能真正地削弱。如果有人可以提供一个真正有帮助的例子。谢谢。 问题答案: 好。如果您只是想从GCS中读取文件,而不是从PCollection中读取文件,而是从常规文件中读取文件,并且在使用GCS Java客户端库时遇到问题,还可

  • 我已经开始让我的制作人向Kafka发送数据,也让我的消费者提取相同的数据。当我在ApacheNIFI中使用ConsumerKafka处理器(kafka版本1.0)时,我脑海中很少有与kafka consumer相关的查询。 Q.1)当我第一次启动ConsumeKafka处理器时,我如何从开始和当前消息中读取消息? 问题2)以及在Kafka消费者关闭的情况下,如何在最后一条消费信息之后阅读信息? 在