Question:

Google Cloud Dataflow - pyarrow schema from PubSub messages

牟嘉
2023-03-14

I'm trying to write Google PubSub messages to Google Cloud Storage using Google Cloud Dataflow (Python SDK). The messages arrive in PubSub in JSON format, and I have to define a schema in order to write them to Parquet format in Google Cloud Storage.

Following suggestions from other users, I started on this task, in particular by looking at this and this source. The first is not what I want to do, because it applies changes to the JSON payloads (it merges them through a window, puts the raw JSON into a "message" field, and adds a timestamp representing the publish time). The second source fits this use case better. Specifically, there the schema is automatically defined from data extracted from a BigQuery table, and the result is then written back to Google Cloud Storage in Parquet format. Does anyone know whether it's possible to do the same thing, more precisely, to automatically define the schema with pyarrow by reading JSON messages from PubSub? If it is possible, how can I do it?
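
For clarity, this is roughly what I mean by "automatically defining the schema". The sketch below is my own idea, not taken from either source, and assumes every message is a flat JSON object with the same fields; the helper name get_parquet_schema matches the placeholder in my code further down:

    import json
    import pyarrow

    # Sketch only: infer a pyarrow schema from one sample JSON payload.
    # Assumes every message is a flat JSON object with the same fields.
    def get_parquet_schema(sample_payload: bytes) -> pyarrow.Schema:
        record = json.loads(sample_payload)
        # pyarrow infers each field's type from the Python value in the dict
        return pyarrow.Table.from_pylist([record]).schema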

Here is what I've done so far. If I try to run it, some Parquet files are generated (they contain the column names I specified through the pyarrow schema, but they have no values), and several errors are produced in the Dataflow console (see one example below). Also, if only a single JSON message arrives at PubSub (which should be converted into a Parquet file with only one row), I don't understand why many Parquet files are generated (more than 10 if I let the job run for a few minutes).


    import argparse
    import logging
    import pyarrow
    
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    
    def run(input_topic, output_path, pipeline_args=None):
        # TODO - Dynamic parquet_schema definition
        # input_topic = known_args.input
        # parquet_schema = get_parquet_schema(input_topic)
    
        parquet_schema = pyarrow.schema(
            [('Attr1', pyarrow.string()), ('Attr2', pyarrow.string()),
             ('Attr3', pyarrow.string()), ('Attr4', pyarrow.string()),
             ('Attr5', pyarrow.string()), ('Attr6', pyarrow.string())
             ]
        )
    
    # instantiate a pipeline with all the pipeline options
        pipeline_options = PipelineOptions(pipeline_args, streaming=True)
    
        # processing and structure of pipeline
        with beam.Pipeline(options=pipeline_options) as pipeline:
            (
                pipeline
                | 'Input: Read PubSub Messages' >> beam.io.ReadFromPubSub(topic=input_topic)
                | 'Output: Export to Parquet' >> beam.io.parquetio.WriteToParquet(
                    file_path_prefix=output_path,
                    schema=parquet_schema,
                    file_name_suffix='.parquet')
            )
    
    
    if __name__ == '__main__':
        logging.getLogger().setLevel(logging.INFO)
    
        parser = argparse.ArgumentParser()
        parser.add_argument('--input_topic',
                            help='input pubsub topic to read data.',)
        parser.add_argument('--output_path',
                            help='gcs output location for parquet files.',)
        known_args, pipeline_args = parser.parse_known_args()
    
        run(
            known_args.input_topic,
            known_args.output_path,
            pipeline_args,
        )

Here is the error generated by Dataflow:


    Error message from worker: 
    java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction -1018: Traceback (most recent call last): 
    File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process 
    File "apache_beam/runners/common.py", line 726, in apache_beam.runners.common.PerWindowInvoker.invoke_process 
    File "apache_beam/runners/common.py", line 814, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window 
    File "/usr/local/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1061, in process self.writer.write(element) 
    File "/usr/local/lib/python3.7/site-packages/apache_beam/io/filebasedsink.py", line 420, in write self.sink.write_record(self.temp_handle, value) 
    File "/usr/local/lib/python3.7/site-packages/apache_beam/io/parquetio.py", line 534, in write_record self._buffer[i].append(value[n]) 
    TypeError: byte indices must be integers or slices, not str 
    
    During handling of the above exception, another exception occurred: 
    
    Traceback (most recent call last): 
    File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 245, in _execute response = task() 
    File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 302, in <lambda> lambda: self.create_worker().do_instruction(request), request) 
    File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 471, in do_instruction getattr(request, request_type), request.instruction_id) 
    File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 506, in process_bundle bundle_processor.process_bundle(instruction_id)) 
    File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 972, in process_bundle element.data) 
    File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 218, in process_encoded self.output(decoded_value) 
    File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output 
    File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output 
    File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive 
    File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process 
    File "apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process 
    File "apache_beam/runners/common.py", line 963, in apache_beam.runners.common.DoFnRunner.process 
    File "apache_beam/runners/common.py", line 1045, in apache_beam.runners.common.DoFnRunner._reraise_augmented 
    File "/usr/local/lib/python3.7/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback raise exc.with_traceback(traceback) 
    File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process 
    File "apache_beam/runners/common.py", line 726, in apache_beam.runners.common.PerWindowInvoker.invoke_process 
    File "apache_beam/runners/common.py", line 814, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window 
    File "/usr/local/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1061, in process self.writer.write(element) 
    File "/usr/local/lib/python3.7/site-packages/apache_beam/io/filebasedsink.py", line 420, in write self.sink.write_record(self.temp_handle, value) 
    File "/usr/local/lib/python3.7/site-packages/apache_beam/io/parquetio.py", line 534, in write_record self._buffer[i].append(value[n]) 
    TypeError: byte indices must be integers or slices, not str [while running 'generatedPtransform-1004'] 
    
    java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) 
    java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) 
    org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57) 
    org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:333) 
    org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85) 
    org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:123) 
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1369) 
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:154) 
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1088) 
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) java.lang.Thread.run(Thread.java:748) 
    Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction -1018: Traceback (most recent call last): 
    File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process 
    File "apache_beam/runners/common.py", line 726, in apache_beam.runners.common.PerWindowInvoker.invoke_process 
    File "apache_beam/runners/common.py", line 814, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window 
    File "/usr/local/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1061, in process self.writer.write(element) 
    File "/usr/local/lib/python3.7/site-packages/apache_beam/io/
    filebasedsink.py", line 420, in write self.sink.write_record(self.temp_handle, value) 
    File "/usr/local/lib/python3.7/site-packages/apache_beam/io/parquetio.py", line 534, in write_record self._buffer[i].append(value[n]) 
    TypeError: byte indices must be integers or slices, not str 
    
    During handling of the above exception, another exception occurred: 
    
    Traceback (most recent call last): 
    File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 245, in _execute response = task() 
    File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 302, in <lambda> lambda: self.create_worker().do_instruction(request), request) 
    File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 471, in do_instruction getattr(request, request_type), request.instruction_id) 
    File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 506, in process_bundle bundle_processor.process_bundle(instruction_id)) 
    File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 972, in process_bundle element.data) 
    File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 218, in process_encoded self.output(decoded_value) 
    File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output 
    File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output 
    File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive 
    File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process 
    File "apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process 
    File "apache_beam/runners/common.py", line 963, in apache_beam.runners.common.DoFnRunner.process 
    File "apache_beam/runners/common.py", line 1045, in apache_beam.runners.common.DoFnRunner._reraise_augmented 
    File "/usr/local/lib/python3.7/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback raise exc.with_traceback(traceback) 
    File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process 
    File "apache_beam/runners/common.py", line 726, in apache_beam.runners.common.PerWindowInvoker.invoke_process 
    File "apache_beam/runners/common.py", line 814, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window 
    File "/usr/local/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1061, in process self.writer.write(element) 
    File "/usr/local/lib/python3.7/site-packages/apache_beam/io/
    filebasedsink.py", line 420, in write self.sink.write_record(self.temp_handle, value) 
    File "/usr/local/lib/python3.7/site-packages/apache_beam/io/parquetio.py", line 534, in write_record self._buffer[i].append(value[n]) 
    TypeError: byte indices must be integers or slices, not str [while running 'generatedPtransform-1004'] 
    
    org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177) 
    org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157) 
    org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251) 
    org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33) 
    org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76) 
    org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309) 
    org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292) 
    org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782) 
    org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) 
    org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) 
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) java.lang.Thread.run(Thread.java:748)

1 Answer

冷英博
2023-03-14

Sorry this gave you such an ugly error message! This looks like exactly the kind of error we'd be able to catch early once more transforms have typing support (see https://beam.apache.org/blog/python-typing/ for more information).

The ParquetIO sink expects a PCollection of dict elements as input, but the PubSub source outputs a PCollection of bytes elements. You need to add a transform that parses the payload bytes and converts them into dicts, for example:

    import json  # needed for json.loads below

    (pipeline
     | 'Input: Read PubSub Messages' >> beam.io.ReadFromPubSub(topic=input_topic)
     | '*** Parse JSON -> dict ***' >> beam.Map(json.loads)
     | 'Output: Export to Parquet' >> beam.io.parquetio.WriteToParquet(
           file_path_prefix=output_path,
           schema=parquet_schema,
           file_name_suffix='.parquet')
    )
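
For what it's worth, here is a small standalone check (my own sketch, not part of the pipeline) showing why this fixes the TypeError above: json.loads turns the raw bytes payload into a dict that can be indexed by field name, which is exactly the lookup that parquetio's write_record performs:

    import json

    # A PubSub payload arrives as raw bytes
    payload = b'{"Attr1": "a", "Attr2": "b", "Attr3": "c"}'

    record = json.loads(payload)   # -> dict keyed by field name
    assert record['Attr1'] == 'a'  # write_record can now do value[field_name];
                                   # on raw bytes that lookup raised:
                                   # TypeError: byte indices must be integers or slices, not str

As an aside, per the typing blog post linked above, you could presumably also add .with_output_types(dict) to the Map step so a mismatch like this surfaces earlier, though I haven't verified that it catches this particular case for ParquetIO.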