当前位置: 首页 > 知识库问答 >
问题:

为什么自定义Python对象不能与ParDo FN一起使用?

田冥夜
2023-03-14

我目前刚开始使用Python中的Apache Beam和Dataflow Runner。我对创建一个发布到Google Cloud PubSub的批处理管道感兴趣,我对Beam Python API进行了修补,并找到了一个解决方案。然而,在我的探索中,我遇到了一些有趣的问题,让我好奇。

class PublishFn(beam.DoFn):
    def __init__(self, topic_path):
        self.topic_path = topic_path
        super(self.__class__, self).__init__()

    def process(self, element, **kwargs):
        from google.cloud import pubsub_v1
        publisher = pubsub_v1.PublisherClient()
        future = publisher.publish(self.topic_path, data=element.encode("utf-8"))
        return future.result()


def run_gcs_to_pubsub(argv):
    options = PipelineOptions(flags=argv)

    from datapipes.common.dataflow_utils import CsvFileSource
    from datapipes.protos import proto_schemas_pb2
    from google.protobuf.json_format import MessageToJson

    with beam.Pipeline(options=options) as p:
        normalized_data = (
                p |
                "Read CSV from GCS" >> beam.io.Read(CsvFileSource(
                    "gs://bucket/path/to/file.csv")) |
                "Normalize to Proto Schema" >> beam.Map(
                        lambda data: MessageToJson(
                            proto_schemas_pb2(data, proto_schemas_pb2.MySchema()),
                            indent=0,
                            preserving_proto_field_name=True)
                    )
        )
        (normalized_data |
            "Write to PubSub" >> beam.ParDo(
                    PublishFn(topic_path="projects/my-gcp-project/topics/mytopic"))
            )

在这里,我试图使发布服务器共享accrossdofn。我尝试了以下方法。

a.在DoFn中初始化publisher

class PublishFn(beam.DoFn):
    def __init__(self, topic_path):
        from google.cloud import pubsub_v1

        batch_settings = pubsub_v1.types.BatchSettings(
             max_bytes=1024,  # One kilobyte
             max_latency=1,  # One second
         )
        self.publisher = pubsub_v1.PublisherClient(batch_settings)
        self.topic_path = topic_path
        super(self.__class__, self).__init__()

    def process(self, element, **kwargs):
        future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
        return future.result()

def run_gcs_to_pubsub(argv):
    ... ## same as 1

b.在DoFn外初始化Publisher并将其传递给DoFn

class PublishFn(beam.DoFn):
    def __init__(self, publisher, topic_path):
        self.publisher = publisher
        self.topic_path = topic_path
        super(self.__class__, self).__init__()

    def process(self, element, **kwargs):
        future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
        return future.result()


def run_gcs_to_pubsub(argv):
    .... ## same as 1

    batch_settings = pubsub_v1.types.BatchSettings(
        max_bytes=1024,  # One kilobyte
        max_latency=1,  # One second
    )
    publisher = pubsub_v1.PublisherClient(batch_settings)

    with beam.Pipeline(options=options) as p:
        ... # same as 1
        (normalized_data | 
            "Write to PubSub" >> beam.ParDo(
                PublishFn(publisher=publisher, topic_path="projects/my-gcp-project/topics/mytopic"))
        )
  File "stringsource", line 2, in grpc._cython.cygrpc.Channel.__reduce_cython__
  File "stringsource", line 2, in grpc._cython.cygrpc.Channel.__reduce_cython__
TypeError: no default __reduce__ due to non-trivial __cinit__

谢谢,你的帮助将不胜感激。

好了,Ankur已经解释了为什么会出现我的问题,并讨论了如何在DOFN上进行序列化。基于这些知识,我现在明白了在DOFN中有两种使自定义对象共享/可重用的解决方案:

>

  • 使自定义对象可序列化:这允许对象在DoFn对象创建期间初始化/可用(在__init__下)。这个对象必须是可序列化的,因为它将在管道提交过程中被序列化,在管道提交过程中创建DoFn对象(它调用__init__)。你如何实现这一点在下面我的回答中得到了回答。另外,我发现这个需求实际上与[1][2]中的Beam文档相关联。

    [1]https://beam.apache.org/documentation/programming-guide/#core-beam-transforms

    [2]https://beam.apache.org/documentation/programming-guide/#beam-transforms编写用户代码的要求

  • 共有1个答案

    慕容宏邈
    2023-03-14

    PublisherClient无法正确处理。这里有更多关于腌制的信息。在进程方法中初始化PublisherClient可避免对PublisherClient进行酸洗。

    如果目的是重用publisherclient,我建议在process方法中初始化publisherclient,并使用以下代码将其存储在self中。

    class PublishFn(beam.DoFn):
        def __init__(self, topic_path):
            self.topic_path = topic_path
            super(self.__class__, self).__init__()
    
        def process(self, element, **kwargs):
            if not hasattr(self, 'publish'):
                from google.cloud import pubsub_v1
                self.publisher = pubsub_v1.PublisherClient()
            future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
            return future.result()
    
     类似资料:
    • 问题内容: 我正在阅读angularjs.org上的开发人员指南的指令部分,以刷新我的知识并获得一些见解,并且我尝试运行示例之一,但指令ng- hide不适用于自定义指令。 这里是jsfiddle:http : //jsfiddle.net/D3Nsk/: 知道为什么会这样吗? 解 似乎标记上的变量dialogIsHidden已经在指令中引用了作用域变量,而不是控制器中的变量;鉴于该指令具有其自己

    • 问题内容: 我正在使用ApplicationTestCase测试一个Android应用程序。我想模拟我的AsyncTasks之一(示例简化为显示问题): 因此,为了设置测试,我做了以下工作: 然后,实际测试如下: 但是运行时出现异常: 为什么模拟AsyncTask的技术不起作用? 请注意,在这种简单情况下,删除会导致问题消失,但是对于我的实际测试,我确实需要创建应用程序。 问题答案: AsyncT

    • 问题内容: 为什么在生成输出时不能与DomDocument对象一起使用? 问题答案: 更新: 从PHP 5.4.1开始,您最终可以使用DOM对象。参见https://gist.github.com/2499678 这是一个错误: http://bugs.php.net/bug.php?id=48527

    • 问题内容: 在下面的代码中,我试图使h1元素具有最高利润。当我在css中将位置设置为inline时,未显示上边距。但是,当我将其更改为inline-block时,它确实可以。我想知道是否有人可以解释为什么会这样。谢谢。 这是我的HTML: 这是CSS 问题答案: CSS2规范的9.2.4节规定: inline-block 此值使元素生成一个 内联级块容器 。内联块的内部被格式化为块框,元素本身被格

    • 问题内容: 我已经在端口8080(默认)下启动并测试了Tomcat。现在,我将连接器端口更改为80,并重新启动了Tomcat,在最小的Debian 6.0安装中没有任何显示。现在,这里的窍门在哪里? 问题答案: 转到/ etc / default / tomcat6并更改为

    • 问题内容: 为什么以下断言起作用: 但是这个断言给出了一个错误: 我能看到的唯一区别是使接口属性之一为可选()。似乎如果所有属性都不是可选的,那么我可以向接口声明一个部分对象,但是一旦任何接口属性都是可选的,我就不能再声明一个部分对象。这对我来说真的没有意义,我一直无法找到这种行为的解释。这里发生了什么? 对于上下文:我在尝试解决React的部分状态对象问题时遇到了此行为,但是TypeScript