当前位置: 首页 > 知识库问答 >
问题:

使用Apache Beam从Google Cloud Datastore批量读取记录

杨轶
2023-03-14

我使用Apache Beam在Beam自己的io.gcp.Datastore.v1.datastoreioPython API的帮助下从Google Cloud Datastore读取数据。

我在谷歌云数据流上运行我的管道。

我想确保我的工人没有数据过载。

如何批量读取数据或使用其他机制确保员工不会一次性读取大量数据?

共有1个答案

曾嘉福
2023-03-14

Dataflow自动为您完成此操作。默认情况下,DataStoreio将文件分成64MB的卡盘。如果要将它们分成更小的部分,请使用初始化器上的num_splits参数来指定将每个文件分成多少部分。

 类似资料:
  • 我希望我的Spring批处理应用程序一次从数据库中读取50条记录,然后将这50条记录发送给处理器,然后发送给写入器。 有人可以告诉我如何做到这一点吗? 我尝试使用JdbcPagingItemReader并将pageSize设置为50,这样可以读取50条记录,但是rowMapper、处理器和编写器一次接收一条记录,而不是获得50条记录。 如何使处理器和写入器在dto中获得50条记录,而不是一次接收一

  • 我们可以使用以下命令轻松地从 Spark 中的 Hive 表中读取记录: 但是当我连接两个表时,例如: 如何从上面的连接查询中检索记录?

  • 我们计划有多个Kafka消费者(Java),它们具有相同的组ID..所以如果它从分配的分区中顺序读取,那么我们如何实现高吞吐量..例如,生产者每秒发布40条消息...消费者每秒处理1条消息...虽然我们可以有多个消费者,但不能有40条RT???如果我错了就纠正我... 在我们的情况下,使用者必须提交偏移量,只有在消息处理成功后...否则消息将被重新处理...有没有更好的解决方法???

  • Im使用存储库项目读取器从数据库读取事务,使用flatfileitem写入器处理和写入文件。我使用容错跳过任何记录,如果它是错误的。为了检查一条记录是否有错误,我们在处理器中进行一些验证并抛出自定义异常。 批处理配置 事务读取器bean 而writer只是一个flatfile编写器。问题是,在读取了很少事务之后,在我的例子中,在读取并处理了9个无效事务之后,批处理作业被卡住了。不知道为什么。 困在

  • 我有一个数据流工作,将单个文件分割成x个记录(表)。这些流在bigQuery没有问题。 不过,我发现没有办法在结果出来后执行管道中的另一个阶段。 举个例子 根据上述内容,我希望运行以下内容: 是有无论如何运行管道的另一个部分后,up到bigQuery或这是不可能的?提前感谢。

  • 当Kafka消费者从其分配的分区读取数据时,消费者提取线程是否使用任何特定的逻辑来从分区中获取数据?例如,读取器线程是否做了任何逻辑/努力来平等/一致地读取分配的分区?它是否从最滞后的分区获取更多记录?还是只是简单的循环式逻辑? 有关于消费逻辑的详细文档吗? 谢谢你。