当前位置: 首页 > 知识库问答 >
问题:

加缪迁移-Kafka HDFS Connect不从设置的偏移量开始

向泽语
2023-03-14

我目前正在使用ConFluent HDFS Sink连接器(v4.0.0)来替换Camus。我们正在处理敏感数据,因此我们需要在切换到连接器期间保持偏移的一致性。

移交计划:

  1. 我们创建了hdfs接收器连接器,并订阅了一个主题,该主题将写入临时hdfs文件。这将创建一个名为connect-
  2. 的用户组
  3. 已使用DELETE请求停止连接器
  4. 使用/usr/bin/kafka使用者组脚本,我可以将连接器使用者组kafka主题分区的当前偏移量设置为所需的值(即Camus写入的上一个偏移量1)
  5. 当我重新启动hdfs接收器连接器时,它会继续读取上次提交的连接器偏移量,并忽略设置值。我希望hdfs文件名如下:hdfs_kafka_topic_name kafkapartition Camus_offset Camus_off set_plus_flush_size.format

我对汇合连接器行为的预期正确吗?

共有1个答案

洪和平
2023-03-14

当您重新启动这个连接器时,它将使用嵌入在最后一个写入hdfs的文件中的偏移量。它不会使用消费者组偏移量。之所以这样做,是因为它使用预写日志来实现向hdfs的一次性交付。

 类似资料:
  • 可以从输入主题的特定偏移量到结束偏移量进行Kafka流处理吗? 我有一个Kafka流应用程序消耗输入主题,但由于某种原因失败了。我修复了问题并再次启动它,但它从输入主题的最新偏移量开始消耗。我知道应用程序已处理的输入主题的偏移量。现在,我如何将输入主题从一个偏移量处理到另一个偏移量。我正在使用合流平台5.1.2。

  • 为什么实际主题中的偏移值与同一主题中的偏移值不同?PFB偏移位置以及使用的命令。 我错过了什么?

  • 问题内容: 使用javascript,我知道我的用户时区为UTC +3。 现在,我想用此知识创建DateTime对象: 我得到的回应是: 我究竟做错了什么?我该如何解决? 问题答案: 这个怎么样…

  • 在我的侦听器中,在使用消息后,如果发生任何异常,我将抛出一个异常。如果它成功了,那么我承认。但是,即使抛出异常,偏移量也不会后退。i、 e重试没有按预期进行。错误事件不会再次出现。 此外,我发现我没有消费所有预期的消息。我做错什么了吗? 监听器类 我正在做

  • 迁移cache分页 仓库地址: cache 安装 composer require illuminate/cache 暂时实现 redis方式 还需安装 composer require illuminate/redis composer require predis/predis //个人比较喜欢predis 启动predis function frameInitialized() {

  • 问题内容: 是否可以跳过X个第一行,并在一个查询中选择所有其他行?像那样: 它将选择:pqr,stu,vwx,yz 我尝试使用LIMIT和OFFSET完成此操作,但是问题是表是动态的,而且我不知道应该输入哪个LIMIT(我不知道表中有多少行)。 问题答案: 如果只需要最后N行,请尝试以下操作: 这会根据的顺序为您提供最后几条记录。 您可以使用自动递增的主键(希望有一个主键)来确定行的顺序(如果无法