当前位置: 首页 > 知识库问答 >
问题:

数据流大侧输入中的Apache波束

宁卓
2023-03-14

这与这个问题最为相似。

我正在Dataflow 2.x中创建一个管道,它从Pubsub队列获取流式输入。进入的每一条消息都需要通过来自Google BigQuery的一个非常大的数据集进行流式传输,并且在写入数据库之前附加了所有相关的值(基于一个键)。

问题是来自BigQuery的映射数据集非常大--任何将其用作侧输入的尝试都失败了,数据流运行程序会抛出错误“java.lang.IllegalArgumentException:ByteString would be too long”。我尝试了以下策略:

1)侧输入

  • 如前所述,映射数据(显然)太大,无法执行此操作。如果我在这里错了,或者有解决方法,请告诉我,因为这将是最简单的解决方案。

2)键值对映射

  • 在此策略中,我在管道的第一部分读取BigQuery数据和Pubsub消息数据,然后运行每个贯穿ParDo转换,将PCollections中的每个值更改为KeyValue对。然后,我运行一个Merge.Flatten转换和一个GroupByKey转换,将相关的映射数据附加到每个消息。
  • 这里的问题是,流数据需要窗口化才能与其他数据合并,因此我必须将窗口化也应用于大的、有界的BigQuery数据。它还要求两个数据集上的窗口化策略相同。但是没有一个有界数据的窗口化策略是有意义的,我所做的几次窗口化尝试只是在一个窗口中发送所有BQ数据,然后再也不发送了。它需要与每个传入的pubsub消息连接。

3)直接在ParDo(DoFn)中调用BQ

  • 这似乎是个好主意-让每个工作者声明地图数据的静态实例。如果不存在,那么直接调用BigQuery来获取它。不幸的是,这每次都会从BigQuery中抛出内部错误(在整个消息中只说“内部错误”)。向谷歌提交了一张支持票,结果他们告诉我,基本上,“你不能那样做”。

似乎这个任务并不符合“尴尬的并行”模型,所以我是不是找错了地方?

编辑:

即使在dataflow中使用高内存机器并尝试将侧边输入到map视图中,我也会得到错误java.lang.IllegalArgumentException:ByteString would be too long

下面是我正在使用的代码的一个示例(psuedo):

    Pipeline pipeline = Pipeline.create(options);

    PCollectionView<Map<String, TableRow>> mapData = pipeline
            .apply("ReadMapData", BigQueryIO.read().fromQuery("SELECT whatever FROM ...").usingStandardSql())
            .apply("BQToKeyValPairs", ParDo.of(new BQToKeyValueDoFn())) 
            .apply(View.asMap());

    PCollection<PubsubMessage> messages = pipeline.apply(PubsubIO.readMessages()
            .fromSubscription(String.format("projects/%1$s/subscriptions/%2$s", projectId, pubsubSubscription)));

    messages.apply(ParDo.of(new DoFn<PubsubMessage, TableRow>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            JSONObject data = new JSONObject(new String(c.element().getPayload()));
            String key = getKeyFromData(data);
            TableRow sideInputData = c.sideInput(mapData).get(key);
            if (sideInputData != null) {
                LOG.info("holyWowItWOrked");
                c.output(new TableRow());
            } else {
                LOG.info("noSideInputDataHere");
            }
        }
    }).withSideInputs(mapData));

管道抛出异常并在从pardo中记录任何内容之前失败。

堆栈跟踪:

java.lang.IllegalArgumentException: ByteString would be too long: 644959474+1551393497
        com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.concat(ByteString.java:524)
        com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.balancedConcat(ByteString.java:576)
        com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.balancedConcat(ByteString.java:575)
        com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.balancedConcat(ByteString.java:575)
        com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.balancedConcat(ByteString.java:575)
        com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.copyFrom(ByteString.java:559)
        com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString$Output.toByteString(ByteString.java:1006)
        com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillBag.persistDirectly(WindmillStateInternals.java:575)
        com.google.cloud.dataflow.worker.WindmillStateInternals$SimpleWindmillState.persist(WindmillStateInternals.java:320)
        com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillCombiningState.persist(WindmillStateInternals.java:951)
        com.google.cloud.dataflow.worker.WindmillStateInternals.persist(WindmillStateInternals.java:216)
        com.google.cloud.dataflow.worker.StreamingModeExecutionContext$StepContext.flushState(StreamingModeExecutionContext.java:513)
        com.google.cloud.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:363)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1000)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$800(StreamingDataflowWorker.java:133)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:771)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)

共有1个答案

相德宇
2023-03-14

查看本文https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-2中的“pattern:Streaming mode large lookup tables”一节(这可能是唯一可行的解决方案,因为您的侧输入不适合内存):

描述:

一个大的(以GBs为单位)查找表必须是精确的,并且经常更改或者不适合内存。

示例:

您有来自零售商的销售点信息,并且需要将产品项的名称与包含ProductID的数据记录相关联。在一个可以不断变化的外部数据库中存储着数十万个项目。此外,必须使用正确的值处理所有元素。

解决方案:

使用“调用外部服务进行数据充实”模式,但不是调用微服务,而是直接调用读优化的NoSQL数据库(如Cloud Datastore或Cloud Bigtable)。

对于要查找的每个值,使用KV实用程序类创建一个键值对。执行GroupByKey以创建相同键类型的批以对数据库进行调用。在DoFn中,向数据库调用该键,然后通过遍历迭代程序将该值应用于所有值。按照“调用外部服务进行数据充实”中所述的客户端实例化的最佳实践。

本文介绍了其他相关模式:https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1:

  • 模式:缓慢更改的查找缓存
  • 模式:调用外部服务进行数据充实
 类似资料:
  • 我没有找到任何文档允许将错误处理应用于此步骤,也没有找到将其重写为DOFN的方法。对此应用错误处理有什么建议吗?谢谢

  • 我正在处理一个更大的数据流管道,它在批处理模式下工作得很好,但完成后的重构确实有侧输入的问题。如果我将管道置于流模式并移除侧输入,管道在Google的数据流上可以完美地工作。 如果把所有东西都剥离下来,构建以下简短的脚本来封装这个问题,并能够与它一起玩。 在Google的Dataflow中以批处理作业的形式运行这个脚本可以完成它需要做的事情。请参阅从数据流中可视化的管道:

  • 我们在实验中发现,在DataFlow/Apache Beam管道中设置显式的输出碎片#会导致更差的性能。我们的证据表明,Dataflow在最后秘密地做了另一个GroupBy。我们已经转向让Dataflow自动选择碎片数(shards=0)。但是,对于某些管道,这会导致大量相对较小的输出文件(~15K文件,每个<1MB)。

  • 嗨,我已经创建了一个apache beam管道,测试了它,并在eclipse内部运行了它,包括本地和使用dataflow Runner。我可以在eclipse控制台中看到管道正在运行。e.控制台上的日志。 这是我在cmd提示符中使用的maven命令, 这是我用来创建管道和设置选项的代码段。

  • 我对GCP、Dataflow、Apache Beam、Python和一般的OOP都是新手。我来自函数式javascript领域,对于上下文。 现在,我已经用Apache Beam python sdk构建了一个流管道,并将其部署到GCP的数据流中。管道的源是pubsub订阅,接收器是数据存储。 管道从pubsub订阅中获取消息,根据配置对象+消息内容做出决定,然后根据做出的决定将其放在数据存储中的

  • 我想得到输入流作为JSON数组从一个网址。如何设置源代码,以便在apache flink中使用datastream连续获得输入。简而言之,我想从一个url连续获得json数据,而不会关闭flink作业。