当前位置: 首页 > 知识库问答 >
问题:

Apache Beam GenerateSequence不以指定速率发射元素

房光临
2023-03-14

我正在使用Apache Beam进行实验,并尝试使用GenerateSequence PTransform作为生成无界流数据源的简单方法。

GenerateSequence类提供了WithRate(long numElements,Duration periodLength)方法,据我所知,该方法控制每个周期产生的元素的速率以及周期的持续时间。令我惊讶的是,这些元素产生的速度与所提供的描述不一致。

例如,我尝试使用以下代码片段:

Pipeline p = Pipeline.create(pipelineOptions);
Duration runtimeDuration = Duration.standardSeconds(20L);
Duration periodDuration = Duration.standardSeconds(1L);
PCollection<String> generated_seq = p.apply("Get Sequence",
        GenerateSequence.from(1)
                .withMaxReadTime(runtimeDuration)
                .withRate(1, periodDuration))
        .apply("Test sequence generation", ParDo.of(new DoFn<Long,String>() {
            @ProcessElement
            public void processElement(@Element Long in, OutputReceiver<String> out){
                long userId = in % 5; //simulate events from 5 users
                Instant timestamp = Instant.now();
                DateTimeFormatter fmt = DateTimeFormat.forPattern("yyyy-MM-dd HH-mm-ss.SSSzZ").withZone(DateTimeZone.forID("Etc/GMT"));
                System.out.println(in + " => UserId:" + userId + "|Timestamp: " + timestamp.toString(fmt));
                out.outputWithTimestamp(Long.toString(userId), timestamp);
            }
        }));
5 => UserId:0|Timestamp: 2021-03-08 21-20-25.361GMT+0000
14 => UserId:4|Timestamp: 2021-03-08 21-20-25.446GMT+0000
6 => UserId:1|Timestamp: 2021-03-08 21-20-25.450GMT+0000
12 => UserId:2|Timestamp: 2021-03-08 21-20-25.452GMT+0000
7 => UserId:2|Timestamp: 2021-03-08 21-20-25.456GMT+0000
9 => UserId:4|Timestamp: 2021-03-08 21-20-25.459GMT+0000
13 => UserId:3|Timestamp: 2021-03-08 21-20-25.461GMT+0000
1 => UserId:1|Timestamp: 2021-03-08 21-20-25.463GMT+0000
2 => UserId:2|Timestamp: 2021-03-08 21-20-25.465GMT+0000
16 => UserId:1|Timestamp: 2021-03-08 21-20-25.468GMT+0000
10 => UserId:0|Timestamp: 2021-03-08 21-20-25.469GMT+0000
8 => UserId:3|Timestamp: 2021-03-08 21-20-25.471GMT+0000
15 => UserId:0|Timestamp: 2021-03-08 21-20-25.474GMT+0000
4 => UserId:4|Timestamp: 2021-03-08 21-20-25.476GMT+0000
17 => UserId:2|Timestamp: 2021-03-08 21-20-25.478GMT+0000
11 => UserId:1|Timestamp: 2021-03-08 21-20-25.488GMT+0000
3 => UserId:3|Timestamp: 2021-03-08 21-20-37.613GMT+0000

如上所述,尽管指定了withrate(1,periodDuration),但大多数元素都是在同一秒内生成的;即指定在1秒的周期内最多生成1个元素

我试图深入研究SDK代码,以理解并希望解决此行为的原因,但我无法确定其原因。因此,是否有一种方法可以解决这个问题,或者是否有类似的PTransforms可以模拟一个无界的流媒体源?

共有1个答案

翟高明
2023-03-14

这可能是由generateSequence转换中的一个怪癖引起的,文档中并没有真正解释这个怪癖。具体地说,用于生成数字(countingsource)的基础源的工作方式是,如果它用完了要发出的元素,那么在再次检查源之前会有一个短暂的等待。如果此等待时间大于周期持续时间,那么下次检查源时可能会有多个元素排队,源将快速通过它们。

所以在你的例子中,可能发生的是源启动,并且还没有发出任何元素,因为一秒钟的周期还没有过去。几秒钟后再次检查它,此时它迅速发出该时间段内应该发出的所有元素,直到它耗尽,然后再次等待。这可以从示例中的最后一个元素中看到,该元素是在前一个元素之后12秒发出的。扩展运行时持续时间是看到这一点的好方法;您可能会看到多批元素被发出。

如果您所需要的只是周期性地生成原始数字,那么上面描述的行为就可以很好地工作。但是,如果使用generateSequence测试依赖于时间戳的管道,则需要设置自定义的TimestampFn来为每个发出的元素设置时间戳。源代码中的默认TimestampFn可能是一个很好的示例。对您来说,一个非常简单的方法可能是设置时间戳以匹配元素的值。

 类似资料:
  • 正如标题所说,我正在尝试使用排定注释的fixedRate参数,以便每秒调用一个函数。下面是我正在使用的代码: 按照我的理解,在打印第一个“结束线程”之前,函数应该打印五次“开始线程”。 问题是函数先打印“开始线程”然后等待5.5秒,打印“结束线程”,然后走“开始线程”,以此类推……看起来调度程序在启动新的执行之前等待前一个执行完成,但fixedRate属性不应该是这种情况。 我仔细阅读了一下,发现

  • Spring Cloud Stream提供了一个名为spring-cloud-stream-metrics的模块,可以用来从Spring Boot度量端点到命名通道发出任何可用度量。该模块允许运营商从流应用收集指标,而不依赖轮询其端点。 当您设置度量绑定的目标名称(例如spring.cloud.stream.bindings.applicationMetrics.destination=<DEST

  • 我正在与反应流和出版商(Mono和Flux)合作,并使用zip和zipWith Flux方法将这两个出版商结合起来,如下所示: 以下是输出: 由于flux1有四个元素,flux2有三个元素,flux1中的第四个元素丢失。当我试图打印通量的日志时,没有关于第四元素发生了什么的信息。 以下是打印日志的语句: 下面是使用log方法的控制台日志: 从zip方法的文档中,我得到了 操作员将继续这样做,直到任

  • 问题内容: 我有一个页面,其源代码不可用,但是有一个输入框,其中光标闪烁。 我可以在没有找到元素的情况下将一些内容写到文本框中吗?我的意思是,某种方式下,发送键可以自动查找聚焦的输入框并输入输入内容。 我的代码无法正常工作 问题答案: 解决了

  • 我正在开发一个应用程序,该应用程序应该每隔X秒从网络下载一个文件,以检查是否有任何更改。我使用一个服务来完成这项工作,但它的执行并不随延迟时间率而固定,这是我的服务代码: 输出不是固定的,它应该每10秒运行一次,同时以随机方式运行服务

  • 我使用三台机器和本指南在aws集群上设置了一个测试环境。 我在本地模式下测试了我的代码,并使用wirbelsturm创建了一个本地vagrant集群,这两种方法都能产生预期的结果。 当我现在向Web服务器提交代码时,我的喷口和所有的螺栓都是静默的。我的嘴读的是csv,我把它抄送给了灵气和我的主管。storm UI显示我的拓扑为活动的,并显示所有螺栓和我的喷口,但计数器不可见。主管没有已使用得员工.