当前位置: 首页 > 知识库问答 >
问题:

未触发具有事件时间特性的TumblingProcessingTimeWindows处理

梁丘俊人
2023-03-14

更具体地说,需要计算的加密数据流的计数为7秒。

  1. 1秒滚动窗口
  2. 可提前1秒计算7秒的滑动窗口
  3. 每隔1秒输出全部计数的windowall

我无法对其进行集成测试(即,类似于单元测试,但端到端测试),因为输入有假事件时间,这不会触发

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val oneDayCounts = data
  .map(t => (t.key1, t.key2, 1L, t.timestampMs))
  .keyBy(0, 1)
  .timeWindow(Time.seconds(1))
  .sum(2)

val sevenDayCounts = oneDayCounts
  .keyBy(0,1)
  .timeWindow(Time.seconds(3), Time.seconds(1))
  .sum(2)

// single reducer
sevenDayCounts
  .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
  .process(...)

EventTimeTrigger可以为求和计算激发,但以下TumblingProcessingTimeWindow无法触发。我在IT测试代码中有一个30s的线程.睡眠,但在30s之后仍然没有触发

共有1个答案

戎洛华
2023-03-14

一般来说,编写有意义的测试来处理时间窗口是一个挑战,因为它们本质上是不确定的。这是为什么通常首选事件时间窗口的一个原因。

在正确的地方睡觉也很难达到预期的效果。但是,保持作业运行足够长的时间以使处理时间窗口触发的一种方法是使用包含Hibernate的自定义源。具有有限源的Flink流作业在输入耗尽后自行关闭。通过管道发送值为MAX_WATERMARK的最后一个水印,这将触发所有事件时间窗口,但处理时间窗口只有在指定时间到达时仍在运行时才会触发。

请参阅此答案以获得一个解决此问题的黑客示例。

 类似资料:
  • 问题内容: 我有两个Redis客户端,在一个文件中,我有一个简单的脚本设置并删除了Redis密钥: 在第二个文件中,我有一个Redis客户端充当订户: 关键的“占位符”已设置,那么是否有充分的理由使我在“消息”处理程序中未获得任何输出? 问题答案: 您忘记了订阅用户客户端订阅特定的频道。此外,如果要监视所有事件,则需要使用基于模式的订阅。 您可能想要执行以下操作(未测试): 请参阅Redis文档和

  • 问题内容: epoll的手册页上有一个触发边的示例代码,如下所示: 在do_use_fd函数中,我在while循环中调用非阻塞recv直到EAGAIN为止,示例代码运行良好。 我对此示例代码有一个疑问,假设现在我有50个套接字客户端连接,突然有10个客户端同时写入数据,因此epoll_wait()将返回10,然后转到for循环: 它将调用 这10个客户端,假设n = 5完成,并且n = 6尚未完成

  • 问题内容: 我有3个文件: js_json.js->用于我的json代码 javascript.js->用于我的javascript函数 index.php 这里的代码为: 这是我的代码: 这里的代码: 我的问题是: 当我单击链接“ Hola Test 1”时,它将起作用并显示消息。问题是,在单击选择选项之后,出现了链接“ Hola Test”,然后单击该链接(“ Hola Test”),该消息没

  • 问题内容: 我有一个带有一列复选框的GridView(GridView的其余部分正在从数据库中填充)。我正在使用AJAX执行不同的功能,并且想知道我是否只是在正确的位置调用了OnCheckedChanged事件。是否应该将其包装在某种UpdatePanel中?我对这一切的工作方式仍然很陌生…基本上,我的目标是在选中复选框后更改数据库中的位值。我知道该怎么做的逻辑,我只是​​不知道我是否以正确的方式

  • 我正在为android创建一个phonegap应用程序,并想使用一些phonegap事件,如“恢复”、“暂停”、“后退按钮”等,但除了“deviceready”事件外,这些事件都不会被触发。以下是我的javascript代码,请检查我是否犯了任何错误: “ondeviceredy()”函数中的警报正在工作。 请帮忙,提前谢谢。

  • 问题内容: 我的整个项目都使用(Bluebird)Promises,但是有一个使用EventEmitter的特定库。 我想要实现以下目标: 我在Promises链中读了EventEmitter的答案。这给了我一种执行’connect’事件的回调的方法。这是我到目前为止所到之处 现在如何进一步链接“ eventB”? 问题答案: 我假设您想为每个事件做不同的事情。即使由的动作触发,您也可以将其视为另