这参考了阿帕奇光束 SDK 版本 2.2.0。
我正在尝试使用<code>AfterPane.elementCountAtLeast(…)
使用 GCP 数据流 2.0 PubSub 到 GCS 作为参考,以下是我尝试过的方法:
String bucketPath =
String.format("gs://%s/%s",
options.getBucketName(),
options.getDestinationDirName());
PCollection<String> windowedValues = stringMessages
.apply("Create windows",
Window.<String>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(250)))
.discardingFiredPanes());
windowedValues
.apply("Write to GCS",
TextIO
.write()
.to(bucketPath)
.withNumShards(options.getNumShards())
.withWindowedWrites());
其中< code>stringMessages是从Avro编码的发布订阅中读取的PCollection。上游发生了一些解包,将事件转换成字符串,但没有合并/划分/分组,只有转换。
元素计数仅为PoC硬编码为250。一旦被证明,它很可能会被调到数以千计的10秒或100秒。
问题
这种实现产生了不同长度的文本文件。当作业第一次启动时,文件长度开始非常高(1000个元素)(大概是处理积压的数据,然后在某个时候稳定下来。我尝试将“numShards”更改为1和10。在1时,写入文件的元素数稳定在600,在10时,它稳定在300。
我错过了什么?
作为旁注,这只是步骤 1。一旦我弄清楚了使用元素计数进行写入,我仍然需要弄清楚将这些文件作为压缩的json(.json.gz)而不是纯文本文件。
发布我学到的东西供他人参考。
当我写这篇文章时,我不清楚的是Apache Beam文档中的以下内容:
聚合多个元素的转换,如<code>GroupByKey
有了这些知识,我重新思考了一下我的管道。从写入文件下的FileIO留档-
请注意,设置固定数量的碎片可能会影响性能:它会向管道中添加额外的<code>GroupByKey</code>。但是,由于BEAM-1438和其他流道中的类似行为,在写入无界PCollection
时需要设置它。
所以我决定使用< code>FileIO的< code>writeDynamic来执行写操作,并指定< code>withNumShards来获得隐式的< code>GroupByKey。最终结果如下所示:
PCollection<String> windowedValues = validMessageStream.apply(Window
.<String>configure()
.triggering(Repeatedly.forever(AfterFirst.of(
AfterPane.elementCountAtLeast(2000),
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
Duration.standardSeconds(windowDurationSeconds)))))
.discardingFiredPanes());
windowedValues.apply(FileIO.<String, String>writeDynamic()
.by(Event::getKey)
.via(TextIO.sink())
.to("gs://data_pipeline_events_test/events/")
.withDestinationCoder(StringUtf8Coder.of())
.withNumShards(1)
.withNaming(key -> FileIO.Write.defaultNaming(key, ".json")));
SQLAlchemy 1.4 / 2.0 Tutorial 此页是 SQLAlchemy 1.4/2.0教程 . 上一页: 处理事务和DBAPI |下一步: |next| 使用数据库元数据 随着引擎和SQL执行的停止,我们准备开始一些炼金术。SQLAlchemy Core和ORM的核心元素是SQL表达式语言,它允许流畅、可组合地构造SQL查询。这些查询的基础是表示数据库概念(如表和列)的Pytho
我有一个名为df的数据库数据帧。我想将它作为csv文件写入S3存储桶。我有S3存储桶名称和其他凭据。我检查了这里给出的在线留档https://docs.databricks.com/spark/latest/data-sources/aws/amazon-s3.html#mount-aws-s3它说使用以下命令 但我有的是数据帧,而不是文件。怎么才能实现?
下面的代码尝试使用缓冲区将WAV文件的头写入流,然后将其写入可写流。 出于某种原因,文件的前8个字节是错误的: 第一行应该是: 这条线路有故障: 它需要:
问题内容: 这段代码曾经很好(在某种意义上,编译器没有抱怨): 从Swift 5.0开始,这会产生警告: 警告:不建议使用“ withUnsafeBytes”:请改用 我试着用所提出的方法,但我似乎无法缠斗到这最终需要。 如何以不推荐的方式编写此函数? 问题答案: 诀窍是使用函数: 尽管此功能适用于Swift 5.0,但显然存在一些问题。参见相关论坛讨论。
显然它无法解码数据。有什么想法吗?