当前位置: 首页 > 知识库问答 >
问题:

使用元素计数使用数据流写入GCS

汪永春
2023-03-14

这参考了阿帕奇光束 SDK 版本 2.2.0。

我正在尝试使用<code>AfterPane.elementCountAtLeast(…)

使用 GCP 数据流 2.0 PubSub 到 GCS 作为参考,以下是我尝试过的方法:

String bucketPath =
    String.format("gs://%s/%s", 
        options.getBucketName(), 
        options.getDestinationDirName());

PCollection<String> windowedValues = stringMessages
    .apply("Create windows",
        Window.<String>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(250)))
        .discardingFiredPanes());

windowedValues
    .apply("Write to GCS",
        TextIO
            .write()
            .to(bucketPath)
            .withNumShards(options.getNumShards())
            .withWindowedWrites());

其中< code>stringMessages是从Avro编码的发布订阅中读取的PCollection。上游发生了一些解包,将事件转换成字符串,但没有合并/划分/分组,只有转换。

元素计数仅为PoC硬编码为250。一旦被证明,它很可能会被调到数以千计的10秒或100秒。

问题

这种实现产生了不同长度的文本文件。当作业第一次启动时,文件长度开始非常高(1000个元素)(大概是处理积压的数据,然后在某个时候稳定下来。我尝试将“numShards”更改为1和10。在1时,写入文件的元素数稳定在600,在10时,它稳定在300。

我错过了什么?

作为旁注,这只是步骤 1。一旦我弄清楚了使用元素计数进行写入,我仍然需要弄清楚将这些文件作为压缩的json(.json.gz)而不是纯文本文件。

共有1个答案

罗宪
2023-03-14

发布我学到的东西供他人参考。

当我写这篇文章时,我不清楚的是Apache Beam文档中的以下内容:

聚合多个元素的转换,如<code>GroupByKey

有了这些知识,我重新思考了一下我的管道。从写入文件下的FileIO留档-

请注意,设置固定数量的碎片可能会影响性能:它会向管道中添加额外的<code>GroupByKey</code>。但是,由于BEAM-1438和其他流道中的类似行为,在写入无界PCollection时需要设置它。

所以我决定使用< code>FileIO的< code>writeDynamic来执行写操作,并指定< code>withNumShards来获得隐式的< code>GroupByKey。最终结果如下所示:

PCollection<String> windowedValues = validMessageStream.apply(Window
            .<String>configure()
            .triggering(Repeatedly.forever(AfterFirst.of(
                    AfterPane.elementCountAtLeast(2000),
                    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
                            Duration.standardSeconds(windowDurationSeconds)))))
            .discardingFiredPanes());

windowedValues.apply(FileIO.<String, String>writeDynamic()
            .by(Event::getKey)
            .via(TextIO.sink())
            .to("gs://data_pipeline_events_test/events/")
            .withDestinationCoder(StringUtf8Coder.of())
            .withNumShards(1)
            .withNaming(key -> FileIO.Write.defaultNaming(key, ".json")));
 类似资料:
  • SQLAlchemy 1.4 / 2.0 Tutorial 此页是 SQLAlchemy 1.4/2.0教程 . 上一页: 处理事务和DBAPI |下一步: |next| 使用数据库元数据 随着引擎和SQL执行的停止,我们准备开始一些炼金术。SQLAlchemy Core和ORM的核心元素是SQL表达式语言,它允许流畅、可组合地构造SQL查询。这些查询的基础是表示数据库概念(如表和列)的Pytho

  • 我有一个名为df的数据库数据帧。我想将它作为csv文件写入S3存储桶。我有S3存储桶名称和其他凭据。我检查了这里给出的在线留档https://docs.databricks.com/spark/latest/data-sources/aws/amazon-s3.html#mount-aws-s3它说使用以下命令 但我有的是数据帧,而不是文件。怎么才能实现?

  • 下面的代码尝试使用缓冲区将WAV文件的头写入流,然后将其写入可写流。 出于某种原因,文件的前8个字节是错误的: 第一行应该是: 这条线路有故障: 它需要:

  • 问题内容: 这段代码曾经很好(在某种意义上,编译器没有抱怨): 从Swift 5.0开始,这会产生警告: 警告:不建议使用“ withUnsafeBytes”:请改用 我试着用所提出的方法,但我似乎无法缠斗到这最终需要。 如何以不推荐的方式编写此函数? 问题答案: 诀窍是使用函数: 尽管此功能适用于Swift 5.0,但显然存在一些问题。参见相关论坛讨论。

  • 显然它无法解码数据。有什么想法吗?