I am trying to write Google PubSub messages to Google Cloud Storage using Google Cloud Dataflow (Python SDK). The messages arrive in PubSub in JSON format, and I have to define a schema in order to write them to Google Cloud Storage in Parquet format.
Following the advice of other users, I started working on this task, looking in particular at this and this source.
The first is not what I want to do, because it applies changes to the JSON messages (it merges them through a window, puts the raw JSON into a "message" field, and adds a timestamp representing the publication time).
The second source (source code here) fits this use case better. Specifically, the schema is defined automatically from data extracted from a BigQuery table, and the result is then written back to Google Cloud Storage in Parquet format.
Does anyone know whether it is possible to do the same thing, more precisely, to define the schema automatically with pyarrow by reading the JSON messages from PubSub? If it is possible, how can I do it?
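For concreteness, the kind of "automatic" schema definition I have in mind is sketched below: a hypothetical get_parquet_schema helper that infers a flat pyarrow schema from one sample JSON payload (the helper name, the type mapping, and the assumption that all messages share the sample's flat structure are mine, not taken from either source):

import json
import pyarrow

def get_parquet_schema(sample_payload):
    # Map the Python type of each top-level field in one decoded sample
    # message to a pyarrow type; anything unrecognized falls back to string.
    type_map = {str: pyarrow.string(), bool: pyarrow.bool_(),
                int: pyarrow.int64(), float: pyarrow.float64()}
    record = json.loads(sample_payload)
    return pyarrow.schema(
        [(name, type_map.get(type(value), pyarrow.string()))
         for name, value in record.items()])

# e.g. get_parquet_schema(b'{"Attr1": "a", "Attr2": "b"}')
#      -> Attr1: string, Attr2: string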
This is what I have done so far. If I try to run it, some Parquet files are generated (they contain the column names I specified through the pyarrow schema, but they have no values), and several errors are produced in the Dataflow console (see one example below). Moreover, if only a single JSON message arrives in PubSub (which should be converted into a Parquet file with just one row), I do not understand why many Parquet files are generated (more than 10 if I let the job run for a few minutes).
import argparse
import logging

import pyarrow

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run(input_topic, output_path, pipeline_args=None):
    # TODO - Dynamic parquet_schema definition
    # input_topic = known_args.input
    # parquet_schema = get_parquet_schema(input_topic)
    parquet_schema = pyarrow.schema(
        [('Attr1', pyarrow.string()), ('Attr2', pyarrow.string()),
         ('Attr3', pyarrow.string()), ('Attr4', pyarrow.string()),
         ('Attr5', pyarrow.string()), ('Attr6', pyarrow.string())
         ]
    )

    # instantiate a pipeline with all the pipeline options
    pipeline_options = PipelineOptions(pipeline_args, streaming=True)

    # processing and structure of pipeline
    with beam.Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            | 'Input: Read PubSub Messages' >> beam.io.ReadFromPubSub(topic=input_topic)
            | 'Output: Export to Parquet' >> beam.io.parquetio.WriteToParquet(
                file_path_prefix=output_path,
                schema=parquet_schema,
                file_name_suffix='.parquet')
        )


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    parser = argparse.ArgumentParser()
    parser.add_argument('--input_topic',
                        help='input pubsub topic to read data.')
    parser.add_argument('--output_path',
                        help='gcs output location for parquet files.')
    known_args, pipeline_args = parser.parse_known_args()
    run(
        known_args.input_topic,
        known_args.output_path,
        pipeline_args,
    )
This is the error generated by Dataflow:
Error message from worker:
java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction -1018: Traceback (most recent call last):
File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 726, in apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 814, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1061, in process self.writer.write(element)
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/filebasedsink.py", line 420, in write self.sink.write_record(self.temp_handle, value)
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/parquetio.py", line 534, in write_record self._buffer[i].append(value[n])
TypeError: byte indices must be integers or slices, not str
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 245, in _execute response = task()
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 302, in <lambda> lambda: self.create_worker().do_instruction(request), request)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 471, in do_instruction getattr(request, request_type), request.instruction_id)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 506, in process_bundle bundle_processor.process_bundle(instruction_id))
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 972, in process_bundle element.data)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 218, in process_encoded self.output(decoded_value)
File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 963, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1045, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "/usr/local/lib/python3.7/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback raise exc.with_traceback(traceback)
File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 726, in apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 814, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1061, in process self.writer.write(element)
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/filebasedsink.py", line 420, in write self.sink.write_record(self.temp_handle, value)
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/parquetio.py", line 534, in write_record self._buffer[i].append(value[n])
TypeError: byte indices must be integers or slices, not str [while running 'generatedPtransform-1004']
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:333)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:123)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1369)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:154)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1088)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction -1018: Traceback (most recent call last):
File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 726, in apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 814, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1061, in process self.writer.write(element)
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/
filebasedsink.py", line 420, in write self.sink.write_record(self.temp_handle, value)
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/parquetio.py", line 534, in write_record self._buffer[i].append(value[n])
TypeError: byte indices must be integers or slices, not str
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 245, in _execute response = task()
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 302, in <lambda> lambda: self.create_worker().do_instruction(request), request)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 471, in do_instruction getattr(request, request_type), request.instruction_id)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 506, in process_bundle bundle_processor.process_bundle(instruction_id))
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 972, in process_bundle element.data)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 218, in process_encoded self.output(decoded_value)
File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 963, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1045, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "/usr/local/lib/python3.7/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback raise exc.with_traceback(traceback)
File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 726, in apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 814, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1061, in process self.writer.write(element)
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/
filebasedsink.py", line 420, in write self.sink.write_record(self.temp_handle, value)
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/parquetio.py", line 534, in write_record self._buffer[i].append(value[n])
TypeError: byte indices must be integers or slices, not str [while running 'generatedPtransform-1004']
org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177)
org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157)
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251)
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309)
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292)
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782)
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)
Sorry that gave you such an ugly error message! This looks like exactly the kind of error we will be able to catch early once more transforms have type support (see https://beam.apache.org/blog/python-typing/ for more information).
The ParquetIO sink expects as input a PCollection of dict elements, but the PubSub source outputs a PCollection of bytes elements. You need to add a transform that parses the payload bytes into dicts, for example with json.loads:
(pipeline
 | 'Input: Read PubSub Messages' >> beam.io.ReadFromPubSub(topic=input_topic)
 | '*** Parse JSON -> dict ***' >> beam.Map(json.loads)  # requires `import json`
 | 'Output: Export to Parquet' >> beam.io.parquetio.WriteToParquet(
       file_path_prefix=output_path,
       schema=parquet_schema,
       file_name_suffix='.parquet')
)
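As for the many small files: in streaming mode the file sink writes a separate file per window and shard rather than one file overall, which I believe explains the dozens of Parquet files you are seeing. A minimal sketch (my assumption, untested against your job) that batches messages into fixed 5-minute windows and a single shard before writing:

from apache_beam.transforms.window import FixedWindows

(pipeline
 | 'Input: Read PubSub Messages' >> beam.io.ReadFromPubSub(topic=input_topic)
 | '*** Parse JSON -> dict ***' >> beam.Map(json.loads)
 | 'Window: 5 min batches' >> beam.WindowInto(FixedWindows(300))
 | 'Output: Export to Parquet' >> beam.io.parquetio.WriteToParquet(
       file_path_prefix=output_path,
       schema=parquet_schema,
       num_shards=1,
       file_name_suffix='.parquet')
)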