当前位置: 首页 > 知识库问答 >
问题:

Apache beam列表到PCollection

笪欣嘉
2023-03-14

我的输入是一个json列表,我希望有一个多元素pCollection。这是我的代码

def parse_json(data):
    import json
    for i in json.loads(data):
        return i
data = (p
    | "Read text" >> beam.io.textio.ReadFromText(f'gs://{bucket_name}/not_processed/2020-06-08T23:59:59.999Z__rms004_m1__not_sent_msg.txt')
    | "Parse json" >> beam.Map(parse_json))

问题是,当列表由2个元素组成时,我只得到列表的第一个元素。

我如何做到这一点?

共有1个答案

松国兴
2023-03-14

我发现了。

Apache Beam中有一个名为ParDo的函数就是为了这个。

def parse_json(data):
    import json
    return json.loads(data)

data = (p
    | "Read text" >> beam.io.textio.ReadFromText(f'gs://{bucket_name}/not_processed/2020-06-08T23:59:59.999Z__rms004_m1__not_sent_msg.txt')
    | "Parse json" >> beam.ParDo(parse_json))
 类似资料:
  • 我有一个数据流工作,将单个文件分割成x个记录(表)。这些流在bigQuery没有问题。 不过,我发现没有办法在结果出来后执行管道中的另一个阶段。 举个例子 根据上述内容,我希望运行以下内容: 是有无论如何运行管道的另一个部分后,up到bigQuery或这是不可能的?提前感谢。

  • 我的DTO中有一个字符串列表,我想把它映射成一个对象列表,在映射器中我使用服务通过这个字符串获取对象,但我有以下错误 考虑声明/实现一个映射方法:“java.util.list map(java.util.list value)”。

  • 我使用的是mapstruct 1.4.2.final。我有一个这样的问题: Business1 id有许多Business2 id关系。我想像RelationDTO一样使用DTO来记录。 谢谢你的留言。

  • 问题内容: 将列表列表转换为pandas数据框很容易: 但是,如何将df重新变成列表列表? 问题答案: 您可以访问基础数组并调用其方法:

  • 我正在用Play Framework和Scala创建一个后端API。我想将传入的请求映射到scala对象。该对象的实例变量之一是通道列表。以下是我目前拥有的: 接受请求并尝试将其映射到用户的Controller方法: 用户案例类: 理想情况下,我希望作为一个用户能够接受这一点: 并从中创建用户。

  • 我如何将一个简单的列表转换成一个Numpy数组?这些行是单独的子列表,每行包含子列表中的元素。