当前位置: 首页 > 知识库问答 >
问题:

Apache Beam管道中的组元素

印辉
2023-03-14

我有一个管道可以解析AVRO文件中的记录。

我需要将传入的记录分成500个项目的块,以便调用一个同时接受多个输入的API。

有没有办法用PythonSDK做到这一点?

共有1个答案

雷硕
2023-03-14

我假设您是指Batch用例。您有几个选项:

如果您的PCollection足够大,并且您对包的大小有一定的灵活性,则可以在按随机/循环顺序为元素分配键后使用GroupByKey变换。e、 g.:

my_collection = p | ReadRecordsFromAvro()

element_bundles = (my_collection 
                     # Choose a number of keys that works for you (I chose 50 here)
                   | 'AddKeys' >> beam.Map(lambda x: (randint(0, 50), x))
                   | 'MakeBundles' >> beam.GroupByKey()
                   | 'DropKeys' >> beam.Map(lambda (k, bundle): bundle)
                   | beam.ParDo(ProcessBundlesDoFn()))

其中ProcessBundlesDoFn是这样的:

class ProcessBundlesDoFn(beam.DoFn):
  def process(self, bundle):
    while bundle.has_next():
      # Fetch in batches of 500 until you're done
      result = fetch_n_elements(bundle, 500)
      yield result

如果您需要所有捆绑包都包含500个元素,那么您可能需要:

  1. 计算PCollection中的元素数

希望这有帮助。

 类似资料:
  • 我有一个Spark RDD,其中每个元素都是形式的元组。我想使用方法将输入传递给外部可执行文件并生成形式的新RDD。我稍后需要键进行关联。 下面是使用火花壳的示例: 提前谢谢。

  • 我有一个数据流工作,将单个文件分割成x个记录(表)。这些流在bigQuery没有问题。 不过,我发现没有办法在结果出来后执行管道中的另一个阶段。 举个例子 根据上述内容,我希望运行以下内容: 是有无论如何运行管道的另一个部分后,up到bigQuery或这是不可能的?提前感谢。

  • 我不确定以下属性如何影响5阶段MIPS设计(IF、ID、EX、MEM、WB)的管道执行。我只需要一些清理。 只有1个内存端口 没有数据备份。 分支摊位直到*阶段结束 1个内存端口是否意味着我们在读/写mem时无法提取或写入(即lw上的mem阶段,您无法输入IF或其他mem的软件)?如果没有转发,这是否意味着一条指令要在它所依赖的前一条指令的WB阶段之后或之后才能进入ID阶段?Idk分支失速的含义

  • 本文向大家介绍Markdown 管道中的单元格内容,包括了Markdown 管道中的单元格内容的使用技巧和注意事项,需要的朋友参考一下 示例 如果要|在单元格内容中使用竖线字符(),则需要使用反斜杠对其进行转义。 结果如下表所示: 柱 柱 | 细胞 | 细胞

  • 下面我有以下数据。 所以,我不知道为什么UDF可以使用int而不能使用CharArray。此外,我觉得可能有一种方法可以做到这一点,而不使用UDF..但不确定从哪里开始。对这里可能发生的事情有什么建议吗?

  • 问题内容: 在jenkinsfile,我已经通过指定的文件夹名 SparseCheckoutPaths 我想结帐。但是我却得到了整个分支结帐。 问题答案: 这是我自己的问题的答案。关于它如何工作的一些背景知识,有一个名为 sparsecheckout的 git客户端标志/配置,负责这种签出。此外,还需要一个 稀疏签出的 命名文件。 我的问题是 Jenkinsfile 的语法,正确的语法如下: 有关