当前位置: 首页 > 知识库问答 >
问题:

由于窗口对象或内部处理而丢弃大量事件的数据流

关昊天
2023-03-14

最近开发了一个数据流消费者,它从PubSub订阅中读取数据,并将分组在同一窗口中的所有对象的组合输出到拼花文件。

当我在没有巨大负载的情况下进行测试时,一切似乎都很好。

然而,在执行了一些繁重的测试后,我可以看到,从发送到PubSub队列的1.000.000个事件中,只有1000个事件成功到达Parket!

根据不同阶段的多个墙时间,在应用窗口之前解析事件的墙时间似乎持续58分钟。写入拼花文件的最后阶段持续1小时32分钟。

我现在将展示其中最相关的代码部分,希望您能够了解这是由于窗口对象定义之前的逻辑造成的,还是由于窗口对象本身造成的。

pipeline
        .apply("Reading PubSub Events",
            PubsubIO.readMessagesWithAttributes()
                .fromSubscription(options.getSubscription()))
        .apply("Map to AvroSchemaRecord (GenericRecord)",
            ParDo.of(new PubsubMessageToGenericRecord()))
        .setCoder(AvroCoder.of(AVRO_SCHEMA))
        .apply("15m window",
            Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(15)))
                .triggering(AfterProcessingTime
                    .pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(1)))
                .withAllowedLateness(Duration.ZERO)
                .accumulatingFiredPanes()
        )

还要注意,我正在运行Beam 2.9.0。

第二阶段中的逻辑是否会过于繁重,以至于消息到达得太晚而在窗口中被丢弃?逻辑基本上包括读取有效负载、解析为POJO(读取内部映射属性、过滤等)

但是,如果我向PubSub发送了一百万个事件,那么所有这些一百万个事件都会一直持续到拼花写入文件阶段,但是这些拼花文件并不包含所有这些事件,只是部分包含。这有意义吗?

我需要触发器独立于延迟消耗所有这些事件。

共有1个答案

潘安平
2023-03-14

引用Apache Beam邮件列表上的答案:

这是一个不幸的可用性问题,触发器可能会意外关闭窗口并删除所有数据。相反,我认为你可能需要这个触发器:

Repeatedly.forever(
    AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(1)))

我建议使用以下方式来表示此触发器:

AfterWatermark.pastEndOfWindow().withEarlyFirings(
    AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(1)))

在第二种情况下,不可能意外“关闭”窗口并删除所有数据。

 类似资料:
  • 我正在研究javaFX上事件处理程序的机制,但我不确定我是否理解了它,事实上我有一点怀疑:如果我有两个对象,它们有处理事件的所有必要代码(EventHandler接口ECC..),它们属于同一stackPane,问题是:有没有一种方法让第一个对象启动一个事件(例如ActionEvent),尽管它们属于同一Pane但将由两个对象处理?因为对于我所理解的“事件路线”来说,这是不可能的,至少是直接的。实

  • 我们有一个在3.5版本上利用Hazelcast IExecutor服务和IMap的系统。我们最近遇到了Hazelcast集群成员在生产中内存不足的情况,一个接一个,最后所有节点都被OOM崩溃。 在进行原因分析时,我们发现下面有数千个日志条目,日志文件大小呈指数级增长。存放原木的存储空间也已经用完。 我知道,集群成员会不断发出心跳,以确保所有成员都活着,我相信默认值是10sec。现在的问题是,如果任

  • 假设我有一个股票市场交易事件流,如下所示: 使得technicalN(其中N是一些数字)代表给定公司的日终股票市场交易数据的第N个技术交易条目[开盘(浮动)、高位(浮动)、低位(浮动)、收盘(浮动)、成交量(int)]。(即ticker GOOG的技术1不同于ticker MSFT的技术1。)如: (请注意,这些交易价格/交易量完全是虚构的。 假设我想创建一个大小为2、时间间隔为1天的窗口,这样我

  • 问题内容: 我正在编写将Firebase中的元素附加到数组以使用文本字段执行简单搜索的代码。 该方法的代码如下: 我个人没有想到有可能发生很多事件的机会。我将无法将所有1000多个事件附加到字典中。那会刺痛我的记忆。无论如何,我可以让查询响应文本字段。也有人可以帮助我完成执行该操作但不会破坏我的记忆的查询行吗? 例如,我想提取有关使用查询搜索的事件的所有信息。因此,如果我开始输入“美国制造”,它将

  • 具有事件源的CQR看起来非常适合作为我们的一个系统的架构,目前我们只担心一件小事:处理大量事件,并因此处理大型事件商店。 我们当前的系统每天接收大约一百万个事件(目前与事件源无关),如果我们将它们存储在更长的时间内,我们的事件存储可能会变得相当大,但是如果我们经常转储/清除滚动快照,我们可能会失去事件源的一大优势:关于系统历史和重播的信息。 在CQRS架构中处理这个问题的常见方法是什么?这到底是个

  • 问题内容: 我需要使用Java或基于Javascript的自动化解决方案来操纵IE浏览器的“弹出窗口和下载对话框”。 我尝试了selenium2,但是它不能正常工作,因此其他建议也一样。实际上selenium2没有提供警报/下载对话框的正确处理,因此我正在考虑使用其他一些javascript / java解决方案。 使用“下载对话框”:我需要将下载的文件保存到特定位置。使用“警报对话框”:我需要检