当前位置: 首页 > 知识库问答 >
问题:

从apache beam pcollection返回什么内容以写入bigquery

卜飞鸣
2023-03-14

我正在阅读beam文档和一些stackoverflow问题/答案,以便了解如何向BigQuery编写pubsub消息。到目前为止,我已经有了获取protobuf消息并能够解码消息的工作示例。代码如下所示

(p
 | 'ReadData' >> apache_beam.io.ReadFromPubSub(topic=known_args.input_topic, with_attributes=True)
 | 'ParsePubsubMessage' >> apache_beam.Map(parse_pubsubmessage)
 )
class DecodedPubsubMessage:
    def __init__(self, attr, event):
        self.attribute_one = attr['attribute_one']
        self.attribute_two = attr['attribute_two']

        self.order_id = event.order.order_id
        self.sku = event.item.item_id
        self.triggered_at = event.timestamp
        self.status = event.order.status

共有1个答案

漆雕成弘
2023-03-14

您的类必须从DoFn继承并重载“process”方法,而不是在init上执行转换

在转换之后,您可以使用“return[obj]”或“yield obj”返回所需的输出PCollection

 类似资料:
  • 问题内容: 我经常看到这样的代码(C,C ++,有时甚至是Java): 我看不到这些括号有任何好处。所以我的问题是,程序员是否假定函数是某种以返回值作为参数的函数,或者确实有这些括号有意义的情况吗? 我了解这里已经提出了类似的问题,但这仅与ANSI C有关。我想知道是否存在某些特定于C ++或Java的方面尚未得到解答。 问题答案: 关于C 括号放在有表达式的位置,并且希望返回值是表达式的值。 即

  • 问题内容: 我正在使用RESTlet,并且已经创建了资源。我通过覆盖方法处理POST 。 客户端应该向我发送一些数据,然后将其存储到DB,将响应设置为201(SUCCESS_CREATED),我需要向客户端返回一些数据,但是返回类型为。 就我而言,我需要返回一些标识符,以便客户端可以访问该资源。 例如,如果我有一个带有URL的资源,并且客户端发送了POST请求,则在DB中添加一个新行,其地址应为。

  • 问题内容: 我想知道在不需要时跳过的方式是否不好。 例: 在这两种情况下,当condition为false时,函数将以返回。 问题答案: 就像您说的那样,几乎不需要。 但是,您应该考虑到使用 明确 的代码,代码的 意图 会更加清晰。切记:一段代码也需要人类可读,而明确表示通常会有所帮助。

  • 我有两张桌子锻炼和练习。我正在尝试使用与我们点击的锻炼匹配的workout_id获取所有锻炼。我做了一个内部联接查询,但它似乎没有返回任何东西。我的查询有问题吗? 我正在使用SQLite创建我的数据库。我已经检查了,以确保练习表中有练习,并且它们有一个workout_id。 2)回到我的WorkoutProvider类中,我将selectionArgs设置为: 并把它传递到我的RawQuery中。

  • 问题内容: 除了使用关键字调用构造函数时,Java语言中的语句可以返回值的确切情况是什么? 例: 如果我没记错的话,如果它是一个非函数原语,将被返回。否则返回。这个对吗? 换句话说,什么值可以引起? 问题答案: 确切的条件在内部属性上进行了描述,该属性由操作员使用: 来自ECMA-262第3条。版本规格: 13.2.2 调用对象的属性时,将执行以下步骤: 创建一个新的本机ECMAScript对象。

  • 问题内容: 有人可以提供goroutine返回的澄清值。从goroutine返回的值是否已存入抵押。 例如: 我们是否应该避免在go例程中避免返回值? 问题答案: 快速查看一下组装输出显示 该函数确实将其结果存储到堆栈中 因此,当从goroutine调用它时,它会将结果存储到堆栈中。但是,这是一个新的堆栈,当goroutine结束时,该堆栈会被破坏,因此无法获取返回值。 但是,无法检索这些结果。