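I want to use Dataflow to move data from Pub/Sub to GCS. Basically, I want Dataflow to accumulate messages for a fixed amount of time (for example, 15 minutes) and then write them to GCS as a text file once that time has elapsed.
My end goal is a custom pipeline, so the "Pub/Sub to Cloud Storage" template is not enough for me, and I don't know Java at all, so I started tinkering in Python.
Here is what I have so far (Apache Beam Python SDK 2.10.0):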
import apache_beam as beam

TOPIC_PATH = "projects/<my-project>/topics/<my-topic>"


def CombineFn(e):
    # Join all messages in a window into one newline-separated string.
    return "\n".join(e)


o = beam.options.pipeline_options.PipelineOptions()
p = beam.Pipeline(options=o)
data = (
    p
    | "Read From Pub/Sub" >> beam.io.ReadFromPubSub(topic=TOPIC_PATH)
    | "Window" >> beam.WindowInto(beam.window.FixedWindows(30))  # 30-second windows
    | "Combine" >> beam.transforms.core.CombineGlobally(CombineFn).without_defaults()
    | "Output" >> beam.io.WriteToText("<GCS path or local path>")
)
res = p.run()
res.wait_until_finish()
I can run this program in my local environment without any problems:
python main.py
It runs locally but reads from the specified Pub/Sub topic, and it writes to the specified GCS path every 30 seconds, as expected.
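To make the intended behavior concrete, here is a minimal plain-Python sketch of what the "Window" and "Combine" steps are meant to do together: bucket each message into a 30-second window by its timestamp, then join the messages of each window with newlines. This is only an illustration, not Beam's implementation; it ignores watermarks, triggers, and late data, and the names `fixed_window_start` and `combine_per_window` are hypothetical.

```python
from collections import defaultdict


def fixed_window_start(timestamp, size, offset=0):
    """Start of the fixed window [start, start + size) that contains timestamp."""
    return timestamp - (timestamp - offset) % size


def combine_per_window(events, size):
    """Group (timestamp_seconds, text) pairs by window and join each group.

    Returns a dict mapping window start to the newline-joined messages.
    """
    grouped = defaultdict(list)
    for ts, text in events:
        grouped[fixed_window_start(ts, size)].append(text)
    return {start: "\n".join(lines) for start, lines in sorted(grouped.items())}


# Made-up sample messages with timestamps in seconds.
events = [(3, "a"), (29, "b"), (30, "c"), (61, "d")]
result = combine_per_window(events, 30)
```

With these sample events, "a" and "b" share the first window, while "c" and "d" each land in a later one, so every window would produce its own file content.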
The problem is that when I run it on Google Cloud Platform (i.e., Cloud Dataflow), it keeps emitting a cryptic exception:
java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction -1096: Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 148, in _execute
response = task()
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 183, in <lambda>
self._execute(lambda: worker.do_instruction(work), work)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 256, in do_instruction
request.instruction_id)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 272, in process_bundle
bundle_processor.process_bundle(instruction_id)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 494, in process_bundle
op.finish()
File "apache_beam/runners/worker/operations.py", line 506, in apache_beam.runners.worker.operations.DoOperation.finish
def finish(self):
File "apache_beam/runners/worker/operations.py", line 507, in apache_beam.runners.worker.operations.DoOperation.finish
with self.scoped_finish_state:
File "apache_beam/runners/worker/operations.py", line 508, in apache_beam.runners.worker.operations.DoOperation.finish
self.dofn_runner.finish()
File "apache_beam/runners/common.py", line 703, in apache_beam.runners.common.DoFnRunner.finish
self._invoke_bundle_method(self.do_fn_invoker.invoke_finish_bundle)
File "apache_beam/runners/common.py", line 697, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 722, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise_with_traceback(new_exn)
File "apache_beam/runners/common.py", line 695, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
bundle_method()
File "apache_beam/runners/common.py", line 361, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
def invoke_finish_bundle(self):
File "apache_beam/runners/common.py", line 364, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
self.output_processor.finish_bundle_outputs(
File "apache_beam/runners/common.py", line 832, in apache_beam.runners.common._OutputProcessor.finish_bundle_outputs
self.main_receivers.receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 87, in apache_beam.runners.worker.operations.ConsumerSet.receive
self.update_counters_start(windowed_value)
File "apache_beam/runners/worker/operations.py", line 93, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
self.opcounter.update_from(windowed_value)
File "apache_beam/runners/worker/opcounters.py", line 195, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
self.do_sample(windowed_value)
File "apache_beam/runners/worker/opcounters.py", line 213, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
self.coder_impl.get_estimated_size_and_observables(windowed_value))
File "apache_beam/coders/coder_impl.py", line 953, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
def get_estimated_size_and_observables(self, value, nested=False):
File "apache_beam/coders/coder_impl.py", line 969, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
self._windows_coder.estimate_size(value.windows, nested=True))
File "apache_beam/coders/coder_impl.py", line 758, in apache_beam.coders.coder_impl.SequenceCoderImpl.estimate_size
self.get_estimated_size_and_observables(value))
File "apache_beam/coders/coder_impl.py", line 772, in apache_beam.coders.coder_impl.SequenceCoderImpl.get_estimated_size_and_observables
self._elem_coder.get_estimated_size_and_observables(
File "apache_beam/coders/coder_impl.py", line 134, in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
return self.estimate_size(value, nested), []
File "apache_beam/coders/coder_impl.py", line 458, in apache_beam.coders.coder_impl.IntervalWindowCoderImpl.estimate_size
typed_value = value
TypeError: Cannot convert GlobalWindow to apache_beam.utils.windowed_value._IntervalWindowBase [while running 'generatedPtransform-1090']
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:280)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:84)
org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:130)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1233)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:144)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:972)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction -1096: Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 148, in _execute
response = task()
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 183, in <lambda>
self._execute(lambda: worker.do_instruction(work), work)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 256, in do_instruction
request.instruction_id)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 272, in process_bundle
bundle_processor.process_bundle(instruction_id)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 494, in process_bundle
op.finish()
File "apache_beam/runners/worker/operations.py", line 506, in apache_beam.runners.worker.operations.DoOperation.finish
def finish(self):
File "apache_beam/runners/worker/operations.py", line 507, in apache_beam.runners.worker.operations.DoOperation.finish
with self.scoped_finish_state:
File "apache_beam/runners/worker/operations.py", line 508, in apache_beam.runners.worker.operations.DoOperation.finish
self.dofn_runner.finish()
File "apache_beam/runners/common.py", line 703, in apache_beam.runners.common.DoFnRunner.finish
self._invoke_bundle_method(self.do_fn_invoker.invoke_finish_bundle)
File "apache_beam/runners/common.py", line 697, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 722, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise_with_traceback(new_exn)
File "apache_beam/runners/common.py", line 695, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
bundle_method()
File "apache_beam/runners/common.py", line 361, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
def invoke_finish_bundle(self):
File "apache_beam/runners/common.py", line 364, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
self.output_processor.finish_bundle_outputs(
File "apache_beam/runners/common.py", line 832, in apache_beam.runners.common._OutputProcessor.finish_bundle_outputs
self.main_receivers.receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 87, in apache_beam.runners.worker.operations.ConsumerSet.receive
self.update_counters_start(windowed_value)
File "apache_beam/runners/worker/operations.py", line 93, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
self.opcounter.update_from(windowed_value)
File "apache_beam/runners/worker/opcounters.py", line 195, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
self.do_sample(windowed_value)
File "apache_beam/runners/worker/opcounters.py", line 213, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
self.coder_impl.get_estimated_size_and_observables(windowed_value))
File "apache_beam/coders/coder_impl.py", line 953, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
def get_estimated_size_and_observables(self, value, nested=False):
File "apache_beam/coders/coder_impl.py", line 969, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
self._windows_coder.estimate_size(value.windows, nested=True))
File "apache_beam/coders/coder_impl.py", line 758, in apache_beam.coders.coder_impl.SequenceCoderImpl.estimate_size
self.get_estimated_size_and_observables(value))
File "apache_beam/coders/coder_impl.py", line 772, in apache_beam.coders.coder_impl.SequenceCoderImpl.get_estimated_size_and_observables
self._elem_coder.get_estimated_size_and_observables(
File "apache_beam/coders/coder_impl.py", line 134, in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
return self.estimate_size(value, nested), []
File "apache_beam/coders/coder_impl.py", line 458, in apache_beam.coders.coder_impl.IntervalWindowCoderImpl.estimate_size
typed_value = value
TypeError: Cannot convert GlobalWindow to apache_beam.utils.windowed_value._IntervalWindowBase [while running 'generatedPtransform-1090']
org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157)
org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:140)
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:248)
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:263)
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:683)
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
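The exception above is shown, in a non-blocking way, every time the job tries to write to GCS. As a result, a new text file is indeed generated on each output attempt, but its content is always the same as the output of the first window. That is clearly not what I want.
The exception is nested so deeply in the stack trace that it is hard to guess the root cause, and I have no idea why the pipeline runs fine on DirectRunner but not on DataflowRunner. The error seems to say that somewhere in the pipeline a global-window value is being converted to a non-global-window value, even though I apply a non-global windowing transform in the second stage of the pipeline. Adding a custom trigger did not help.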
I ran into the same error and found a workaround, but not a fix:
TypeError: Cannot convert GlobalWindow to apache_beam.utils.windowed_value._IntervalWindowBase [while running 'test-file-out/Write/WriteImpl/WriteBundles']
This happened both when running locally with DirectRunner and when running on Dataflow with DataflowRunner.
Reverting to apache-beam[gcp]==2.9.0 lets my pipeline run as expected.
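If you want the pin to be explicit at startup, a small guard along these lines might help. It is only a sketch: the two version strings are the ones reported in this thread, not an exhaustive compatibility list, and `check_beam_version` is a hypothetical helper name.

```python
# Versions reported in this thread; other releases are untested here.
FAILING_VERSION = "2.10.0"
WORKING_VERSION = "2.9.0"


def check_beam_version(installed):
    """Raise if the installed SDK is the version that failed in this thread."""
    if installed == FAILING_VERSION:
        raise RuntimeError(
            "apache-beam %s showed the GlobalWindow TypeError here; "
            "try apache-beam[gcp]==%s" % (installed, WORKING_VERSION)
        )
    return installed


# In a real pipeline the argument would be beam.__version__.
checked = check_beam_version("2.9.0")
```

Calling it before building the pipeline fails fast with a readable message instead of the nested worker traceback.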