当前位置: 首页 > 知识库问答 >
问题:

如何使用触发器和驱逐器测试Flink全局窗口

葛鸿熙
2023-03-14

我有一个使用Flink Global Window的管道,它具有基于事件时间(从到达元素的时间戳)的自定义触发器和从窗口中删除不必要元素并将其传递给ProcessFunction的Evictor,

像这样的东西:

 public SingleOutputStreamOperator<Results> processElements(DataStream<Elements> inputStream) {
 return inputStream
                .keyBy(Elements::getId)
                .window(GlobalWindows.create())
                .trigger(new CustomTrigger())
                .evictor(new CustomEvictor())
                .process(new MyWindowProcessFunction())
                .name("Process")
                .uid("process-elements")
                .returns(Results.class);    
}

    public void executePipelineFlow(StreamExecutionEnvironment env) throws Exception {
        DataStream<Elements> inputStream = getInputStream(env);
        DataStream<Results> processedInput = processElements(inputStream);
        applySink(processedInput);
}

我知道我可以用提供水印操作的TestHarness测试我的WindowProcessFunction,但是我需要测试整个流程,触发退出器ProcessFunction。

我还尝试了使用Thread.sleep()的某种定时SourceFunction,但是我的管道在事件时间内工作,如果我在测试流中有1000个元素,这将不起作用(因为测试将需要几个小时)。

我的问题是,如何对整个< code>processElements方法进行单元测试?

我找不到我的案例的任何测试示例。

谢谢

共有1个答案

宿丰
2023-03-14

作为一个例子,您可以看看Flink培训中窗口练习的端到端集成测试是如何实现的。这个练习没有使用GlobalWindows或自定义触发等,但是您可以使用这个整体方法来测试任何管道。

这种方法可能不太理想的一点是它如何处理水印。被测试的应用程序使用默认的周期性水印策略,其中水印每200毫秒生成一次。由于测试不会运行那么长时间,实际生成的唯一水印是在每个输入受限的作业结束时出现的水印。这是可行的,但与生产中会发生的情况不太一样。(这就是你考虑让测试源在事件之间Hibernate的原因吗?)

顺便说一下,Flink培训报告中的这些测试比通常需要的稍微复杂一些,因为这些测试用于覆盖练习和解决方案的Java和Scala实现。

 类似资料:
  • 我有flink stream,我在某个时间窗口上计算一些事情,比如说30秒。 这里发生的事情是,它给我的结果,我以前的窗口聚合以及。 假设前30秒我得到结果10。 接下来的三十秒我想要新的结果,而不是我得到最后一个窗口的结果新的等等。 因此,我的问题是如何为每个窗口获得新的结果。

  • 我有一个dynamoDb表,它设置为两个区域之间的全局(2019版)。 我在表上分配了一个lambda函数作为触发器。当一条记录插入到表的东版本中时,就会触发lambda的东版本。然后将记录复制到表的西版本,并触发lambda的西版本。 我想要一个λ触发。但是我也希望两个触发器都启用,以防一个区域出现故障。 我怎样才能做到这一点?我宁愿不使触发器逻辑幂等。

  • 问题内容: 假设我有一个类似 测试此bean实际上将在其指定日期( 即 最接近每月15日的工作日)触发的最佳方法是什么? 更新 :这应该是一个单元测试,所以我不会启动虚拟机或更改系统时间。 问题答案: 首先,没有必要进行自我测试。它是spring框架的一部分,已经过测试。 更好的测试可能是测试您的cron表达式是否符合您的期望。这里的一种选择是使用Quartz的类。给定一个对象,您可以调用,该表达

  • 下面是我的顶点触发器。我是一名初学者,正在尝试编写其测试类,但不断出现错误“System.DmlException:Insert失败。第0行出现第一个异常;第一个错误:REQUIRED\u FIELD\u MISSING,错误:只有在产品相关列表上为此opportunity选择了价格手册,才能选择产品。:[]”。 在Opportunity上触发TrgrOptyHighestCustmorePric

  • 有人能给我解释一下如何为下面这样的apex触发器编写测试类吗? 我是Salesforce的新手。有人帮助我如何为上述触发器编写顶点类(测试类)吗? AccountBrowseExtensionTesttestAccountBrowseSystem。DmlException:插入失败。第0行第一个异常;第一个错误:FIELD\u CUSTOM\u VALIDATION\u EXCEPTION,Cit

  • 我在Apex中有一个触发器。如何编写检查触发器是否被调用的单元测试?