我目前刚开始使用Python中的Apache Beam和Dataflow Runner。我对创建一个发布到Google Cloud PubSub的批处理管道感兴趣,我对Beam Python API进行了修补,并找到了一个解决方案。然而,在我的探索中,我遇到了一些有趣的问题,让我好奇。
class PublishFn(beam.DoFn):
def __init__(self, topic_path):
self.topic_path = topic_path
super(self.__class__, self).__init__()
def process(self, element, **kwargs):
from google.cloud import pubsub_v1
publisher = pubsub_v1.PublisherClient()
future = publisher.publish(self.topic_path, data=element.encode("utf-8"))
return future.result()
def run_gcs_to_pubsub(argv):
options = PipelineOptions(flags=argv)
from datapipes.common.dataflow_utils import CsvFileSource
from datapipes.protos import proto_schemas_pb2
from google.protobuf.json_format import MessageToJson
with beam.Pipeline(options=options) as p:
normalized_data = (
p |
"Read CSV from GCS" >> beam.io.Read(CsvFileSource(
"gs://bucket/path/to/file.csv")) |
"Normalize to Proto Schema" >> beam.Map(
lambda data: MessageToJson(
proto_schemas_pb2(data, proto_schemas_pb2.MySchema()),
indent=0,
preserving_proto_field_name=True)
)
)
(normalized_data |
"Write to PubSub" >> beam.ParDo(
PublishFn(topic_path="projects/my-gcp-project/topics/mytopic"))
)
在这里,我试图使发布服务器共享accrossdofn
。我尝试了以下方法。
a.在DoFn中初始化publisher
class PublishFn(beam.DoFn):
def __init__(self, topic_path):
from google.cloud import pubsub_v1
batch_settings = pubsub_v1.types.BatchSettings(
max_bytes=1024, # One kilobyte
max_latency=1, # One second
)
self.publisher = pubsub_v1.PublisherClient(batch_settings)
self.topic_path = topic_path
super(self.__class__, self).__init__()
def process(self, element, **kwargs):
future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
return future.result()
def run_gcs_to_pubsub(argv):
... ## same as 1
b.在DoFn外初始化Publisher并将其传递给DoFn
class PublishFn(beam.DoFn):
def __init__(self, publisher, topic_path):
self.publisher = publisher
self.topic_path = topic_path
super(self.__class__, self).__init__()
def process(self, element, **kwargs):
future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
return future.result()
def run_gcs_to_pubsub(argv):
.... ## same as 1
batch_settings = pubsub_v1.types.BatchSettings(
max_bytes=1024, # One kilobyte
max_latency=1, # One second
)
publisher = pubsub_v1.PublisherClient(batch_settings)
with beam.Pipeline(options=options) as p:
... # same as 1
(normalized_data |
"Write to PubSub" >> beam.ParDo(
PublishFn(publisher=publisher, topic_path="projects/my-gcp-project/topics/mytopic"))
)
File "stringsource", line 2, in grpc._cython.cygrpc.Channel.__reduce_cython__
File "stringsource", line 2, in grpc._cython.cygrpc.Channel.__reduce_cython__
TypeError: no default __reduce__ due to non-trivial __cinit__
谢谢,你的帮助将不胜感激。
好了,Ankur已经解释了为什么会出现我的问题,并讨论了如何在DOFN上进行序列化。基于这些知识,我现在明白了在DOFN中有两种使自定义对象共享/可重用的解决方案:
>
使自定义对象可序列化:这允许对象在DoFn对象创建期间初始化/可用(在__init__
下)。这个对象必须是可序列化的,因为它将在管道提交过程中被序列化,在管道提交过程中创建DoFn对象(它调用__init__
)。你如何实现这一点在下面我的回答中得到了回答。另外,我发现这个需求实际上与[1][2]中的Beam文档相关联。
[1]https://beam.apache.org/documentation/programming-guide/#core-beam-transforms
[2]https://beam.apache.org/documentation/programming-guide/#beam-transforms编写用户代码的要求
PublisherClient
无法正确处理。这里有更多关于腌制的信息。在进程
方法中初始化PublisherClient
可避免对PublisherClient
进行酸洗。
如果目的是重用publisherclient
,我建议在process方法中初始化publisherclient
,并使用以下代码将其存储在self
中。
class PublishFn(beam.DoFn):
def __init__(self, topic_path):
self.topic_path = topic_path
super(self.__class__, self).__init__()
def process(self, element, **kwargs):
if not hasattr(self, 'publish'):
from google.cloud import pubsub_v1
self.publisher = pubsub_v1.PublisherClient()
future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
return future.result()
问题内容: 我正在阅读angularjs.org上的开发人员指南的指令部分,以刷新我的知识并获得一些见解,并且我尝试运行示例之一,但指令ng- hide不适用于自定义指令。 这里是jsfiddle:http : //jsfiddle.net/D3Nsk/: 知道为什么会这样吗? 解 似乎标记上的变量dialogIsHidden已经在指令中引用了作用域变量,而不是控制器中的变量;鉴于该指令具有其自己
问题内容: 我正在使用ApplicationTestCase测试一个Android应用程序。我想模拟我的AsyncTasks之一(示例简化为显示问题): 因此,为了设置测试,我做了以下工作: 然后,实际测试如下: 但是运行时出现异常: 为什么模拟AsyncTask的技术不起作用? 请注意,在这种简单情况下,删除会导致问题消失,但是对于我的实际测试,我确实需要创建应用程序。 问题答案: AsyncT
问题内容: 为什么在生成输出时不能与DomDocument对象一起使用? 问题答案: 更新: 从PHP 5.4.1开始,您最终可以使用DOM对象。参见https://gist.github.com/2499678 这是一个错误: http://bugs.php.net/bug.php?id=48527
问题内容: 在下面的代码中,我试图使h1元素具有最高利润。当我在css中将位置设置为inline时,未显示上边距。但是,当我将其更改为inline-block时,它确实可以。我想知道是否有人可以解释为什么会这样。谢谢。 这是我的HTML: 这是CSS 问题答案: CSS2规范的9.2.4节规定: inline-block 此值使元素生成一个 内联级块容器 。内联块的内部被格式化为块框,元素本身被格
问题内容: 我已经在端口8080(默认)下启动并测试了Tomcat。现在,我将连接器端口更改为80,并重新启动了Tomcat,在最小的Debian 6.0安装中没有任何显示。现在,这里的窍门在哪里? 问题答案: 转到/ etc / default / tomcat6并更改为
问题内容: 为什么以下断言起作用: 但是这个断言给出了一个错误: 我能看到的唯一区别是使接口属性之一为可选()。似乎如果所有属性都不是可选的,那么我可以向接口声明一个部分对象,但是一旦任何接口属性都是可选的,我就不能再声明一个部分对象。这对我来说真的没有意义,我一直无法找到这种行为的解释。这里发生了什么? 对于上下文:我在尝试解决React的部分状态对象问题时遇到了此行为,但是TypeScript