当前位置: 首页 > 知识库问答 >
问题:

Apache beam KafkaIO对外部数据存储的偏移管理

马弘和
2023-03-14

我正在尝试阅读使用apache Beam上的KafkaIO的多个kafka代理。偏移量管理的默认选项是kafka分区本身(不再使用来自kafka>0.9的zookeper)。使用此设置,当我重新启动作业/管道时,会出现重复和丢失记录的问题。

从我读到的内容来看,处理这一点的最佳方法是管理对外部数据存储的偏移量。用当前版本的apache beam和Kafkaio是否可以做到这一点?我现在使用的是2.2.0版本。

而且,读完Kafka的文章后,我会把它写给BigQuery。在KafkaIO中是否有一个设置,只有在我将消息插入BigQuery之后,我才能设置提交的消息?我现在只能找到自动提交安装程序

共有1个答案

包翔
2023-03-14

在Dataflow中,您可以更新作业,而不是从头重新启动。新作业将从上次检查点状态恢复,确保只进行一次处理。这也适用于KafkaIO源代码。Kafka使用者配置中的auto-commit选项有帮助,但对于Dataflow内部状态来说,它不是原子的,这意味着重新启动的作业可能会有一小部分重复或丢失的消息。

 类似资料:
  • 我对使用Kafka和动物园管理员时偏移量的存储位置有点困惑。在某些情况下,偏移量似乎存储在动物园管理员中,而在其他情况下,它们存储在Kafka中。 什么决定了偏移量是存储在Kafka中还是存储在Zookeeper中?有哪些利弊? 注意:当然,我也可以将偏移量存储在不同的数据存储中,但这不是这篇文章的内容。 有关我的设置的更多详细信息: 我运行这些版本:KAFKA_VERSION=“0.10.1.0

  • 本文向大家介绍Android App将数据写入内部存储和外部存储的示例,包括了Android App将数据写入内部存储和外部存储的示例的使用技巧和注意事项,需要的朋友参考一下 File存储(内部存储) 一旦程序在设备安装后,data/data/包名/ 即为内部存储空间,对外保密。 Context提供了2个方法来打开输入、输出流 FileInputStream openFileInput(Strin

  • 问题内容: 在MySQL中存储时区偏移的正确数据类型/结构是哪一种?我只想存储数字值(城市和国家显然存储在其他列中)。 例子: -5:00瓜亚基尔(ECU) -4:30委内瑞拉加拉加斯 0:00一些城市 2:00德国波恩 问题答案: 您应该使用。这是完成任务的正确数据类型:您已设置格式并可以进行计算。此外,根据文档,还应该将其用作两个时刻之间差异的结果,这实际上就是时区。 从文档: MySQL检索

  • 将现有存储库数据库迁移到其他数据库可能包括迁移到同一服务器中的其他数据库,或迁移到不同操作系统上的数据库(例如,从 Windows 到 macOS)。 如果你想迁移你的存储库数据库,请按照下列步骤操作: 通过运行命令,或使用通知区域或菜单栏中的图标来停止 Navicat Monitor 服务。 在你的数据库管理工具中,将当前的存储库数据库(所有表的结构和数据)复制到新的存储库数据库。 启动 Nav

  • 问题内容: 我对使用Kafka和Zookeeper时在哪里存储偏移量感到困惑。在某些情况下,偏移似乎存储在Zookeeper中,而在其他情况下,偏移存储在Kafka中。 是什么决定偏移量存储在Kafka还是Zookeeper中?优点和缺点是什么? 注意:当然,我也可以将偏移量单独存储在其他数据存储区中,但这并不是本文的内容。 有关我的设置的更多详细信息: 我运行以下版本:KAFKA_VERSION

  • 问题内容: 我正在使用带有Java的Google AppEngine。当我使用某些数据存储功能时,出现一条错误消息: 我不知道这意味着什么,如何解决它,或者在哪里可以找到有关此错误的文档。谁能帮我?我使用的代码是: 我相信“ ”是指“拥有的,一对多”的关系。 问题答案: 一个持久对象只能由一个PersistenceManager“管理”。在DataNucleus中,这由“ ObjectManage